async_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::{
    ActiveRequests, DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation,
    SessionHelper, TraceFlowId, FIFO_MAX_REQUESTS,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
use futures::future::{Fuse, FusedFuture};
use futures::stream::FuturesUnordered;
use futures::{select_biased, FutureExt, StreamExt};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::mem::MaybeUninit;
use std::pin::pin;
use std::sync::Arc;
use std::task::{ready, Poll};
use {
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
    fuchsia_async as fasync,
};

pub trait Interface: Send + Sync + Unpin + 'static {
    /// Runs `stream` to completion.
    ///
    /// Implementors can override this method if they want to create a passthrough session instead
    /// (and can use `[PassthroughSession]` below to do so).  See
    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
    ///
    /// If the implementor uses a `[PassthroughSession]`, the following Interface methods
    /// will not be called, and can be stubbed out:
    ///   - on_attach_vmo
    ///   - on_detach_vmo
    ///   - read
    ///   - write
    ///   - flush
    ///   - trim
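    ///
    /// For example, a passthrough override might look roughly like the sketch below, where
    /// `backing_session` is a hypothetical `fblock::SessionProxy` held by the implementor for
    /// the underlying device's session:
    ///
    /// ```ignore
    /// fn open_session(
    ///     &self,
    ///     _session_manager: Arc<SessionManager<Self>>,
    ///     stream: fblock::SessionRequestStream,
    ///     _offset_map: OffsetMap,
    ///     _block_size: u32,
    /// ) -> impl Future<Output = Result<(), Error>> + Send {
    ///     async move {
    ///         // NOTE: a real passthrough would also need to forward/apply `offset_map`.
    ///         PassthroughSession::new(self.backing_session.clone()).serve(stream).await
    ///     }
    /// }
    /// ```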
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        // By default, serve the session rather than forwarding/proxying it.
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
    /// value (as, say, a key into a HashMap).
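    ///
    /// For example, an implementor could key per-VMO state on that address (a sketch only;
    /// `vmo_state` and `VmoState` are hypothetical members of the implementing type):
    ///
    /// ```ignore
    /// fn on_attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
    ///     self.vmo_state.lock().insert(vmo as *const zx::Vmo as usize, VmoState::default());
    ///     async { Ok(()) }
    /// }
    /// ```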
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Called whenever a VMO is detached.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Called to get block/partition information.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called for a request to read bytes.
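    ///
    /// As a sketch, a RAM-backed implementor might service a read by copying into the VMO
    /// (`data` and `block_size` here are hypothetical fields of the implementing type, and a
    /// real implementation would validate the range first):
    ///
    /// ```ignore
    /// fn read(
    ///     &self,
    ///     device_block_offset: u64,
    ///     block_count: u32,
    ///     vmo: &Arc<zx::Vmo>,
    ///     vmo_offset: u64, // bytes
    ///     _trace_flow_id: TraceFlowId,
    /// ) -> impl Future<Output = Result<(), zx::Status>> + Send {
    ///     async move {
    ///         let start = device_block_offset as usize * self.block_size as usize;
    ///         let len = block_count as usize * self.block_size as usize;
    ///         vmo.write(&self.data.lock()[start..start + len], vmo_offset)?;
    ///         Ok(())
    ///     }
    /// }
    /// ```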
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called for a request to write bytes. WriteOptions::PRE_BARRIER should never be seen
    /// for this call. See barrier().
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Indicates that all previously completed write operations should be made persistent before
    /// any write operations issued after this call. It is unspecified how the barrier affects
    /// write operations that are currently in flight. This corresponds to the PRE_BARRIER flag
    /// that can be set on a write request: requests carrying that flag are split into separate
    /// barrier and write calls, so the `write` call above will never see
    /// `WriteOptions::PRE_BARRIER` in `opts`.
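    ///
    /// A minimal sketch of an implementation that just records that a barrier is pending and
    /// lets the next write enforce the ordering (`barrier_pending` is a hypothetical
    /// `AtomicBool` field of the implementing type):
    ///
    /// ```ignore
    /// fn barrier(&self) -> Result<(), zx::Status> {
    ///     self.barrier_pending.store(true, Ordering::Release);
    ///     Ok(())
    /// }
    /// ```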
    fn barrier(&self) -> Result<(), zx::Status>;

    /// Called to flush the device.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to trim a region.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}

/// A helper object to run a passthrough (proxy) session.
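///
/// A sketch of typical use, assuming `proxy` is an `fblock::SessionProxy` connected to the
/// backing device's session and `stream` is the client's `fblock::SessionRequestStream`:
///
/// ```ignore
/// let passthrough = PassthroughSession::new(proxy);
/// passthrough.serve(stream).await?;
/// ```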
pub struct PassthroughSession(fblock::SessionProxy);

impl PassthroughSession {
    pub fn new(proxy: fblock::SessionProxy) -> Self {
        Self(proxy)
    }

    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                responder.send(self.0.get_fifo().await?)?;
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(self.0.close().await?)?;
            }
        }
        Ok(())
    }

    /// Runs `stream` until completion.
    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
        while let Some(Ok(request)) = stream.next().await {
            if let Err(error) = self.handle_request(request).await {
                log::warn!(error:?; "FIDL error");
            }
        }
        Ok(())
    }
}

pub struct SessionManager<I: Interface + ?Sized> {
    interface: Arc<I>,
    active_requests: ActiveRequests<usize>,
}

impl<I: Interface + ?Sized> SessionManager<I> {
    pub fn new(interface: Arc<I>) -> Self {
        Self { interface, active_requests: ActiveRequests::default() }
    }

    /// Runs `stream` until completion.
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
        let mut stream = stream.fuse();
        let scope = fasync::Scope::new();
        let helper = session.helper.clone();
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = session.run_fifo(fifo).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        // Make sure we detach VMOs when we go out of scope.
        scopeguard::defer! {
            for (_, vmo) in helper.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper.handle_request(req?).await?;
                    } else {
                        break;
                    }
                }
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}

struct Session<I: Interface + ?Sized> {
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
}

impl<I: Interface + ?Sized> Session<I> {
    // A task loop for receiving and responding to FIFO requests.
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        // The FIFO has to be processed by a single task due to implementation constraints on
        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
        // writes can happen in batch, and request processing is parallel.
        //
        // The general flow is:
        //  - Read messages from the FIFO, write into `requests`.
        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
        //    which will eventually write them into `responses`.
        //  - Read `responses` and write out to the FIFO.
        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager.active_requests;
        let mut active_request_futures = FuturesUnordered::new();
        let mut responses = Vec::new();

        // We map requests using a single future `map_future`.  `pending_mappings` is used to queue
        // up requests that need to be mapped.  This will serialise how mappings occur which might
        // make updating mapping caches simpler.  If this proves to be a performance issue, we can
        // optimise it.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            let new_requests = {
                // We provide some flow control by limiting how many in-flight requests we will
                // allow.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Order is important here.  We want to prioritize sending results on the FIFO and
                // processing FIFO messages over receiving new ones, to provide flow control.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder)) => {
                                active_request_futures.push(self.process_fifo_request(request));
                                if let Some(remainder) = remainder {
                                    map_future.set(self.map_request(remainder).fuse());
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // NB: It is very important that there are no `await`s for the rest of the loop body, as
            // otherwise active requests might become stalled.
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    Ok(mut request) => {
                        // If `request` contains WriteOptions::PRE_BARRIER, issue the barrier
                        // prior to mapping the request. If the barrier fails, we can
                        // immediately respond to the request without splitting it up.
                        if let Err(status) = self.maybe_issue_barrier(&mut request) {
                            let response = self
                                .helper
                                .session_manager
                                .active_requests
                                .complete_and_take_response(request.request_id, status)
                                .map(|(_, r)| r);
                            responses.extend(response);
                        } else if map_future.is_terminated() {
                            map_future.set(self.map_request(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }

    fn maybe_issue_barrier(&self, request: &mut DecodedRequest) -> Result<(), zx::Status> {
        if let Operation::Write {
            device_block_offset: _,
            block_count: _,
            options,
            vmo_offset: _,
        } = &mut request.operation
        {
            if options.contains(WriteOptions::PRE_BARRIER) {
                self.interface.barrier()?;
                // Clear the flag on the request itself so that `Interface::write` never sees
                // PRE_BARRIER, as documented on the trait.
                *options &= !WriteOptions::PRE_BARRIER;
            }
        }
        Ok(())
    }

    // This is currently async even though it doesn't need to be, to allow for upcoming changes.
    async fn map_request(
        &self,
        request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
        self.helper.map_request(request)
    }

    /// Processes a fifo request.
    async fn process_fifo_request(
        &self,
        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
    ) -> Option<BlockFifoResponse> {
        let result = match operation {
            Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
                self.interface
                    .read(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
                self.interface
                    .write(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Flush => self.interface.flush(trace_flow_id).await,
            Operation::Trim { device_block_offset, block_count } => {
                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
            }
            Operation::CloseVmo => {
                // Handled in main request loop
                unreachable!()
            }
        };
        self.helper
            .session_manager
            .active_requests
            .complete_and_take_response(request_id, result.into())
            .map(|(_, r)| r)
    }
}

impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
    // We don't need the session, we just need something unique to identify the session.
    type Session = usize;

    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        self.interface.on_attach_vmo(vmo).await
    }

    async fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        self.interface.clone().open_session(self, stream, offset_map, block_size).await
    }

    async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
        self.interface.get_info().await
    }

    async fn get_volume_info(
        &self,
    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
        self.interface.get_volume_info().await
    }

    async fn query_slices(
        &self,
        start_slices: &[u64],
    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
        self.interface.query_slices(start_slices).await
    }

    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.extend(start_slice, slice_count).await
    }

    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.shrink(start_slice, slice_count).await
    }

    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
        &self.active_requests
    }
}

impl<I: Interface> IntoSessionManager for Arc<I> {
    type SM = SessionManager<I>;

    fn into_session_manager(self) -> Arc<Self::SM> {
        Arc::new(SessionManager { interface: self, active_requests: ActiveRequests::default() })
    }
}