block_server/
async_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
7    Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_storage_block::DeviceFlag;
12use futures::future::{Fuse, FusedFuture, join};
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, StreamExt, select_biased};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::future::{Future, poll_fn};
18use std::mem::MaybeUninit;
19use std::pin::pin;
20use std::sync::{Arc, OnceLock};
21use std::task::{Poll, ready};
22use storage_device::buffer::Buffer;
23use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
24use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
25
/// The asynchronous callbacks a block device implementation supplies to the block server.
///
/// `get_info`, `read`, `write`, `flush` and `trim` must be provided by the implementor.  Every
/// other method has a default: `open_session` serves the session locally, the VMO attach/detach
/// hooks do nothing, and the volume-related calls fail with `zx::Status::NOT_SUPPORTED`.
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Runs `stream` to completion.
    ///
    /// Implementors can override this method if they want to create a passthrough session instead
    /// (and can use [`PassthroughSession`] below to do so).  See
    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
    ///
    /// If the implementor uses a [`PassthroughSession`], the following Interface methods
    /// will not be called, and can be stubbed out:
    ///   - on_attach_vmo
    ///   - on_detach_vmo
    ///   - read
    ///   - write
    ///   - flush
    ///   - trim
    ///
    /// The default implementation forwards all arguments to `SessionManager::serve_session`.
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        // By default, serve the session rather than forwarding/proxying it.
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
    /// value (as, say, a key into a HashMap).
    ///
    /// The default implementation accepts every VMO and returns `Ok(())`.
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Called whenever a VMO is detached.
    ///
    /// The default implementation does nothing.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Called to get block/partition information.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Called for a request to read bytes.
    ///
    /// `device_block_offset` and `block_count` are expressed in blocks, whereas `vmo_offset` is
    /// a byte offset into `vmo`.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: ReadOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called for a request to write bytes.
    ///
    /// `device_block_offset` and `block_count` are expressed in blocks, whereas `vmo_offset` is
    /// a byte offset into `vmo`.
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to flush the device.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to trim a region.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
135
136/// A helper object to run a passthrough (proxy) session.
137pub struct PassthroughSession(fblock::SessionProxy);
138
139impl PassthroughSession {
140    pub fn new(proxy: fblock::SessionProxy) -> Self {
141        Self(proxy)
142    }
143
144    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
145        match request {
146            fblock::SessionRequest::GetFifo { responder } => {
147                responder.send(self.0.get_fifo().await?)?;
148            }
149            fblock::SessionRequest::AttachVmo { vmo, responder } => {
150                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
151            }
152            fblock::SessionRequest::Close { responder } => {
153                responder.send(self.0.close().await?)?;
154            }
155        }
156        Ok(())
157    }
158
159    /// Runs `stream` until completion.
160    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
161        while let Some(Ok(request)) = stream.next().await {
162            if let Err(error) = self.handle_request(request).await {
163                log::warn!(error:?; "FIDL error");
164            }
165        }
166        Ok(())
167    }
168}
169
/// Holds an [`Interface`] implementation together with the state used while serving sessions on
/// top of it: the table of active requests and a lazily created buffer allocator.
pub struct SessionManager<I: Interface + ?Sized> {
    // The device implementation that requests are dispatched to.
    interface: Arc<I>,
    // Active requests; the session type used as the key is a plain `usize`.
    active_requests: ActiveRequests<usize>,

