block_server/async_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::{
    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
    Operation, SessionHelper, TraceFlowId,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
use futures::future::{Fuse, FusedFuture, join};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, select_biased};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::future::{Future, poll_fn};
use std::mem::MaybeUninit;
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::task::{Poll, ready};
use storage_device::buffer::Buffer;
use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
use {
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
    fuchsia_async as fasync,
};

pub trait Interface: Send + Sync + Unpin + 'static {
    /// Runs `stream` to completion.
    ///
    /// Implementors can override this method if they want to create a passthrough session instead
    /// (and can use [`PassthroughSession`] below to do so).  See
    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
    ///
    /// If the implementor uses a [`PassthroughSession`], the following `Interface` methods
    /// will not be called, and can be stubbed out:
    ///   - on_attach_vmo
    ///   - on_detach_vmo
    ///   - read
    ///   - write
    ///   - flush
    ///   - trim
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        // By default, serve the session rather than forwarding/proxying it.
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
    /// value (as, say, a key into a HashMap).
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Called whenever a VMO is detached.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Called to get block/partition information.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called for a request to read bytes.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: ReadOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called for a request to write bytes. WriteOptions::PRE_BARRIER should never be seen
    /// for this call. See barrier().
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Indicates that all previously completed write operations should be made persistent prior to
    /// any write operations issued after this call.  It is not specified how the barrier affects
    /// write operations that are in flight when it is issued.  This corresponds to the
    /// WriteOptions::PRE_BARRIER flag on a write request: requests carrying that flag are
    /// converted into separate barrier and write calls, so the write call above will never see
    /// WriteOptions::PRE_BARRIER in `opts`.
    fn barrier(&self) -> Result<(), zx::Status>;

    /// Called to flush the device.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to trim a region.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
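
// A minimal sketch (not part of this module) of an `Interface` implementation backed by a
// `Vec<u8>`, showing the shape of the required methods.  The `RamDisk` type is hypothetical,
// and `get_info` is stubbed out because `DeviceInfo` construction is elided here:
//
//     struct RamDisk {
//         data: std::sync::Mutex<Vec<u8>>,
//         block_size: u32,
//     }
//
//     impl Interface for RamDisk {
//         async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
//             // A real implementation returns the device's `DeviceInfo` here.
//             Err(zx::Status::NOT_SUPPORTED)
//         }
//
//         async fn read(
//             &self,
//             device_block_offset: u64,
//             block_count: u32,
//             vmo: &Arc<zx::Vmo>,
//             vmo_offset: u64, // *bytes* not blocks
//             _opts: ReadOptions,
//             _trace_flow_id: TraceFlowId,
//         ) -> Result<(), zx::Status> {
//             let start = device_block_offset as usize * self.block_size as usize;
//             let len = block_count as usize * self.block_size as usize;
//             let end = start.checked_add(len).ok_or(zx::Status::OUT_OF_RANGE)?;
//             let data = self.data.lock().unwrap();
//             if end > data.len() {
//                 return Err(zx::Status::OUT_OF_RANGE);
//             }
//             // Copy the requested blocks into the client's VMO.
//             vmo.write(&data[start..end], vmo_offset)
//         }
//
//         // `write` mirrors `read`; `barrier`, `flush`, and `trim` can be no-ops for a fully
//         // synchronous in-memory device.
//     }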

/// A helper object to run a passthrough (proxy) session.
pub struct PassthroughSession(fblock::SessionProxy);

impl PassthroughSession {
    pub fn new(proxy: fblock::SessionProxy) -> Self {
        Self(proxy)
    }

    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                responder.send(self.0.get_fifo().await?)?;
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(self.0.close().await?)?;
            }
        }
        Ok(())
    }

    /// Runs `stream` until completion.
    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
        while let Some(Ok(request)) = stream.next().await {
            if let Err(error) = self.handle_request(request).await {
                log::warn!(error:?; "FIDL error");
            }
        }
        Ok(())
    }
}
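
// As an illustration of the override described on `Interface::open_session`, an implementor
// holding a proxy to another block device might forward sessions like this (a sketch; the
// `proxy` field is an assumption, and this forwarding ignores `offset_map`):
//
//     fn open_session(
//         &self,
//         _session_manager: Arc<SessionManager<Self>>,
//         stream: fblock::SessionRequestStream,
//         _offset_map: OffsetMap,
//         _block_size: u32,
//     ) -> impl Future<Output = Result<(), Error>> + Send {
//         let session = PassthroughSession::new(self.proxy.clone());
//         async move { session.serve(stream).await }
//     }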

pub struct SessionManager<I: Interface + ?Sized> {
    interface: Arc<I>,
    active_requests: ActiveRequests<usize>,

    // NOTE: This must be dropped *after* `active_requests` because we store `Buffer<'_>` with an
    // erased ('static) lifetime in `ActiveRequest`.
    buffer_allocator: OnceLock<BufferAllocator>,
}

impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    fn drop(&mut self) {
        if let Some(allocator) = self.buffer_allocator.get() {
            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
        }
    }
}

impl<I: Interface + ?Sized> SessionManager<I> {
    pub fn new(interface: Arc<I>) -> Self {
        Self {
            interface,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        }
    }

    pub fn interface(&self) -> &I {
        self.interface.as_ref()
    }

    /// Runs `stream` until completion.
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
        let mut stream = stream.fuse();
        let scope = fasync::Scope::new();
        let helper = session.helper.clone();
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = session.run_fifo(fifo).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        // Make sure we detach VMOs when we go out of scope.
        scopeguard::defer! {
            for (_, (vmo, _)) in helper.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper.handle_request(req?).await?;
                    } else {
                        break;
                    }
                }
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}

struct Session<I: Interface + ?Sized> {
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
}

impl<I: Interface + ?Sized> Session<I> {
    // A task loop for receiving and responding to FIFO requests.
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        // The FIFO has to be processed by a single task due to implementation constraints on
        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
        // writes can happen in batch, and request processing is parallel.
        //
        // The general flow is:
        //  - Read messages from the FIFO, write into `requests`.
        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
        //    which will eventually write them into `responses`.
        //  - Read `responses` and write out to the FIFO.
        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager.active_requests;
        let mut active_request_futures = FuturesUnordered::new();
        let mut responses = Vec::new();

        // We map requests using a single future, `map_future`; `pending_mappings` queues up
        // requests that are waiting to be mapped.  This serialises mappings, which might make
        // updating mapping caches simpler.  If this proves to be a performance issue, we can
        // optimise it.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            let new_requests = {
                // We provide some flow control by limiting how many in-flight requests we will
                // allow.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Order is important here.  We want to prioritize sending results on the FIFO and
                // processing FIFO messages over receiving new ones, to provide flow control.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder, commit_decompression_buffers)) => {
                                active_request_futures.push(self.process_fifo_request(
                                    request,
                                    commit_decompression_buffers
                                ));
                                if let Some(remainder) = remainder {
                                    map_future.set(
                                        self.map_request_or_get_response(remainder).fuse()
                                    );
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request_or_get_response(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // NB: It is very important that there are no `await`s for the rest of the loop body, as
            // otherwise active requests might become stalled.
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    Ok(mut request) => {
                        // If `request` contains WriteOptions::PRE_BARRIER, issue the barrier
                        // prior to mapping the request. If the barrier fails, we can
                        // immediately respond to the request without splitting it up.
                        if let Err(status) = self.maybe_issue_barrier(&mut request) {
                            let response = self
                                .helper
                                .session_manager
                                .active_requests
                                .complete_and_take_response(request.request_id, status)
                                .map(|(_, r)| r);
                            responses.extend(response);
                        } else if map_future.is_terminated() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }

    fn maybe_issue_barrier(&self, request: &mut DecodedRequest) -> Result<(), zx::Status> {
        if let Operation::Write {
            device_block_offset: _,
            block_count: _,
            _unused,
            options,
            vmo_offset: _,
            ..
        } = &mut request.operation
        {
            if options.flags.contains(WriteFlags::PRE_BARRIER) {
                self.interface.barrier()?;
                options.flags &= !WriteFlags::PRE_BARRIER;
            }
        }
        Ok(())
    }

    async fn map_request_or_get_response(
        &self,
        request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
        let request_id = request.request_id;
        self.map_request(request).await.map_err(|status| {
            self.helper
                .session_manager
                .active_requests
                .complete_and_take_response(request_id, status)
                .map(|(_, r)| r)
        })
    }

    // NOTE: The implementation of this currently assumes that we are only processing a single map
    // request at a time.
    async fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
        let mut active_requests;
        let active_request;
        let mut commit_decompression_buffers = false;
        // Handle decompressed read operations by turning them into regular read operations.
        match request.operation {
            Operation::StartDecompressedRead {
                required_buffer_size,
                device_block_offset,
                block_count,
                options,
            } => {
                let allocator = match self.helper.session_manager.buffer_allocator.get() {
                    Some(a) => a,
                    None => {
                        // This isn't racy because there should only be one `map_request` future
                        // running at any one time.
                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
                        self.interface.on_attach_vmo(&source.vmo()).await?;
                        let allocator = BufferAllocator::new(
                            std::cmp::max(
                                self.helper.block_size as usize,
                                zx::system_get_page_size() as usize,
                            ),
                            source,
                        );
                        self.helper.session_manager.buffer_allocator.set(allocator).unwrap();
                        self.helper.session_manager.buffer_allocator.get().unwrap()
                    }
                };

                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                let buffer = allocator.allocate_buffer(required_buffer_size).await;
                let vmo_offset = buffer.range().start as u64;

                // # Safety
                //
                // See below.
                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
                    unsafe { std::mem::transmute(buffer) }
                }

                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                // SAFETY: We guarantee that `buffer_allocator` is dropped after `active_requests`,
                // so this should be safe.
                active_request.decompression_info.as_mut().unwrap().buffer =
                    Some(unsafe { remove_lifetime(buffer) });

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset,
                    options,
                };
                request.vmo = Some(allocator.buffer_source().vmo().clone());

                commit_decompression_buffers = true;
            }
            Operation::ContinueDecompressedRead {
                offset,
                device_block_offset,
                block_count,
                options,
            } => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                let buffer =
                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();

                // Make sure this read won't overflow our buffer.
                if offset >= buffer.len() as u64
                    || buffer.len() as u64 - offset
                        < block_count as u64 * self.helper.block_size as u64
                {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset: buffer.range().start as u64 + offset,
                    options,
                };

                let allocator = self.helper.session_manager.buffer_allocator.get().unwrap();
                request.vmo = Some(allocator.buffer_source().vmo().clone());
            }
            _ => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];
            }
        }

        self.helper
            .map_request(request, active_request)
            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
    }

    /// Processes a FIFO request.
    async fn process_fifo_request(
        &self,
        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
        commit_decompression_buffers: bool,
    ) -> Option<BlockFifoResponse> {
        let result = match operation {
            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
                join(
                    self.interface.read(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    ),
                    async {
                        if commit_decompression_buffers {
                            let (target_slice, buffer_slice, buffer_range) = {
                                let active_request =
                                    self.helper.session_manager.active_requests.request(request_id);
                                let info = active_request.decompression_info.as_ref().unwrap();
                                (
                                    info.uncompressed_slice(),
                                    self.helper
                                        .session_manager
                                        .buffer_allocator
                                        .get()
                                        .unwrap()
                                        .buffer_source()
                                        .slice(),
                                    info.buffer.as_ref().unwrap().range(),
                                )
                            };
                            let vmar = fuchsia_runtime::vmar_root_self();
                            // The target slice might not be page aligned.
                            let addr = target_slice.addr();
                            let unaligned = addr % zx::system_get_page_size() as usize;
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::COMMIT,
                                addr - unaligned,
                                target_slice.len() + unaligned,
                            ) {
                                log::warn!(error:?; "Unable to commit target range");
                            }
                            // But the buffer range should be.
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::PREFETCH,
                                buffer_slice.addr() + buffer_range.start,
                                buffer_range.len(),
                            ) {
                                log::warn!(
                                    error:?,
                                    buffer_range:?;
                                    "Unable to prefetch source range",
                                );
                            }
                        }
                    },
                )
                .await
                .0
            }
            Operation::Write { device_block_offset, block_count, _unused, vmo_offset, options } => {
                self.interface
                    .write(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Flush => self.interface.flush(trace_flow_id).await,
            Operation::Trim { device_block_offset, block_count } => {
                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
            }
            Operation::CloseVmo
            | Operation::StartDecompressedRead { .. }
            | Operation::ContinueDecompressedRead { .. } => {
                // Handled in main request loop
                unreachable!()
            }
        };
        self.helper
            .session_manager
            .active_requests
            .complete_and_take_response(request_id, result.into())
            .map(|(_, r)| r)
    }
}

impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
    const SUPPORTS_DECOMPRESSION: bool = true;

    // We don't need the session itself; we just need something unique that identifies it.
    type Session = usize;

    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        self.interface.on_attach_vmo(vmo).await
    }

    async fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        self.interface.clone().open_session(self, stream, offset_map, block_size).await
    }

    async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
        self.interface.get_info().await
    }

    async fn get_volume_info(
        &self,
    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
        self.interface.get_volume_info().await
    }

    async fn query_slices(
        &self,
        start_slices: &[u64],
    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
        self.interface.query_slices(start_slices).await
    }

    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.extend(start_slice, slice_count).await
    }

    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.shrink(start_slice, slice_count).await
    }

    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
        &self.active_requests
    }
}

impl<I: Interface> IntoSessionManager for Arc<I> {
    type SM = SessionManager<I>;

    fn into_session_manager(self) -> Arc<Self::SM> {
        Arc::new(SessionManager {
            interface: self,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        })
    }
}
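
// A sketch of end-to-end usage, assuming a `MyDevice` type that implements `Interface` and a
// `stream`, `offset_map`, and `block_size` obtained from the surrounding server code:
//
//     async fn serve_block_device(
//         device: Arc<MyDevice>,
//         stream: fblock::SessionRequestStream,
//         offset_map: OffsetMap,
//         block_size: u32,
//     ) -> Result<(), Error> {
//         // `into_session_manager` wraps the device in a `SessionManager`, which then serves
//         // the session's FIDL requests and its FIFO until the client disconnects.
//         device.into_session_manager().serve_session(stream, offset_map, block_size).await
//     }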