async_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::{
    DecodeResult, DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation,
    SessionHelper, FIFO_MAX_REQUESTS,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
use futures::future::Fuse;
use futures::stream::FuturesUnordered;
use futures::{select_biased, FutureExt, StreamExt};
use std::borrow::Cow;
use std::future::{poll_fn, Future};
use std::mem::MaybeUninit;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::task::{ready, Poll};
use {
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
    fuchsia_async as fasync,
};

pub trait Interface: Send + Sync + Unpin + 'static {
    /// Runs `stream` to completion.
    ///
    /// Implementors can override this method if they want to create a passthrough session instead
    /// (and can use [`PassthroughSession`] below to do so).  See
    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
    ///
    /// If the implementor uses a [`PassthroughSession`], the following `Interface` methods
    /// will not be called, and can be stubbed out:
    ///   - on_attach_vmo
    ///   - on_detach_vmo
    ///   - read
    ///   - write
    ///   - flush
    ///   - trim
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        // By default, serve the session rather than forwarding/proxying it.
        session_manager.serve_session(stream, offset_map, block_size)
    }
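
    // A sketch of overriding `open_session` to proxy the session to another block device instead
    // of serving it locally.  It assumes a hypothetical `remote: fblock::SessionProxy` field on
    // the implementor, holding an already-opened session on the backing device:
    //
    //     fn open_session(
    //         &self,
    //         _session_manager: Arc<SessionManager<Self>>,
    //         stream: fblock::SessionRequestStream,
    //         _offset_map: OffsetMap,
    //         _block_size: u32,
    //     ) -> impl Future<Output = Result<(), Error>> + Send {
    //         let proxy = self.remote.clone();
    //         async move { PassthroughSession::new(proxy).serve(stream).await }
    //     }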

    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
    /// value (as, say, a key into a HashMap).
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }
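
    // A sketch of keying per-VMO state on the VMO's stable address, assuming hypothetical
    // `vmo_state: Mutex<HashMap<usize, VmoState>>` and `VmoState` types on the implementor:
    //
    //     fn on_attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
    //         let key = vmo as *const zx::Vmo as usize;
    //         self.vmo_state.lock().unwrap().insert(key, VmoState::default());
    //         async { Ok(()) }
    //     }
    //
    //     fn on_detach_vmo(&self, vmo: &zx::Vmo) {
    //         self.vmo_state.lock().unwrap().remove(&(vmo as *const zx::Vmo as usize));
    //     }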

    /// Called whenever a VMO is detached.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Called to get block/partition information.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called for a request to read bytes.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        trace_flow_id: Option<NonZero<u64>>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called for a request to write bytes.
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: WriteOptions,
        trace_flow_id: Option<NonZero<u64>>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to flush the device.
    fn flush(
        &self,
        trace_flow_id: Option<NonZero<u64>>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to trim a region.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: Option<NonZero<u64>>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
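
// A minimal sketch of an `Interface` implementation backed by a VMO, using hypothetical names
// (`RamDisk`, `data`, `info`, `BLOCK_SIZE`); bounds checking and the remaining required methods
// are elided:
//
//     struct RamDisk {
//         data: zx::Vmo,
//         info: DeviceInfo,
//     }
//
//     impl Interface for RamDisk {
//         async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
//             Ok(Cow::Borrowed(&self.info))
//         }
//
//         async fn read(
//             &self,
//             device_block_offset: u64,
//             block_count: u32,
//             vmo: &Arc<zx::Vmo>,
//             vmo_offset: u64,
//             _trace_flow_id: Option<NonZero<u64>>,
//         ) -> Result<(), zx::Status> {
//             let mut buf = vec![0u8; block_count as usize * BLOCK_SIZE];
//             self.data.read(&mut buf, device_block_offset * BLOCK_SIZE as u64)?;
//             vmo.write(&buf, vmo_offset)
//         }
//
//         // `write` mirrors `read` with the copy direction reversed; `flush` and `trim` can be
//         // no-ops for a purely RAM-backed device.
//     }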

/// A helper object to run a passthrough (proxy) session.
pub struct PassthroughSession(fblock::SessionProxy);

impl PassthroughSession {
    pub fn new(proxy: fblock::SessionProxy) -> Self {
        Self(proxy)
    }

    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                responder.send(self.0.get_fifo().await?)?;
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(self.0.close().await?)?;
            }
        }
        Ok(())
    }

    /// Runs `stream` until completion.
    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
        while let Some(Ok(request)) = stream.next().await {
            if let Err(error) = self.handle_request(request).await {
                log::warn!(error:?; "FIDL error");
            }
        }
        Ok(())
    }
}

pub struct SessionManager<I: ?Sized> {
    interface: Arc<I>,
}

impl<I: Interface + ?Sized> SessionManager<I> {
    pub fn new(interface: Arc<I>) -> Self {
        Self { interface }
    }

    /// Runs `stream` until completion.
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let helper = Arc::new(helper);
        let interface = self.interface.clone();

        let mut stream = stream.fuse();

        let scope = fasync::Scope::new();
        let helper_clone = helper.clone();
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = run_fifo(fifo, interface, helper).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        // Make sure we detach VMOs when we go out of scope.
        scopeguard::defer! {
            for (_, vmo) in helper_clone.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper_clone.handle_request(req?).await?;
                    } else {
                        break;
                    }
                }
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}

// A task loop for receiving and responding to FIFO requests.
async fn run_fifo<I: Interface + ?Sized>(
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
) -> Result<(), zx::Status> {
    // The FIFO has to be processed by a single task due to implementation constraints on
    // fuchsia_async::Fifo, so we use an event loop to drive the FIFO.  FIFO reads and writes can
    // happen in batches, and request processing happens in parallel.
    // The general flow is:
    //  - Read messages from the FIFO into `requests`.
    //  - Decode each entry in `requests` and push a future that processes it onto
    //    `active_requests`; each future eventually produces entries for `responses`.
    //  - Drain `responses` by writing them out to the FIFO.
    let mut fifo = fasync::Fifo::from_fifo(fifo);
    let (mut reader, mut writer) = fifo.async_io();
    let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
    let mut active_requests = FuturesUnordered::new();
    let mut responses = vec![];

    loop {
        let new_requests = {
            // We provide some flow control by limiting how many in-flight requests we will allow.
            // Due to request splitting, there might be more active requests than there are request
            // slots.
            let pending_requests = active_requests.len() + responses.len();
            let count = requests.len().saturating_sub(pending_requests);
            let mut receive_requests = pin!(if count == 0 {
                Fuse::terminated()
            } else {
                reader.read_entries(&mut requests[..count]).fuse()
            });
            let mut send_responses = pin!(if responses.is_empty() {
                Fuse::terminated()
            } else {
                poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                    match ready!(writer.try_write(cx, &responses[..])) {
                        Ok(written) => {
                            responses.drain(..written);
                            Poll::Ready(Ok(()))
                        }
                        Err(status) => Poll::Ready(Err(status)),
                    }
                })
                .fuse()
            });

            // Order is important here.  We want to prioritize sending results on the FIFO and
            // processing FIFO messages over receiving new ones, to provide flow control.
            select_biased!(
                res = send_responses => {
                    res?;
                    0
                },
                response = active_requests.select_next_some() => {
                    responses.extend(response);
                    0
                }
                count = receive_requests => {
                    count?
                }
            )
        };

        let process_request =
            async |interface: &Arc<I>,
                   helper: &Arc<SessionHelper<SessionManager<I>>>,
                   decoded_request: DecodedRequest| {
                let tracking = decoded_request.request_tracking;
                let status = process_fifo_request(interface.clone(), decoded_request).await.into();
                helper.finish_fifo_request(tracking, status)
            };
        // NB: It is very important that there are no `await`s for the rest of the loop body, as
        // otherwise active requests might become stalled.
        let mut i = 0;
        let mut in_split = false;
        while i < new_requests {
            let request = &mut requests[i];
            i += 1;
            match helper.decode_fifo_request(unsafe { request.assume_init_mut() }, in_split) {
                DecodeResult::Ok(decoded_request) => {
                    in_split = false;
                    if let Operation::CloseVmo = decoded_request.operation {
                        if let Some(vmo) = decoded_request.vmo {
                            interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            helper.finish_fifo_request(
                                decoded_request.request_tracking,
                                zx::Status::OK,
                            ),
                        );
                    } else {
                        active_requests.push(process_request(&interface, &helper, decoded_request));
                    }
                }
                DecodeResult::Split(decoded_request) => {
                    active_requests.push(process_request(&interface, &helper, decoded_request));
                    // Decode the same request again to produce the next chunk of the split.
                    in_split = true;
                    i -= 1;
                }
                DecodeResult::InvalidRequest(tracking, status) => {
                    in_split = false;
                    responses.extend(helper.finish_fifo_request(tracking, status));
                }
                DecodeResult::IgnoreRequest => {
                    in_split = false;
                }
            }
        }
    }
}

impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        self.interface.on_attach_vmo(vmo).await
    }

    async fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        self.interface.clone().open_session(self, stream, offset_map, block_size).await
    }

    async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
        self.interface.get_info().await
    }

    async fn get_volume_info(
        &self,
    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
        self.interface.get_volume_info().await
    }

    async fn query_slices(
        &self,
        start_slices: &[u64],
    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
        self.interface.query_slices(start_slices).await
    }

    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.extend(start_slice, slice_count).await
    }

    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.shrink(start_slice, slice_count).await
    }
}

impl<I: Interface> IntoSessionManager for Arc<I> {
    type SM = SessionManager<I>;

    fn into_session_manager(self) -> Arc<Self::SM> {
        Arc::new(SessionManager { interface: self })
    }
}

/// Processes a FIFO request.
async fn process_fifo_request<I: Interface + ?Sized>(
    interface: Arc<I>,
    r: DecodedRequest,
) -> Result<(), zx::Status> {
    let trace_flow_id = r.request_tracking.trace_flow_id;
    match r.operation {
        Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
            interface
                .read(
                    device_block_offset,
                    block_count,
                    &r.vmo.as_ref().unwrap(),
                    vmo_offset,
                    trace_flow_id,
                )
                .await
        }
        Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
            interface
                .write(
                    device_block_offset,
                    block_count,
                    &r.vmo.as_ref().unwrap(),
                    vmo_offset,
                    options,
                    trace_flow_id,
                )
                .await
        }
        Operation::Flush => interface.flush(trace_flow_id).await,
        Operation::Trim { device_block_offset, block_count } => {
            interface.trim(device_block_offset, block_count, trace_flow_id).await
        }
        Operation::CloseVmo => {
            // Handled in the main request loop.
            unreachable!()
        }
    }
}