    // NOTE: This must be dropped *after* `active_requests` because we store `Buffer<'_>` with an
    // erased ('static) lifetime in `ActiveRequest`.
    //
    // Struct fields are dropped in declaration order, so keep this field below
    // `active_requests`.  Only initialised once a decompressed read needs a buffer.
    buffer_allocator: OnceLock<BufferAllocator>,
}
178
179impl<I: Interface + ?Sized> Drop for SessionManager<I> {
180    fn drop(&mut self) {
181        if let Some(allocator) = self.buffer_allocator.get() {
182            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
183        }
184    }
185}
186
187impl<I: Interface + ?Sized> SessionManager<I> {
188    pub fn new(interface: Arc<I>) -> Self {
189        Self {
190            interface,
191            active_requests: ActiveRequests::default(),
192            buffer_allocator: OnceLock::new(),
193        }
194    }
195
196    pub fn interface(&self) -> &I {
197        self.interface.as_ref()
198    }
199
200    /// Runs `stream` until completion.
201    pub async fn serve_session(
202        self: Arc<Self>,
203        stream: fblock::SessionRequestStream,
204        offset_map: OffsetMap,
205        block_size: u32,
206    ) -> Result<(), Error> {
207        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
208        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
209        let mut stream = stream.fuse();
210        let scope = fasync::Scope::new();
211        let helper = session.helper.clone();
212        let mut fifo_task = scope
213            .spawn(async move {
214                if let Err(status) = session.run_fifo(fifo).await {
215                    if status != zx::Status::PEER_CLOSED {
216                        log::error!(status:?; "FIFO error");
217                    }
218                }
219            })
220            .fuse();
221
222        // Make sure we detach VMOs when we go out of scope.
223        scopeguard::defer! {
224            for (_, (vmo, _)) in helper.take_vmos() {
225                self.interface.on_detach_vmo(&vmo);
226            }
227        }
228
229        loop {
230            futures::select! {
231                maybe_req = stream.next() => {
232                    if let Some(req) = maybe_req {
233                        helper.handle_request(req?).await?;
234                    } else {
235                        break;
236                    }
237                }
238                _ = fifo_task => break,
239            }
240        }
241
242        Ok(())
243    }
244}
245
/// State for one session being served: the device interface plus the shared session helper.
///
/// The address of a `Session` is also used as the key that identifies the session's requests in
/// the manager's `ActiveRequests` table (see `run_fifo`).
struct Session<I: Interface + ?Sized> {
    // The device implementation that decoded requests are dispatched to.
    interface: Arc<I>,
    // Shared with `SessionManager::serve_session`, which uses it to handle FIDL requests.
    helper: Arc<SessionHelper<SessionManager<I>>>,
}
250
251impl<I: Interface + ?Sized> Session<I> {
    // A task loop for receiving and responding to FIFO requests.
    //
    // The loop below has no `break`, so this only returns via `?` with the error from a failed
    // FIFO read or write (or stops when the future is dropped).
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        // On the way out, ask the helper to drop the active requests whose session key equals
        // this `Session`'s address (the same key passed to `decode_fifo_request` below).
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        // The FIFO has to be processed by a single task due to implementation constraints on
        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
        // writes can happen in batch, and request processing is parallel.
        //
        // The general flow is:
        //  - Read messages from the FIFO, write into `requests`.
        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
        //    which will eventually write them into `responses`.
        //  - Read `responses` and write out to the FIFO.
        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        // Scratch space for incoming requests; only the entries reported as read in the current
        // iteration are treated as initialised.
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager.active_requests;
        // Futures for requests that have been mapped and are being processed by the interface.
        let mut active_request_futures = FuturesUnordered::new();
        // Responses waiting to be written back to the FIFO.
        let mut responses = Vec::new();

        // We map requests using a single future `map_future`.  `pending_mappings` is used to queue
        // up requests that need to be mapped.  This will serialise how mappings occur which might
        // make updating mapping caches simpler.  If this proves to be a performance issue, we can
        // optimise it.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            // `new_requests` is the number of entries at the front of `requests` that were read
            // from the FIFO in this iteration; it is 0 whenever any other branch fired.
            let new_requests = {
                // We provide some flow control by limiting how many in-flight requests we will
                // allow.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                // With no capacity left, use a terminated future so this branch cannot fire.
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                // Likewise, only attempt a write when there is something to send.
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                // Only remove what was written; anything left over is retried
                                // on a later iteration.
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Order is important here.  We want to prioritize sending results on the FIFO and
                // processing FIFO messages over receiving new ones, to provide flow control.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    // A request finished processing; queue its response (if it produced one).
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    // The request currently being mapped is done.
                    result = map_future => {
                        match result {
                            Ok((request, remainder, commit_decompression_buffers)) => {
                                active_request_futures.push(self.process_fifo_request(
                                    request,
                                    commit_decompression_buffers,
                                ));
                                // Any remainder is mapped next, ahead of `pending_mappings`.
                                if let Some(remainder) = remainder {
                                    map_future.set(
                                        self.map_request_or_get_response(remainder).fuse()
                                    );
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        // If nothing is being mapped now, start on the next queued request.
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request_or_get_response(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // NB: It is very important that there are no `await`s for the rest of the loop body, as
            // otherwise active requests might become stalled.
            for request in &mut requests[..new_requests] {
                // SAFETY (as relied upon here): `new_requests` is the count returned by
                // `read_entries`, which is taken to mean that the first `new_requests` entries of
                // `requests` have been initialised.
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    // CloseVmo is completed right here rather than being mapped and processed.
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    // Everything else goes through the single mapping future, in arrival order.
                    Ok(request) => {
                        if map_future.is_terminated() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    // Decoding produced neither a request to run nor a response to send.
                    Err(None) => {}
                    // Decoding produced a response to send straight back.
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }
380
381    async fn map_request_or_get_response(
382        &self,
383        request: DecodedRequest,
384    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
385        let request_id = request.request_id;
386        self.map_request(request).await.map_err(|status| {
387            self.helper
388                .session_manager
389                .active_requests
390                .complete_and_take_response(request_id, status)
391                .map(|(_, r)| r)
392        })
393    }
394
    // Prepares a decoded request for processing and hands it to the helper for mapping.
    //
    // Before mapping, this:
    //  - simulates PRE_BARRIER with a flush when the device lacks BARRIER_SUPPORT, and
    //  - rewrites decompressed-read operations into plain `Operation::Read`s that target the
    //    session manager's buffer allocator VMO.
    //
    // On success returns the mapped request, an optional remainder (which the caller maps
    // again), and whether the decompression buffers should be committed; the latter is only true
    // for `StartDecompressedRead`.
    //
    // NOTE: The implementation of this currently assumes that we are only processing a single map
    // request at a time.
    async fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
        // Declared up front so that the lock guard taken in any of the match arms below outlives
        // the match and is still held for the final `helper.map_request` call.
        let mut active_requests;
        let active_request;
        let mut commit_decompression_buffers = false;
        let flags = self.interface.get_info().as_ref().device_flags();
        // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier with a
        // pre-flush.
        if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
        {
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    "storage",
                    "block_server::SimulatedBarrier",
                    "request_id" => request.request_id.0
                );
            }
            self.interface.flush(request.trace_flow_id).await?;
        }

        // Handle decompressed read operations by turning them into regular read operations.
        match request.operation {
            Operation::StartDecompressedRead {
                required_buffer_size,
                device_block_offset,
                block_count,
                options,
            } => {
                // Lazily create the allocator the first time a decompressed read is seen.
                let allocator = match self.helper.session_manager.buffer_allocator.get() {
                    Some(a) => a,
                    None => {
                        // This isn't racy because there should only be one `map_request` future
                        // running at any one time.
                        //
                        // NOTE(review): that holds within one session, but `buffer_allocator`
                        // lives on the `SessionManager`.  If several sessions sharing a manager
                        // can reach this point concurrently, the `set(..).unwrap()` below could
                        // panic -- confirm.
                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
                        self.interface.on_attach_vmo(&source.vmo()).await?;
                        let allocator = BufferAllocator::new(
                            std::cmp::max(
                                self.helper.block_size as usize,
                                zx::system_get_page_size() as usize,
                            ),
                            source,
                        );
                        self.helper.session_manager.buffer_allocator.set(allocator).unwrap();
                        self.helper.session_manager.buffer_allocator.get().unwrap()
                    }
                };

                // NOTE(review): this check runs after the allocator has been created, so an
                // oversized first request still attaches the VMO before being rejected.
                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                let buffer = allocator.allocate_buffer(required_buffer_size).await;
                let vmo_offset = buffer.range().start as u64;

                // # Safety
                //
                // See below.
                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
                    unsafe { std::mem::transmute(buffer) }
                }

                // The lock is only taken after the last `await` in this arm.
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                // SAFETY: We guarantee that `buffer_allocator` is dropped after `active_requests`,
                // so this should be safe.
                active_request.decompression_info.as_mut().unwrap().buffer =
                    Some(unsafe { remove_lifetime(buffer) });

                // Read into the freshly allocated buffer, from the start of its range.
                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset,
                    options,
                };
                request.vmo = Some(allocator.buffer_source().vmo().clone());

                commit_decompression_buffers = true;
            }
            Operation::ContinueDecompressedRead {
                offset,
                device_block_offset,
                block_count,
                options,
            } => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                // The buffer stored on the active request by the `StartDecompressedRead` arm.
                let buffer =
                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();

                // Make sure this read won't overflow our buffer.
                if offset >= buffer.len() as u64
                    || buffer.len() as u64 - offset
                        < block_count as u64 * self.helper.block_size as u64
                {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                // Read into the existing buffer, `offset` bytes past the start of its range.
                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset: buffer.range().start as u64 + offset,
                    options,
                };

                let allocator = self.helper.session_manager.buffer_allocator.get().unwrap();
                request.vmo = Some(allocator.buffer_source().vmo().clone());
            }
            // All other operations are mapped unchanged.
            _ => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];
            }
        }

        // NB: We propagate the FORCE_ACCESS flag to *both* request and remainder, even if we're
        // using simulated FUA.  However, in `process_fifo_request`, we'll only do the post-flush
        // once the last request completes.
        self.helper
            .map_request(request, active_request)
            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
    }
525
526    /// Processes a fifo request.
527    async fn process_fifo_request(
528        &self,
529        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
530        commit_decompression_buffers: bool,
531    ) -> Option<BlockFifoResponse> {
532        let mut needs_postflush = false;
533        let result = match operation {
534            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
535                join(
536                    self.interface.read(
537                        device_block_offset,
538                        block_count,
539                        vmo.as_ref().unwrap(),
540                        vmo_offset,
541                        options,
542                        trace_flow_id,
543                    ),
544                    async {
545                        if commit_decompression_buffers {
546                            let (target_slice, buffer_slice, buffer_range) = {
547                                let active_request =
548                                    self.helper.session_manager.active_requests.request(request_id);
549                                let info = active_request.decompression_info.as_ref().unwrap();
550                                (
551                                    info.uncompressed_slice(),
552                                    self.helper
553                                        .session_manager
554                                        .buffer_allocator
555                                        .get()
556                                        .unwrap()
557                                        .buffer_source()
558                                        .slice(),
559                                    info.buffer.as_ref().unwrap().range(),
560                                )
561                            };
562                            let vmar = fuchsia_runtime::vmar_root_self();
563                            // The target slice might not be page aligned.
564                            let addr = target_slice.addr();
565                            let unaligned = addr % zx::system_get_page_size() as usize;
566                            if let Err(error) = vmar.op_range(
567                                zx::VmarOp::COMMIT,
568                                addr - unaligned,
569                                target_slice.len() + unaligned,
570                            ) {
571                                log::warn!(error:?; "Unable to commit target range");
572                            }
573                            // But the buffer range should be.
574                            if let Err(error) = vmar.op_range(
575                                zx::VmarOp::PREFETCH,
576                                buffer_slice.addr() + buffer_range.start,
577                                buffer_range.len(),
578                            ) {
579                                log::warn!(
580                                    error:?,
581                                    buffer_range:?;
582                                    "Unable to prefetch source range",
583                                );
584                            }
585                        }
586                    },
587                )
588                .await
589                .0
590            }
591            Operation::Write {
592                device_block_offset,
593                block_count,
594                _unused,
595                vmo_offset,
596                mut options,
597            } => {
598                // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with a
599                // post-flush.
600                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
601                    let flags = self.interface.get_info().as_ref().device_flags();
602                    if !flags.contains(DeviceFlag::FUA_SUPPORT) {
603                        options.flags.remove(WriteFlags::FORCE_ACCESS);
604                        needs_postflush = true;
605                    }
606                }
607                self.interface
608                    .write(
609                        device_block_offset,
610                        block_count,
611                        vmo.as_ref().unwrap(),
612                        vmo_offset,
613                        options,
614                        trace_flow_id,
615                    )
616                    .await
617            }
618            Operation::Flush => self.interface.flush(trace_flow_id).await,
619            Operation::Trim { device_block_offset, block_count } => {
620                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
621            }
622            Operation::CloseVmo
623            | Operation::StartDecompressedRead { .. }
624            | Operation::ContinueDecompressedRead { .. } => {
625                // Handled in main request loop
626                unreachable!()
627            }
628        };
629        let response = self
630            .helper
631            .session_manager
632            .active_requests
633            .complete_and_take_response(request_id, result.into())
634            .map(|(_, r)| r);
635        if let Some(mut response) = response {
636            // Only do the post-flush on the very last request, and only if successful.
637            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
638                if let Some(id) = trace_flow_id {
639                    fuchsia_trace::async_instant!(
640                        fuchsia_trace::Id::from(id.get()),
641                        "storage",
642                        "block_server::SimulatedFUA",
643                        "request_id" => request_id.0
644                    );
645                }
646                response.status =
647                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
648            }
649            Some(response)
650        } else {
651            response
652        }
653    }
654}
655
656impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
657    const SUPPORTS_DECOMPRESSION: bool = true;
658
659    // We don't need the session, we just need something unique to identify the session.
660    type Session = usize;
661
662    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
663        self.interface.on_attach_vmo(vmo).await
664    }
665
666    async fn open_session(
667        self: Arc<Self>,
668        stream: fblock::SessionRequestStream,
669        offset_map: OffsetMap,
670        block_size: u32,
671    ) -> Result<(), Error> {
672        self.interface.clone().open_session(self, stream, offset_map, block_size).await
673    }
674
675    fn get_info(&self) -> Cow<'_, DeviceInfo> {
676        self.interface.get_info()
677    }
678
679    async fn get_volume_info(
680        &self,
681    ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
682        self.interface.get_volume_info().await
683    }
684
685    async fn query_slices(
686        &self,
687        start_slices: &[u64],
688    ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
689        self.interface.query_slices(start_slices).await
690    }
691
692    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
693        self.interface.extend(start_slice, slice_count).await
694    }
695
696    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
697        self.interface.shrink(start_slice, slice_count).await
698    }
699
700    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
701        return &self.active_requests;
702    }
703}
704
705impl<I: Interface> IntoSessionManager for Arc<I> {
706    type SM = SessionManager<I>;
707
708    fn into_session_manager(self) -> Arc<Self::SM> {
709        Arc::new(SessionManager {
710            interface: self,
711            active_requests: ActiveRequests::default(),
712            buffer_allocator: OnceLock::new(),
713        })
714    }
715}