Skip to main content

block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::Error;
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fidl_fuchsia_storage_block as fblock;
8use fuchsia_async as fasync;
9use fuchsia_async::epoch::{Epoch, EpochGuard};
10use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
11use futures::{Future, FutureExt as _, TryStreamExt as _};
12use slab::Slab;
13use std::borrow::{Borrow, Cow};
14use std::collections::BTreeMap;
15use std::num::NonZero;
16use std::ops::Range;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use storage_device::buffer::Buffer;
20use zx::HandleBased;
21
22pub mod async_interface;
23pub mod c_interface;
24pub mod callback_interface;
25
26#[cfg(test)]
27mod decompression_tests;
28
29pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
30
31type TraceFlowId = Option<NonZero<u64>>;
32
33#[derive(Clone)]
34pub enum DeviceInfo {
35    Block(BlockInfo),
36    Partition(PartitionInfo),
37}
38
39impl DeviceInfo {
40    pub fn label(&self) -> &str {
41        match self {
42            Self::Block(BlockInfo { .. }) => "",
43            Self::Partition(PartitionInfo { name, .. }) => name,
44        }
45    }
46    pub fn device_flags(&self) -> fblock::DeviceFlag {
47        match self {
48            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
49            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
50        }
51    }
52
53    pub fn block_count(&self) -> Option<u64> {
54        match self {
55            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
56            Self::Partition(PartitionInfo { block_range, .. }) => {
57                block_range.as_ref().map(|range| range.end - range.start)
58            }
59        }
60    }
61
62    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
63        match self {
64            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
65            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
66                max_transfer_blocks.clone()
67            }
68        }
69    }
70
71    fn max_transfer_size(&self, block_size: u32) -> u32 {
72        if let Some(max_blocks) = self.max_transfer_blocks() {
73            max_blocks.get() * block_size
74        } else {
75            MAX_TRANSFER_UNBOUNDED
76        }
77    }
78}
79
80/// Information associated with non-partition block devices.
81#[derive(Clone, Default)]
82pub struct BlockInfo {
83    pub device_flags: fblock::DeviceFlag,
84    pub block_count: u64,
85    pub max_transfer_blocks: Option<NonZero<u32>>,
86}
87
88/// Information associated with a block device that is also a partition.
89#[derive(Clone, Default)]
90pub struct PartitionInfo {
91    /// The device flags reported by the underlying device.
92    pub device_flags: fblock::DeviceFlag,
93    pub max_transfer_blocks: Option<NonZero<u32>>,
94    /// If `block_range` is None, the partition is a volume and may not be contiguous.
95    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
96    /// slices and use that (along with the slice and block sizes) to determine the block count.
97    pub block_range: Option<Range<u64>>,
98    pub type_guid: [u8; 16],
99    pub instance_guid: [u8; 16],
100    pub name: String,
101    pub flags: u64,
102}
103
104/// We internally keep track of active requests, so that when the server is torn down, we can
105/// deallocate all of the resources for pending requests.
106struct ActiveRequest<S> {
107    session: S,
108    group_or_request: GroupOrRequest,
109    trace_flow_id: TraceFlowId,
110    _epoch_guard: EpochGuard<'static>,
111    status: zx::Status,
112    count: u32,
113    req_id: Option<u32>,
114    decompression_info: Option<DecompressionInfo>,
115}
116
117struct DecompressionInfo {
118    // This is the range of compressed bytes in receiving buffer.
119    compressed_range: Range<usize>,
120
121    // This is the range in the target VMO where we will write uncompressed bytes.
122    uncompressed_range: Range<u64>,
123
124    bytes_so_far: u64,
125    mapping: Arc<VmoMapping>,
126    buffer: Option<Buffer<'static>>,
127}
128
129impl DecompressionInfo {
130    /// Returns the uncompressed slice.
131    fn uncompressed_slice(&self) -> *mut [u8] {
132        std::ptr::slice_from_raw_parts_mut(
133            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
134            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
135        )
136    }
137}
138
139pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
140
141impl<S> Default for ActiveRequests<S> {
142    fn default() -> Self {
143        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
144    }
145}
146
147impl<S> ActiveRequests<S> {
148    fn complete_and_take_response(
149        &self,
150        request_id: RequestId,
151        status: zx::Status,
152    ) -> Option<(S, BlockFifoResponse)> {
153        self.0.lock().complete_and_take_response(request_id, status)
154    }
155
156    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
157        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
158    }
159}
160
161struct ActiveRequestsInner<S> {
162    requests: Slab<ActiveRequest<S>>,
163}
164
165// Keeps track of all the requests that are currently being processed
166impl<S> ActiveRequestsInner<S> {
167    /// Completes a request.
168    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
169        let group = &mut self.requests[request_id.0];
170
171        group.count = group.count.checked_sub(1).unwrap();
172        if status != zx::Status::OK && group.status == zx::Status::OK {
173            group.status = status
174        }
175
176        fuchsia_trace::duration!(
177            "storage",
178            "block_server::finish_transaction",
179            "request_id" => request_id.0,
180            "group_completed" => group.count == 0,
181            "status" => status.into_raw());
182        if let Some(trace_flow_id) = group.trace_flow_id {
183            fuchsia_trace::flow_step!(
184                "storage",
185                "block_server::finish_request",
186                trace_flow_id.get().into()
187            );
188        }
189
190        if group.count == 0
191            && group.status == zx::Status::OK
192            && let Some(info) = &mut group.decompression_info
193        {
194            thread_local! {
195                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
196                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
197            }
198            DECOMPRESSOR.with(|decompressor| {
199                // SAFETY: We verified `uncompressed_range` fits within our mapping.
200                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
201                let mut decompressor = decompressor.borrow_mut();
202                if let Err(error) = decompressor.decompress_to_buffer(
203                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
204                    target_slice,
205                ) {
206                    log::warn!(error:?; "Decompression error");
207                    group.status = zx::Status::IO_DATA_INTEGRITY;
208                };
209            });
210        }
211    }
212
213    /// Takes the response if all requests are finished.
214    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
215        let group = &self.requests[request_id.0];
216        match group.req_id {
217            Some(reqid) if group.count == 0 => {
218                let group = self.requests.remove(request_id.0);
219                Some((
220                    group.session,
221                    BlockFifoResponse {
222                        status: group.status.into_raw(),
223                        reqid,
224                        group: group.group_or_request.group_id().unwrap_or(0),
225                        ..Default::default()
226                    },
227                ))
228            }
229            _ => None,
230        }
231    }
232
233    /// Competes the request and returns a response if the request group is finished.
234    fn complete_and_take_response(
235        &mut self,
236        request_id: RequestId,
237        status: zx::Status,
238    ) -> Option<(S, BlockFifoResponse)> {
239        self.complete(request_id, status);
240        self.take_response(request_id)
241    }
242}
243
244/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
245/// cbindgen:no-export
246pub struct BlockServer<SM: SessionManager> {
247    block_size: u32,
248    orchestrator: Arc<SM::Orchestrator>,
249}
250
251/// A single entry in `[OffsetMap]`.
252#[derive(Debug)]
253pub struct BlockOffsetMapping {
254    source_block_offset: u64,
255    target_block_offset: u64,
256    length: u64,
257}
258
259impl BlockOffsetMapping {
260    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
261        blocks.0 >= self.source_block_offset
262            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
263    }
264}
265
266impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
267    type Error = zx::Status;
268
269    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
270        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
272        Ok(Self {
273            source_block_offset: wire.source_block_offset,
274            target_block_offset: wire.target_block_offset,
275            length: wire.length,
276        })
277    }
278}
279
280/// Remaps the offset of block requests based on an internal map, and truncates long requests.
281pub struct OffsetMap {
282    mapping: Option<BlockOffsetMapping>,
283    max_transfer_blocks: Option<NonZero<u32>>,
284}
285
286impl OffsetMap {
287    /// An OffsetMap that remaps requests.
288    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
289        Self { mapping: Some(mapping), max_transfer_blocks }
290    }
291
292    /// An OffsetMap that just enforces maximum request sizes.
293    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
294        Self { mapping: None, max_transfer_blocks }
295    }
296
297    pub fn is_empty(&self) -> bool {
298        self.mapping.is_none()
299    }
300
301    fn mapping(&self) -> Option<&BlockOffsetMapping> {
302        self.mapping.as_ref()
303    }
304
305    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
306        self.max_transfer_blocks
307    }
308}
309
310// Methods take Arc<Self> rather than &self because of
311// https://github.com/rust-lang/rust/issues/42940.
312pub trait SessionManager: 'static {
313    /// The Orchestrator is an object that holds the `SessionManager` and any other state that needs
314    /// to be shared between sessions.  It is responsible for keeping the `SessionManager` alive.
315    /// We use this type instead of directly holding an Arc<SessionManager> in BlockServer, to avoid
316    /// nested Arcs in concrete implementations which need to keep additional state.
317    type Orchestrator: Borrow<Self> + Send + Sync;
318
319    const SUPPORTS_DECOMPRESSION: bool;
320
321    type Session;
322
323    fn on_attach_vmo(
324        orchestrator: Arc<Self::Orchestrator>,
325        vmo: &Arc<zx::Vmo>,
326    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
327
328    /// Creates a new session to handle `stream`.
329    /// The returned future should run until the session completes, for example when the client end
330    /// closes.
331    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
332    fn open_session(
333        orchestrator: Arc<Self::Orchestrator>,
334        stream: fblock::SessionRequestStream,
335        offset_map: OffsetMap,
336        block_size: u32,
337    ) -> impl Future<Output = Result<(), Error>> + Send;
338
339    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
340    fn get_info(&self) -> Cow<'_, DeviceInfo>;
341
342    /// Called to handle the GetVolumeInfo FIDL call.
343    fn get_volume_info(
344        &self,
345    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
346    {
347        async { Err(zx::Status::NOT_SUPPORTED) }
348    }
349
350    /// Called to handle the QuerySlices FIDL call.
351    fn query_slices(
352        &self,
353        _start_slices: &[u64],
354    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
355        async { Err(zx::Status::NOT_SUPPORTED) }
356    }
357
358    /// Called to handle the Shrink FIDL call.
359    fn extend(
360        &self,
361        _start_slice: u64,
362        _slice_count: u64,
363    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
364        async { Err(zx::Status::NOT_SUPPORTED) }
365    }
366
367    /// Called to handle the Shrink FIDL call.
368    fn shrink(
369        &self,
370        _start_slice: u64,
371        _slice_count: u64,
372    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
373        async { Err(zx::Status::NOT_SUPPORTED) }
374    }
375
376    /// Returns the active requests.
377    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
378}
379
380/// A helper trait for converting various types into an `Orchestrator`.
381///
382/// This exists to simplify [`BlockServer::new`].
383pub trait IntoOrchestrator {
384    type SM: SessionManager;
385
386    fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
387}
388
389impl<SM: SessionManager> BlockServer<SM> {
390    pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
391        Self { block_size, orchestrator: orchestrator.into_orchestrator() }
392    }
393
394    pub fn session_manager(&self) -> &SM {
395        self.orchestrator.as_ref().borrow()
396    }
397
398    /// Called to process requests for fuchsia.storage.block.Block.
399    pub async fn handle_requests(
400        &self,
401        mut requests: fblock::BlockRequestStream,
402    ) -> Result<(), Error> {
403        let scope = fasync::Scope::new();
404        loop {
405            match requests.try_next().await {
406                Ok(Some(request)) => {
407                    if let Some(session) = self.handle_request(request).await? {
408                        scope.spawn(session.map(|_| ()));
409                    }
410                }
411                Ok(None) => break,
412                Err(err) => log::warn!(err:?; "Invalid request"),
413            }
414        }
415        scope.await;
416        Ok(())
417    }
418
419    /// Processes a Block request.  If a new session task is created in response to the request,
420    /// it is returned.
421    async fn handle_request(
422        &self,
423        request: fblock::BlockRequest,
424    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
425        match request {
426            fblock::BlockRequest::GetInfo { responder } => {
427                let info = self.device_info();
428                let max_transfer_size = info.max_transfer_size(self.block_size);
429                let (block_count, mut flags) = match info.as_ref() {
430                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
431                        (*block_count, *device_flags)
432                    }
433                    DeviceInfo::Partition(partition_info) => {
434                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
435                            range.end - range.start
436                        } else {
437                            let volume_info = self.session_manager().get_volume_info().await?;
438                            volume_info.0.slice_size * volume_info.1.partition_slice_count
439                                / self.block_size as u64
440                        };
441                        (block_count, partition_info.device_flags)
442                    }
443                };
444                if SM::SUPPORTS_DECOMPRESSION {
445                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
446                }
447                responder.send(Ok(&fblock::BlockInfo {
448                    block_count,
449                    block_size: self.block_size,
450                    max_transfer_size,
451                    flags,
452                }))?;
453            }
454            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
455                let info = self.device_info();
456                return Ok(Some(SM::open_session(
457                    self.orchestrator.clone(),
458                    session.into_stream(),
459                    OffsetMap::empty(info.max_transfer_blocks()),
460                    self.block_size,
461                )));
462            }
463            fblock::BlockRequest::OpenSessionWithOffsetMap {
464                session,
465                mapping,
466                control_handle: _,
467            } => {
468                let info = self.device_info();
469                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
470                    Ok(m) => m,
471                    Err(status) => {
472                        session.close_with_epitaph(status)?;
473                        return Ok(None);
474                    }
475                };
476                if let Some(max) = info.block_count() {
477                    if initial_mapping.target_block_offset + initial_mapping.length > max {
478                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
479                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
480                        return Ok(None);
481                    }
482                }
483                return Ok(Some(SM::open_session(
484                    self.orchestrator.clone(),
485                    session.into_stream(),
486                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
487                    self.block_size,
488                )));
489            }
490            fblock::BlockRequest::GetTypeGuid { responder } => {
491                let info = self.device_info();
492                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
493                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
494                    guid.value.copy_from_slice(&partition_info.type_guid);
495                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
496                } else {
497                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
498                }
499            }
500            fblock::BlockRequest::GetInstanceGuid { responder } => {
501                let info = self.device_info();
502                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
503                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
504                    guid.value.copy_from_slice(&partition_info.instance_guid);
505                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
506                } else {
507                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
508                }
509            }
510            fblock::BlockRequest::GetName { responder } => {
511                let info = self.device_info();
512                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
513                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
514                } else {
515                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
516                }
517            }
518            fblock::BlockRequest::GetMetadata { responder } => {
519                let info = self.device_info();
520                if let DeviceInfo::Partition(info) = info.as_ref() {
521                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
522                    type_guid.value.copy_from_slice(&info.type_guid);
523                    let mut instance_guid =
524                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
525                    instance_guid.value.copy_from_slice(&info.instance_guid);
526                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
527                        name: Some(info.name.clone()),
528                        type_guid: Some(type_guid),
529                        instance_guid: Some(instance_guid),
530                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
531                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
532                        flags: Some(info.flags),
533                        ..Default::default()
534                    }))?;
535                } else {
536                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
537                }
538            }
539            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
540                match self.session_manager().query_slices(&start_slices).await {
541                    Ok(mut results) => {
542                        let results_len = results.len();
543                        assert!(results_len <= 16);
544                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
545                        responder.send(
546                            zx::sys::ZX_OK,
547                            &results.try_into().unwrap(),
548                            results_len as u64,
549                        )?;
550                    }
551                    Err(s) => {
552                        responder.send(
553                            s.into_raw(),
554                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
555                            0,
556                        )?;
557                    }
558                }
559            }
560            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
561                match self.session_manager().get_volume_info().await {
562                    Ok((manager_info, volume_info)) => {
563                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
564                    }
565                    Err(s) => responder.send(s.into_raw(), None, None)?,
566                }
567            }
568            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
569                responder.send(
570                    zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
571                        .into_raw(),
572                )?;
573            }
574            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
575                responder.send(
576                    zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
577                        .into_raw(),
578                )?;
579            }
580            fblock::BlockRequest::Destroy { responder, .. } => {
581                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
582            }
583        }
584        Ok(None)
585    }
586
587    fn device_info(&self) -> Cow<'_, DeviceInfo> {
588        self.session_manager().get_info()
589    }
590}
591
592struct SessionHelper<SM: SessionManager> {
593    orchestrator: Arc<SM::Orchestrator>,
594    offset_map: OffsetMap,
595    block_size: u32,
596    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
597    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
598}
599
600struct VmoMapping {
601    base: usize,
602    size: usize,
603}
604
605impl VmoMapping {
606    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
607        let size = vmo.get_size().unwrap() as usize;
608        Ok(Arc::new(Self {
609            base: fuchsia_runtime::vmar_root_self()
610                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
611                .inspect_err(|error| {
612                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
613                })?,
614            size,
615        }))
616    }
617}
618
619impl Drop for VmoMapping {
620    fn drop(&mut self) {
621        // SAFETY: We mapped this in `VmoMapping::new`.
622        unsafe {
623            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
624        }
625    }
626}
627
628enum HandleRequestResult {
629    /// The request was handled successfully.
630    Ok,
631    /// The request closed the stream.  The caller must shut down the session, and must call the
632    /// provided callback after the session is completely shut down.  The caller should assume that
633    /// no further requests need to be handled once this is received.
634    Closed(Box<dyn FnOnce() + Send + 'static>),
635}
636
637impl<SM: SessionManager> SessionHelper<SM> {
638    fn new(
639        orchestrator: Arc<SM::Orchestrator>,
640        offset_map: OffsetMap,
641        block_size: u32,
642    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
643        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
644        Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
645    }
646
647    fn session_manager(&self) -> &SM {
648        self.orchestrator.as_ref().borrow()
649    }
650
651    async fn handle_request(
652        &self,
653        request: fblock::SessionRequest,
654    ) -> Result<HandleRequestResult, Error> {
655        match request {
656            fblock::SessionRequest::GetFifo { responder } => {
657                let rights = zx::Rights::TRANSFER
658                    | zx::Rights::READ
659                    | zx::Rights::WRITE
660                    | zx::Rights::SIGNAL
661                    | zx::Rights::WAIT;
662                match self.peer_fifo.duplicate_handle(rights) {
663                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
664                    Err(s) => responder.send(Err(s.into_raw()))?,
665                }
666                Ok(HandleRequestResult::Ok)
667            }
668            fblock::SessionRequest::AttachVmo { vmo, responder } => {
669                let vmo = Arc::new(vmo);
670                let vmo_id = {
671                    let mut vmos = self.vmos.lock();
672                    if vmos.len() == u16::MAX as usize {
673                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
674                        return Ok(HandleRequestResult::Ok);
675                    } else {
676                        let vmo_id = match vmos.last_entry() {
677                            None => 1,
678                            Some(o) => {
679                                o.key().checked_add(1).unwrap_or_else(|| {
680                                    let mut vmo_id = 1;
681                                    // Find the first gap...
682                                    for (&id, _) in &*vmos {
683                                        if id > vmo_id {
684                                            break;
685                                        }
686                                        vmo_id = id + 1;
687                                    }
688                                    vmo_id
689                                })
690                            }
691                        };
692                        vmos.insert(vmo_id, (vmo.clone(), None));
693                        vmo_id
694                    }
695                };
696                SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
697                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
698                Ok(HandleRequestResult::Ok)
699            }
700            fblock::SessionRequest::Close { responder } => {
701                Ok(HandleRequestResult::Closed(Box::new(move || {
702                    if let Err(err) = responder.send(Ok(())) {
703                        log::warn!(err:?; "Error sending close response");
704                    }
705                })))
706            }
707        }
708    }
709
710    /// Decodes `request`.
711    fn decode_fifo_request(
712        &self,
713        session: SM::Session,
714        request: &BlockFifoRequest,
715    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
716        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
717
718        let request_bytes = request.length as u64 * self.block_size as u64;
719
720        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
721            .ok_or(zx::Status::INVALID_ARGS)
722            .and_then(|code| {
723                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
724                    if code != BlockOpcode::Read {
725                        return Err(zx::Status::INVALID_ARGS);
726                    }
727                    if !SM::SUPPORTS_DECOMPRESSION {
728                        return Err(zx::Status::NOT_SUPPORTED);
729                    }
730                }
731                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
732                    if request.length == 0 {
733                        return Err(zx::Status::INVALID_ARGS);
734                    }
735                    // Make sure the end offsets won't wrap.
736                    if request.dev_offset.checked_add(request.length as u64).is_none()
737                        || (code != BlockOpcode::Trim
738                            && request_bytes.checked_add(request.vmo_offset).is_none())
739                    {
740                        return Err(zx::Status::OUT_OF_RANGE);
741                    }
742                }
743                Ok(match code {
744                    BlockOpcode::Read => Operation::Read {
745                        device_block_offset: request.dev_offset,
746                        block_count: request.length,
747                        _unused: 0,
748                        vmo_offset: request
749                            .vmo_offset
750                            .checked_mul(self.block_size as u64)
751                            .ok_or(zx::Status::OUT_OF_RANGE)?,
752                        options: ReadOptions {
753                            inline_crypto: InlineCryptoOptions {
754                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
755                                dun: request.dun,
756                                slot: request.slot,
757                            },
758                        },
759                    },
760                    BlockOpcode::Write => {
761                        let mut options = WriteOptions {
762                            inline_crypto: InlineCryptoOptions {
763                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
764                                dun: request.dun,
765                                slot: request.slot,
766                            },
767                            ..WriteOptions::default()
768                        };
769                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
770                            options.flags |= WriteFlags::FORCE_ACCESS;
771                        }
772                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
773                            options.flags |= WriteFlags::PRE_BARRIER;
774                        }
775                        Operation::Write {
776                            device_block_offset: request.dev_offset,
777                            block_count: request.length,
778                            _unused: 0,
779                            options,
780                            vmo_offset: request
781                                .vmo_offset
782                                .checked_mul(self.block_size as u64)
783                                .ok_or(zx::Status::OUT_OF_RANGE)?,
784                        }
785                    }
786                    BlockOpcode::Flush => Operation::Flush,
787                    BlockOpcode::Trim => Operation::Trim {
788                        device_block_offset: request.dev_offset,
789                        block_count: request.length,
790                    },
791                    BlockOpcode::CloseVmo => Operation::CloseVmo,
792                })
793            });
794
795        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
796            GroupOrRequest::Group(request.group)
797        } else {
798            GroupOrRequest::Request(request.reqid)
799        };
800
801        let mut active_requests = self.session_manager().active_requests().0.lock();
802        let mut request_id = None;
803
804        // Multiple Block I/O request may be sent as a group.
805        // Notes:
806        // - the group is identified by the group id in the request
807        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
808        //   flag is set.
809        // - when processing a request of a group fails, subsequent requests of that
810        //   group will not be processed.
811        // - decompression is a special case, see block-fifo.h for semantics.
812        //
813        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
814        if group_or_request.is_group() {
815            // Search for an existing entry that matches this group.  NOTE: This is a potentially
816            // expensive way to find a group (it's iterating over all slots in the active-requests
817            // slab).  This can be optimised easily should we need to.
818            for (key, group) in &mut active_requests.requests {
819                if group.group_or_request == group_or_request {
820                    if group.req_id.is_some() {
821                        // We have already received a request tagged as last.
822                        if group.status == zx::Status::OK {
823                            group.status = zx::Status::INVALID_ARGS;
824                        }
825                        // Ignore this request.
826                        return Err(None);
827                    }
828                    // See if this is a continuation of a decompressed read.
829                    if group.status == zx::Status::OK
830                        && let Some(info) = &mut group.decompression_info
831                    {
832                        if let Ok(Operation::Read {
833                            device_block_offset,
834                            mut block_count,
835                            options,
836                            vmo_offset: 0,
837                            ..
838                        }) = operation
839                        {
840                            let remaining_bytes = info
841                                .compressed_range
842                                .end
843                                .next_multiple_of(self.block_size as usize)
844                                as u64
845                                - info.bytes_so_far;
846                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
847                                || request.total_compressed_bytes != 0
848                                || request.uncompressed_bytes != 0
849                                || request.compressed_prefix_bytes != 0
850                                || (flags.contains(BlockIoFlag::GROUP_LAST)
851                                    && info.bytes_so_far + request_bytes
852                                        < info.compressed_range.end as u64)
853                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
854                                    && request_bytes >= remaining_bytes)
855                            {
856                                group.status = zx::Status::INVALID_ARGS;
857                            } else {
858                                // We are tolerant of `block_count` being more than we actually
859                                // need.  This can happen if the client is working with a larger
860                                // block size than the device block size.  For example, if Blobfs
861                                // has a 8192 byte block size, but the device might has a 512 byte
862                                // block size, it can ask for a multiple of 16 blocks, when fewer
863                                // than that might actually be required to hold the compressed data.
864                                // It is easier for us to tolerate this here than to get Blobfs to
865                                // change to pass only the blocks that are required.
866                                if request_bytes > remaining_bytes {
867                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
868                                }
869
870                                operation = Ok(Operation::ContinueDecompressedRead {
871                                    offset: info.bytes_so_far,
872                                    device_block_offset,
873                                    block_count,
874                                    options,
875                                });
876
877                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
878                            }
879                        } else {
880                            group.status = zx::Status::INVALID_ARGS;
881                        }
882                    }
883                    if flags.contains(BlockIoFlag::GROUP_LAST) {
884                        group.req_id = Some(request.reqid);
885                        // If the group has had an error, there is no point trying to issue this
886                        // request.
887                        if group.status != zx::Status::OK {
888                            operation = Err(group.status);
889                        }
890                    } else if group.status != zx::Status::OK {
891                        // The group has already encountered an error, so there is no point trying
892                        // to issue this request.
893                        return Err(None);
894                    }
895                    request_id = Some(RequestId(key));
896                    group.count += 1;
897                    break;
898                }
899            }
900        }
901
902        let is_single_request =
903            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
904
905        let mut decompression_info = None;
906        let vmo = match operation {
907            Ok(Operation::Read {
908                device_block_offset,
909                mut block_count,
910                options,
911                vmo_offset,
912                ..
913            }) => match self.vmos.lock().get_mut(&request.vmoid) {
914                Some((vmo, mapping)) => {
915                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
916                        let compressed_range = request.compressed_prefix_bytes as usize
917                            ..request.compressed_prefix_bytes as usize
918                                + request.total_compressed_bytes as usize;
919                        let required_buffer_size =
920                            compressed_range.end.next_multiple_of(self.block_size as usize);
921
922                        // Validate the initial decompression request.
923                        if compressed_range.start >= compressed_range.end
924                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
925                            || (is_single_request && request_bytes < compressed_range.end as u64)
926                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
927                        {
928                            Err(zx::Status::INVALID_ARGS)
929                        } else {
930                            // We are tolerant of `block_count` being more than we actually need.
931                            // This can happen if the client is working in a larger block size than
932                            // the device block size.  For example, Blobfs has a 8192 byte block
933                            // size, but the device might have a 512 byte block size.  It is easier
934                            // for us to tolerate this here than to get Blobfs to change to pass
935                            // only the blocks that are required.
936                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
937                                block_count =
938                                    (required_buffer_size / self.block_size as usize) as u32;
939                                required_buffer_size as u64
940                            } else {
941                                request_bytes
942                            };
943
944                            // To decompress, we need to have the target VMO mapped (cached).
945                            match mapping {
946                                Some(mapping) => Ok(mapping.clone()),
947                                None => {
948                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
949                                }
950                            }
951                            .and_then(|mapping| {
952                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
953                                // range.
954                                if vmo_offset
955                                    .checked_add(request.uncompressed_bytes as u64)
956                                    .is_some_and(|end| end <= mapping.size as u64)
957                                {
958                                    Ok(mapping)
959                                } else {
960                                    Err(zx::Status::OUT_OF_RANGE)
961                                }
962                            })
963                            .map(|mapping| {
964                                // Convert the operation into a `StartDecompressedRead`
965                                // operation. For non-fragmented requests, this will be the only
966                                // operation, but if it's a fragmented read,
967                                // `ContinueDecompressedRead` operations will follow.
968                                operation = Ok(Operation::StartDecompressedRead {
969                                    required_buffer_size,
970                                    device_block_offset,
971                                    block_count,
972                                    options,
973                                });
974                                // Record sufficient information so that we can decompress when all
975                                // the requests complete.
976                                decompression_info = Some(DecompressionInfo {
977                                    compressed_range,
978                                    bytes_so_far,
979                                    mapping,
980                                    uncompressed_range: vmo_offset
981                                        ..vmo_offset + request.uncompressed_bytes as u64,
982                                    buffer: None,
983                                });
984                                None
985                            })
986                        }
987                    } else {
988                        Ok(Some(vmo.clone()))
989                    }
990                }
991                None => Err(zx::Status::IO),
992            },
993            Ok(Operation::Write { .. }) => self
994                .vmos
995                .lock()
996                .get(&request.vmoid)
997                .cloned()
998                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
999            Ok(Operation::CloseVmo) => {
1000                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
1001                    let vmo_clone = vmo.clone();
1002                    // Make sure the VMO is dropped after all current Epoch guards have been
1003                    // dropped.
1004                    Epoch::global().defer(move || drop(vmo_clone));
1005                    Ok(Some(vmo))
1006                })
1007            }
1008            _ => Ok(None),
1009        }
1010        .unwrap_or_else(|e| {
1011            operation = Err(e);
1012            None
1013        });
1014
1015        let trace_flow_id = NonZero::new(request.trace_flow_id);
1016        let request_id = request_id.unwrap_or_else(|| {
1017            RequestId(active_requests.requests.insert(ActiveRequest {
1018                session,
1019                group_or_request,
1020                trace_flow_id,
1021                _epoch_guard: Epoch::global().guard(),
1022                status: zx::Status::OK,
1023                count: 1,
1024                req_id: is_single_request.then_some(request.reqid),
1025                decompression_info,
1026            }))
1027        });
1028
1029        Ok(DecodedRequest {
1030            request_id,
1031            trace_flow_id,
1032            operation: operation.map_err(|status| {
1033                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1034            })?,
1035            vmo,
1036        })
1037    }
1038
1039    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1040        std::mem::take(&mut *self.vmos.lock())
1041    }
1042
1043    /// Maps the request and returns the mapped request with an optional remainder.
1044    fn map_request(
1045        &self,
1046        mut request: DecodedRequest,
1047        active_request: &mut ActiveRequest<SM::Session>,
1048    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1049        if active_request.status != zx::Status::OK {
1050            return Err(zx::Status::BAD_STATE);
1051        }
1052        let mapping = self.offset_map.mapping();
1053        match (mapping, request.operation.blocks()) {
1054            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1055                return Err(zx::Status::OUT_OF_RANGE);
1056            }
1057            _ => {}
1058        }
1059        let remainder = request.operation.map(
1060            self.offset_map.mapping(),
1061            self.offset_map.max_transfer_blocks(),
1062            self.block_size,
1063        );
1064        if remainder.is_some() {
1065            active_request.count += 1;
1066        }
1067        static CACHE: AtomicU64 = AtomicU64::new(0);
1068        if let Some(context) =
1069            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1070        {
1071            use fuchsia_trace::ArgValue;
1072            let trace_args = [
1073                ArgValue::of("request_id", request.request_id.0),
1074                ArgValue::of("opcode", request.operation.trace_label()),
1075            ];
1076            let _scope =
1077                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1078            if let Some(trace_flow_id) = active_request.trace_flow_id {
1079                fuchsia_trace::flow_step(
1080                    &context,
1081                    "block_server::start_transaction",
1082                    trace_flow_id.get().into(),
1083                    &[],
1084                );
1085            }
1086        }
1087        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1088        Ok((request, remainder))
1089    }
1090
1091    /// Drops all requests for which `pred` is true.
1092    ///
1093    /// NOTE: This should only be called once we are certain that the requests will not be
1094    /// completed asynchronously  Otherwise, requests might be completed twice.
1095    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1096        self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1097    }
1098
1099    /// Closes all grouped requests for which `pred` is true and which are held open pending the
1100    /// completion of their group.
1101    ///
1102    /// Normally, a request is dropped from ActiveRequests when it is completed.  However, if a
1103    /// request is part of a group, it will not be dropped until a request with GROUP_LAST arrives.
1104    /// If we're shutting down a session, the client may not ever send the GROUP_LAST, so we need to
1105    /// be sure to close these grouped requests.
1106    ///
1107    /// This is called during session shutdown in situations where [`Self::drop_active_requests`]
1108    /// cannot be used (e.g. for the callback interface, which hands off the responsibility of
1109    /// completing requests to its concrete implementation and cannot control when requests are
1110    /// completed relative to session shutdown).
1111    fn close_active_groups(&self, pred: impl Fn(&SM::Session) -> bool) {
1112        self.session_manager().active_requests().0.lock().requests.retain(|_, request| {
1113            if !pred(&request.session) || request.req_id.is_some() {
1114                return true;
1115            }
1116            // Mark the group as completed, and immediately drop any which have no outstanding
1117            // requests (since they will otherwise never be dropped).
1118            request.req_id = Some(u32::MAX);
1119            request.count > 0
1120        });
1121    }
1122}
1123
1124#[repr(transparent)]
1125#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
1126pub struct RequestId(usize);
1127
1128#[derive(Clone, Debug)]
1129struct DecodedRequest {
1130    request_id: RequestId,
1131    trace_flow_id: TraceFlowId,
1132    operation: Operation,
1133    vmo: Option<Arc<zx::Vmo>>,
1134}
1135
1136/// cbindgen:no-export
1137pub type WriteFlags = block_protocol::WriteFlags;
1138pub type WriteOptions = block_protocol::WriteOptions;
1139pub type ReadOptions = block_protocol::ReadOptions;
1140pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1141
1142#[repr(C)]
1143#[derive(Clone, Debug, PartialEq, Eq)]
1144pub enum Operation {
1145    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1146    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1147    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1148    // write variant).
1149    Read {
1150        device_block_offset: u64,
1151        block_count: u32,
1152        _unused: u32,
1153        vmo_offset: u64,
1154        options: ReadOptions,
1155    },
1156    Write {
1157        device_block_offset: u64,
1158        block_count: u32,
1159        _unused: u32,
1160        vmo_offset: u64,
1161        options: WriteOptions,
1162    },
1163    Flush,
1164    Trim {
1165        device_block_offset: u64,
1166        block_count: u32,
1167    },
1168    /// This will never be seen by the C interface.
1169    CloseVmo,
1170    /// This will never be seen by the C interface.
1171    StartDecompressedRead {
1172        required_buffer_size: usize,
1173        device_block_offset: u64,
1174        block_count: u32,
1175        options: ReadOptions,
1176    },
1177    /// This will never be seen by the C interface.
1178    ContinueDecompressedRead {
1179        offset: u64,
1180        device_block_offset: u64,
1181        block_count: u32,
1182        options: ReadOptions,
1183    },
1184}
1185
1186impl Operation {
1187    fn trace_label(&self) -> &'static str {
1188        match self {
1189            Operation::Read { .. } => "read",
1190            Operation::Write { .. } => "write",
1191            Operation::Flush { .. } => "flush",
1192            Operation::Trim { .. } => "trim",
1193            Operation::CloseVmo { .. } => "close_vmo",
1194            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1195            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1196        }
1197    }
1198
1199    /// Returns (offset, length).
1200    fn blocks(&self) -> Option<(u64, u32)> {
1201        match self {
1202            Operation::Read { device_block_offset, block_count, .. }
1203            | Operation::Write { device_block_offset, block_count, .. }
1204            | Operation::Trim { device_block_offset, block_count, .. } => {
1205                Some((*device_block_offset, *block_count))
1206            }
1207            _ => None,
1208        }
1209    }
1210
1211    /// Returns mutable references to (offset, length).
1212    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1213        match self {
1214            Operation::Read { device_block_offset, block_count, .. }
1215            | Operation::Write { device_block_offset, block_count, .. }
1216            | Operation::Trim { device_block_offset, block_count, .. } => {
1217                Some((device_block_offset, block_count))
1218            }
1219            _ => None,
1220        }
1221    }
1222
1223    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1224    /// start of the operation.
1225    fn map(
1226        &mut self,
1227        mapping: Option<&BlockOffsetMapping>,
1228        max_blocks: Option<NonZero<u32>>,
1229        block_size: u32,
1230    ) -> Option<Self> {
1231        let mut max = match self {
1232            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1233            _ => None,
1234        };
1235        let (offset, length) = self.blocks_mut()?;
1236        let orig_offset = *offset;
1237        if let Some(mapping) = mapping {
1238            let delta = *offset - mapping.source_block_offset;
1239            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1240            *offset = mapping.target_block_offset + delta;
1241            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1242            max = match max {
1243                None => Some(mapping_max),
1244                Some(m) => Some(std::cmp::min(m, mapping_max)),
1245            };
1246        };
1247        if let Some(max) = max {
1248            if *length as u64 > max {
1249                let rem = (*length as u64 - max) as u32;
1250                *length = max as u32;
1251                return Some(match self {
1252                    Operation::Read {
1253                        device_block_offset: _,
1254                        block_count: _,
1255                        vmo_offset,
1256                        _unused,
1257                        options,
1258                    } => {
1259                        let mut options = *options;
1260                        options.inline_crypto.dun += max as u32;
1261                        Operation::Read {
1262                            device_block_offset: orig_offset + max,
1263                            block_count: rem,
1264                            vmo_offset: *vmo_offset + max * block_size as u64,
1265                            _unused: *_unused,
1266                            options: options,
1267                        }
1268                    }
1269                    Operation::Write {
1270                        device_block_offset: _,
1271                        block_count: _,
1272                        _unused,
1273                        vmo_offset,
1274                        options,
1275                    } => {
1276                        let mut options = *options;
1277                        options.inline_crypto.dun += max as u32;
1278                        Operation::Write {
1279                            device_block_offset: orig_offset + max,
1280                            block_count: rem,
1281                            _unused: *_unused,
1282                            vmo_offset: *vmo_offset + max * block_size as u64,
1283                            options: options,
1284                        }
1285                    }
1286                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1287                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1288                    }
1289                    _ => unreachable!(),
1290                });
1291            }
1292        }
1293        None
1294    }
1295
1296    /// Returns true if the specified write flags are set.
1297    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1298        if let Operation::Write { options, .. } = self {
1299            options.flags.contains(value)
1300        } else {
1301            false
1302        }
1303    }
1304
1305    /// Removes `value` from the request's write flags and returns true if the flag was set.
1306    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1307        if let Operation::Write { options, .. } = self {
1308            let result = options.flags.contains(value);
1309            options.flags.remove(value);
1310            result
1311        } else {
1312            false
1313        }
1314    }
1315}
1316
1317#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1318pub enum GroupOrRequest {
1319    Group(u16),
1320    Request(u32),
1321}
1322
1323impl GroupOrRequest {
1324    fn is_group(&self) -> bool {
1325        matches!(self, Self::Group(_))
1326    }
1327
1328    fn group_id(&self) -> Option<u16> {
1329        match self {
1330            Self::Group(id) => Some(*id),
1331            Self::Request(_) => None,
1332        }
1333    }
1334}
1335
1336#[cfg(test)]
1337mod tests {
1338    use super::{
1339        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1340        TraceFlowId,
1341    };
1342    use assert_matches::assert_matches;
1343    use block_protocol::{
1344        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1345        WriteFlags, WriteOptions,
1346    };
1347    use fidl_fuchsia_storage_block as fblock;
1348    use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1349    use fuchsia_async as fasync;
1350    use fuchsia_sync::Mutex;
1351    use futures::FutureExt as _;
1352    use futures::channel::oneshot;
1353    use futures::future::BoxFuture;
1354    use std::borrow::Cow;
1355    use std::future::poll_fn;
1356    use std::num::NonZero;
1357    use std::pin::pin;
1358    use std::sync::Arc;
1359    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1360    use std::task::{Context, Poll};
1361    use zx::HandleBased as _;
1362
1363    #[derive(Default)]
1364    struct MockInterface {
1365        read_hook: Option<
1366            Box<
1367                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1368                    + Send
1369                    + Sync,
1370            >,
1371        >,
1372        write_hook:
1373            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1374        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1375    }
1376
1377    impl super::async_interface::Interface for MockInterface {
1378        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1379            Ok(())
1380        }
1381
1382        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1383            Cow::Owned(test_device_info())
1384        }
1385
1386        async fn read(
1387            &self,
1388            device_block_offset: u64,
1389            block_count: u32,
1390            vmo: &Arc<zx::Vmo>,
1391            vmo_offset: u64,
1392            _opts: ReadOptions,
1393            _trace_flow_id: TraceFlowId,
1394        ) -> Result<(), zx::Status> {
1395            if let Some(read_hook) = &self.read_hook {
1396                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1397            } else {
1398                unimplemented!();
1399            }
1400        }
1401
1402        async fn write(
1403            &self,
1404            device_block_offset: u64,
1405            _block_count: u32,
1406            _vmo: &Arc<zx::Vmo>,
1407            _vmo_offset: u64,
1408            opts: WriteOptions,
1409            _trace_flow_id: TraceFlowId,
1410        ) -> Result<(), zx::Status> {
1411            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1412                && let Some(barrier_hook) = &self.barrier_hook
1413            {
1414                barrier_hook()?;
1415            }
1416            if let Some(write_hook) = &self.write_hook {
1417                write_hook(device_block_offset).await
1418            } else {
1419                unimplemented!();
1420            }
1421        }
1422
1423        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1424            Ok(())
1425        }
1426
1427        async fn trim(
1428            &self,
1429            _device_block_offset: u64,
1430            _block_count: u32,
1431            _trace_flow_id: TraceFlowId,
1432        ) -> Result<(), zx::Status> {
1433            unreachable!();
1434        }
1435
1436        async fn get_volume_info(
1437            &self,
1438        ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1439            // Hang forever for the test_requests_dont_block_sessions test.
1440            let () = std::future::pending().await;
1441            unreachable!();
1442        }
1443    }
1444
1445    const BLOCK_SIZE: u32 = 512;
1446    const MAX_TRANSFER_BLOCKS: u32 = 10;
1447
1448    fn test_device_info() -> DeviceInfo {
1449        DeviceInfo::Partition(PartitionInfo {
1450            device_flags: fblock::DeviceFlag::READONLY
1451                | fblock::DeviceFlag::BARRIER_SUPPORT
1452                | fblock::DeviceFlag::FUA_SUPPORT,
1453            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1454            block_range: Some(0..100),
1455            type_guid: [1; 16],
1456            instance_guid: [2; 16],
1457            name: "foo".to_string(),
1458            flags: 0xabcd,
1459        })
1460    }
1461
1462    #[fuchsia::test]
1463    async fn test_barriers_ordering() {
1464        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1465        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1466        let barrier_called = Arc::new(AtomicBool::new(false));
1467
1468        futures::join!(
1469            async move {
1470                let barrier_called_clone = barrier_called.clone();
1471                let block_server = BlockServer::new(
1472                    BLOCK_SIZE,
1473                    Arc::new(MockInterface {
1474                        barrier_hook: Some(Box::new(move || {
1475                            barrier_called.store(true, Ordering::Relaxed);
1476                            Ok(())
1477                        })),
1478                        write_hook: Some(Box::new(move |device_block_offset| {
1479                            let barrier_called = barrier_called_clone.clone();
1480                            Box::pin(async move {
1481                                // The sleep allows the server to reorder the fifo requests.
1482                                if device_block_offset % 2 == 0 {
1483                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1484                                        zx::MonotonicDuration::from_millis(200),
1485                                    ))
1486                                    .await;
1487                                }
1488                                assert!(barrier_called.load(Ordering::Relaxed));
1489                                Ok(())
1490                            })
1491                        })),
1492                        ..MockInterface::default()
1493                    }),
1494                );
1495                block_server.handle_requests(stream).await.unwrap();
1496            },
1497            async move {
1498                let (session_proxy, server) = fidl::endpoints::create_proxy();
1499
1500                proxy.open_session(server).unwrap();
1501
1502                let vmo_id = session_proxy
1503                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1504                    .await
1505                    .unwrap()
1506                    .unwrap();
1507                assert_ne!(vmo_id.id, 0);
1508
1509                let mut fifo =
1510                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1511                let (mut reader, mut writer) = fifo.async_io();
1512
1513                writer
1514                    .write_entries(&BlockFifoRequest {
1515                        command: BlockFifoCommand {
1516                            opcode: BlockOpcode::Write.into_primitive(),
1517                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1518                            ..Default::default()
1519                        },
1520                        vmoid: vmo_id.id,
1521                        dev_offset: 0,
1522                        length: 5,
1523                        vmo_offset: 6,
1524                        ..Default::default()
1525                    })
1526                    .await
1527                    .unwrap();
1528
1529                for i in 0..10 {
1530                    writer
1531                        .write_entries(&BlockFifoRequest {
1532                            command: BlockFifoCommand {
1533                                opcode: BlockOpcode::Write.into_primitive(),
1534                                ..Default::default()
1535                            },
1536                            vmoid: vmo_id.id,
1537                            dev_offset: i + 1,
1538                            length: 5,
1539                            vmo_offset: 6,
1540                            ..Default::default()
1541                        })
1542                        .await
1543                        .unwrap();
1544                }
1545                for _ in 0..11 {
1546                    let mut response = BlockFifoResponse::default();
1547                    reader.read_entries(&mut response).await.unwrap();
1548                    assert_eq!(response.status, zx::sys::ZX_OK);
1549                }
1550
1551                std::mem::drop(proxy);
1552            }
1553        );
1554    }
1555
1556    #[fuchsia::test]
1557    async fn test_info() {
1558        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1559
1560        futures::join!(
1561            async {
1562                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1563                block_server.handle_requests(stream).await.unwrap();
1564            },
1565            async {
1566                let expected_info = test_device_info();
1567                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1568                    info
1569                } else {
1570                    unreachable!()
1571                };
1572
1573                let block_info = proxy.get_info().await.unwrap().unwrap();
1574                assert_eq!(
1575                    block_info.block_count,
1576                    partition_info.block_range.as_ref().unwrap().end
1577                        - partition_info.block_range.as_ref().unwrap().start
1578                );
1579                assert_eq!(
1580                    block_info.flags,
1581                    fblock::DeviceFlag::READONLY
1582                        | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1583                        | fblock::DeviceFlag::BARRIER_SUPPORT
1584                        | fblock::DeviceFlag::FUA_SUPPORT
1585                );
1586
1587                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1588
1589                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1590                assert_eq!(status, zx::sys::ZX_OK);
1591                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1592
1593                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1594                assert_eq!(status, zx::sys::ZX_OK);
1595                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1596
1597                let (status, name) = proxy.get_name().await.unwrap();
1598                assert_eq!(status, zx::sys::ZX_OK);
1599                assert_eq!(name.as_ref(), Some(&partition_info.name));
1600
1601                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1602                assert_eq!(metadata.name, name);
1603                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1604                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1605                assert_eq!(
1606                    metadata.start_block_offset,
1607                    Some(partition_info.block_range.as_ref().unwrap().start)
1608                );
1609                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1610                assert_eq!(metadata.flags, Some(partition_info.flags));
1611
1612                std::mem::drop(proxy);
1613            }
1614        );
1615    }
1616
1617    #[fuchsia::test]
1618    async fn test_attach_vmo() {
1619        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1620
1621        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1622        let koid = vmo.koid().unwrap();
1623
1624        futures::join!(
1625            async {
1626                let block_server = BlockServer::new(
1627                    BLOCK_SIZE,
1628                    Arc::new(MockInterface {
1629                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1630                            assert_eq!(vmo.koid().unwrap(), koid);
1631                            Box::pin(async { Ok(()) })
1632                        })),
1633                        ..MockInterface::default()
1634                    }),
1635                );
1636                block_server.handle_requests(stream).await.unwrap();
1637            },
1638            async move {
1639                let (session_proxy, server) = fidl::endpoints::create_proxy();
1640
1641                proxy.open_session(server).unwrap();
1642
1643                let vmo_id = session_proxy
1644                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1645                    .await
1646                    .unwrap()
1647                    .unwrap();
1648                assert_ne!(vmo_id.id, 0);
1649
1650                let mut fifo =
1651                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1652                let (mut reader, mut writer) = fifo.async_io();
1653
1654                // Keep attaching VMOs until we eventually hit the maximum.
1655                let mut count = 1;
1656                loop {
1657                    match session_proxy
1658                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1659                        .await
1660                        .unwrap()
1661                    {
1662                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1663                        Err(e) => {
1664                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1665                            break;
1666                        }
1667                    }
1668
1669                    // Only test every 10 to keep test time down.
1670                    if count % 10 == 0 {
1671                        writer
1672                            .write_entries(&BlockFifoRequest {
1673                                command: BlockFifoCommand {
1674                                    opcode: BlockOpcode::Read.into_primitive(),
1675                                    ..Default::default()
1676                                },
1677                                vmoid: vmo_id.id,
1678                                length: 1,
1679                                ..Default::default()
1680                            })
1681                            .await
1682                            .unwrap();
1683
1684                        let mut response = BlockFifoResponse::default();
1685                        reader.read_entries(&mut response).await.unwrap();
1686                        assert_eq!(response.status, zx::sys::ZX_OK);
1687                    }
1688
1689                    count += 1;
1690                }
1691
1692                assert_eq!(count, u16::MAX as u64);
1693
1694                // Detach the original VMO, and make sure we can then attach another one.
1695                writer
1696                    .write_entries(&BlockFifoRequest {
1697                        command: BlockFifoCommand {
1698                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1699                            ..Default::default()
1700                        },
1701                        vmoid: vmo_id.id,
1702                        ..Default::default()
1703                    })
1704                    .await
1705                    .unwrap();
1706
1707                let mut response = BlockFifoResponse::default();
1708                reader.read_entries(&mut response).await.unwrap();
1709                assert_eq!(response.status, zx::sys::ZX_OK);
1710
1711                let new_vmo_id = session_proxy
1712                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1713                    .await
1714                    .unwrap()
1715                    .unwrap();
1716                // It should reuse the same ID.
1717                assert_eq!(new_vmo_id.id, vmo_id.id);
1718
1719                std::mem::drop(proxy);
1720            }
1721        );
1722    }
1723
1724    #[fuchsia::test]
1725    async fn test_close() {
1726        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1727
1728        let mut server = std::pin::pin!(
1729            async {
1730                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1731                block_server.handle_requests(stream).await.unwrap();
1732            }
1733            .fuse()
1734        );
1735
1736        let mut client = std::pin::pin!(
1737            async {
1738                let (session_proxy, server) = fidl::endpoints::create_proxy();
1739
1740                proxy.open_session(server).unwrap();
1741
1742                // Dropping the proxy should not cause the session to terminate because the session is
1743                // still live.
1744                std::mem::drop(proxy);
1745
1746                session_proxy.close().await.unwrap().unwrap();
1747
1748                // Keep the session alive.  Calling `close` should cause the server to terminate.
1749                let _: () = std::future::pending().await;
1750            }
1751            .fuse()
1752        );
1753
1754        futures::select!(
1755            _ = server => {}
1756            _ = client => unreachable!(),
1757        );
1758    }
1759
1760    #[derive(Default)]
1761    struct IoMockInterface {
1762        do_checks: bool,
1763        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1764        return_errors: bool,
1765    }
1766
1767    #[derive(Debug)]
1768    enum ExpectedOp {
1769        Read(u64, u32, u64),
1770        Write(u64, u32, u64),
1771        Trim(u64, u32),
1772        Flush,
1773    }
1774
1775    impl super::async_interface::Interface for IoMockInterface {
1776        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1777            Ok(())
1778        }
1779
1780        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1781            Cow::Owned(test_device_info())
1782        }
1783
1784        async fn read(
1785            &self,
1786            device_block_offset: u64,
1787            block_count: u32,
1788            _vmo: &Arc<zx::Vmo>,
1789            vmo_offset: u64,
1790            _opts: ReadOptions,
1791            _trace_flow_id: TraceFlowId,
1792        ) -> Result<(), zx::Status> {
1793            if self.return_errors {
1794                Err(zx::Status::INTERNAL)
1795            } else {
1796                if self.do_checks {
1797                    assert_matches!(
1798                        self.expected_op.lock().take(),
1799                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1800                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1801                        "Read {device_block_offset} {block_count} {vmo_offset}"
1802                    );
1803                }
1804                Ok(())
1805            }
1806        }
1807
1808        async fn write(
1809            &self,
1810            device_block_offset: u64,
1811            block_count: u32,
1812            _vmo: &Arc<zx::Vmo>,
1813            vmo_offset: u64,
1814            _write_opts: WriteOptions,
1815            _trace_flow_id: TraceFlowId,
1816        ) -> Result<(), zx::Status> {
1817            if self.return_errors {
1818                Err(zx::Status::NOT_SUPPORTED)
1819            } else {
1820                if self.do_checks {
1821                    assert_matches!(
1822                        self.expected_op.lock().take(),
1823                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1824                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1825                        "Write {device_block_offset} {block_count} {vmo_offset}"
1826                    );
1827                }
1828                Ok(())
1829            }
1830        }
1831
1832        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1833            if self.return_errors {
1834                Err(zx::Status::NO_RESOURCES)
1835            } else {
1836                if self.do_checks {
1837                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1838                }
1839                Ok(())
1840            }
1841        }
1842
1843        async fn trim(
1844            &self,
1845            device_block_offset: u64,
1846            block_count: u32,
1847            _trace_flow_id: TraceFlowId,
1848        ) -> Result<(), zx::Status> {
1849            if self.return_errors {
1850                Err(zx::Status::NO_MEMORY)
1851            } else {
1852                if self.do_checks {
1853                    assert_matches!(
1854                        self.expected_op.lock().take(),
1855                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1856                            block_count == b,
1857                        "Trim {device_block_offset} {block_count}"
1858                    );
1859                }
1860                Ok(())
1861            }
1862        }
1863    }
1864
1865    #[fuchsia::test]
1866    async fn test_io() {
1867        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1868
1869        let expected_op = Arc::new(Mutex::new(None));
1870        let expected_op_clone = expected_op.clone();
1871
1872        let server = async {
1873            let block_server = BlockServer::new(
1874                BLOCK_SIZE,
1875                Arc::new(IoMockInterface {
1876                    return_errors: false,
1877                    do_checks: true,
1878                    expected_op: expected_op_clone,
1879                }),
1880            );
1881            block_server.handle_requests(stream).await.unwrap();
1882        };
1883
1884        let client = async move {
1885            let (session_proxy, server) = fidl::endpoints::create_proxy();
1886
1887            proxy.open_session(server).unwrap();
1888
1889            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1890            let vmo_id = session_proxy
1891                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1892                .await
1893                .unwrap()
1894                .unwrap();
1895
1896            let mut fifo =
1897                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1898            let (mut reader, mut writer) = fifo.async_io();
1899
1900            // READ
1901            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1902            writer
1903                .write_entries(&BlockFifoRequest {
1904                    command: BlockFifoCommand {
1905                        opcode: BlockOpcode::Read.into_primitive(),
1906                        ..Default::default()
1907                    },
1908                    vmoid: vmo_id.id,
1909                    dev_offset: 1,
1910                    length: 2,
1911                    vmo_offset: 3,
1912                    ..Default::default()
1913                })
1914                .await
1915                .unwrap();
1916
1917            let mut response = BlockFifoResponse::default();
1918            reader.read_entries(&mut response).await.unwrap();
1919            assert_eq!(response.status, zx::sys::ZX_OK);
1920
1921            // WRITE
1922            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1923            writer
1924                .write_entries(&BlockFifoRequest {
1925                    command: BlockFifoCommand {
1926                        opcode: BlockOpcode::Write.into_primitive(),
1927                        ..Default::default()
1928                    },
1929                    vmoid: vmo_id.id,
1930                    dev_offset: 4,
1931                    length: 5,
1932                    vmo_offset: 6,
1933                    ..Default::default()
1934                })
1935                .await
1936                .unwrap();
1937
1938            let mut response = BlockFifoResponse::default();
1939            reader.read_entries(&mut response).await.unwrap();
1940            assert_eq!(response.status, zx::sys::ZX_OK);
1941
1942            // FLUSH
1943            *expected_op.lock() = Some(ExpectedOp::Flush);
1944            writer
1945                .write_entries(&BlockFifoRequest {
1946                    command: BlockFifoCommand {
1947                        opcode: BlockOpcode::Flush.into_primitive(),
1948                        ..Default::default()
1949                    },
1950                    ..Default::default()
1951                })
1952                .await
1953                .unwrap();
1954
1955            reader.read_entries(&mut response).await.unwrap();
1956            assert_eq!(response.status, zx::sys::ZX_OK);
1957
1958            // TRIM
1959            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1960            writer
1961                .write_entries(&BlockFifoRequest {
1962                    command: BlockFifoCommand {
1963                        opcode: BlockOpcode::Trim.into_primitive(),
1964                        ..Default::default()
1965                    },
1966                    dev_offset: 7,
1967                    length: 8,
1968                    ..Default::default()
1969                })
1970                .await
1971                .unwrap();
1972
1973            reader.read_entries(&mut response).await.unwrap();
1974            assert_eq!(response.status, zx::sys::ZX_OK);
1975
1976            std::mem::drop(proxy);
1977        };
1978
1979        futures::join!(server, client);
1980    }
1981
1982    #[fuchsia::test]
1983    async fn test_io_errors() {
1984        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1985
1986        futures::join!(
1987            async {
1988                let block_server = BlockServer::new(
1989                    BLOCK_SIZE,
1990                    Arc::new(IoMockInterface {
1991                        return_errors: true,
1992                        do_checks: false,
1993                        expected_op: Arc::new(Mutex::new(None)),
1994                    }),
1995                );
1996                block_server.handle_requests(stream).await.unwrap();
1997            },
1998            async move {
1999                let (session_proxy, server) = fidl::endpoints::create_proxy();
2000
2001                proxy.open_session(server).unwrap();
2002
2003                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2004                let vmo_id = session_proxy
2005                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2006                    .await
2007                    .unwrap()
2008                    .unwrap();
2009
2010                let mut fifo =
2011                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2012                let (mut reader, mut writer) = fifo.async_io();
2013
2014                // READ
2015                writer
2016                    .write_entries(&BlockFifoRequest {
2017                        command: BlockFifoCommand {
2018                            opcode: BlockOpcode::Read.into_primitive(),
2019                            ..Default::default()
2020                        },
2021                        vmoid: vmo_id.id,
2022                        length: 1,
2023                        reqid: 1,
2024                        ..Default::default()
2025                    })
2026                    .await
2027                    .unwrap();
2028
2029                let mut response = BlockFifoResponse::default();
2030                reader.read_entries(&mut response).await.unwrap();
2031                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2032
2033                // WRITE
2034                writer
2035                    .write_entries(&BlockFifoRequest {
2036                        command: BlockFifoCommand {
2037                            opcode: BlockOpcode::Write.into_primitive(),
2038                            ..Default::default()
2039                        },
2040                        vmoid: vmo_id.id,
2041                        length: 1,
2042                        reqid: 2,
2043                        ..Default::default()
2044                    })
2045                    .await
2046                    .unwrap();
2047
2048                reader.read_entries(&mut response).await.unwrap();
2049                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2050
2051                // FLUSH
2052                writer
2053                    .write_entries(&BlockFifoRequest {
2054                        command: BlockFifoCommand {
2055                            opcode: BlockOpcode::Flush.into_primitive(),
2056                            ..Default::default()
2057                        },
2058                        reqid: 3,
2059                        ..Default::default()
2060                    })
2061                    .await
2062                    .unwrap();
2063
2064                reader.read_entries(&mut response).await.unwrap();
2065                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2066
2067                // TRIM
2068                writer
2069                    .write_entries(&BlockFifoRequest {
2070                        command: BlockFifoCommand {
2071                            opcode: BlockOpcode::Trim.into_primitive(),
2072                            ..Default::default()
2073                        },
2074                        reqid: 4,
2075                        length: 1,
2076                        ..Default::default()
2077                    })
2078                    .await
2079                    .unwrap();
2080
2081                reader.read_entries(&mut response).await.unwrap();
2082                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2083
2084                std::mem::drop(proxy);
2085            }
2086        );
2087    }
2088
2089    #[fuchsia::test]
2090    async fn test_invalid_args() {
2091        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2092
2093        futures::join!(
2094            async {
2095                let block_server = BlockServer::new(
2096                    BLOCK_SIZE,
2097                    Arc::new(IoMockInterface {
2098                        return_errors: false,
2099                        do_checks: false,
2100                        expected_op: Arc::new(Mutex::new(None)),
2101                    }),
2102                );
2103                block_server.handle_requests(stream).await.unwrap();
2104            },
2105            async move {
2106                let (session_proxy, server) = fidl::endpoints::create_proxy();
2107
2108                proxy.open_session(server).unwrap();
2109
2110                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2111                let vmo_id = session_proxy
2112                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2113                    .await
2114                    .unwrap()
2115                    .unwrap();
2116
2117                let mut fifo =
2118                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2119
2120                async fn test(
2121                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2122                    request: BlockFifoRequest,
2123                ) -> Result<(), zx::Status> {
2124                    let (mut reader, mut writer) = fifo.async_io();
2125                    writer.write_entries(&request).await.unwrap();
2126                    let mut response = BlockFifoResponse::default();
2127                    reader.read_entries(&mut response).await.unwrap();
2128                    zx::Status::ok(response.status)
2129                }
2130
2131                // READ
2132
2133                let good_read_request = || BlockFifoRequest {
2134                    command: BlockFifoCommand {
2135                        opcode: BlockOpcode::Read.into_primitive(),
2136                        ..Default::default()
2137                    },
2138                    length: 1,
2139                    vmoid: vmo_id.id,
2140                    ..Default::default()
2141                };
2142
2143                assert_eq!(
2144                    test(
2145                        &mut fifo,
2146                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2147                    )
2148                    .await,
2149                    Err(zx::Status::IO)
2150                );
2151
2152                assert_eq!(
2153                    test(
2154                        &mut fifo,
2155                        BlockFifoRequest {
2156                            vmo_offset: 0xffff_ffff_ffff_ffff,
2157                            ..good_read_request()
2158                        }
2159                    )
2160                    .await,
2161                    Err(zx::Status::OUT_OF_RANGE)
2162                );
2163
2164                assert_eq!(
2165                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2166                    Err(zx::Status::INVALID_ARGS)
2167                );
2168
2169                // WRITE
2170
2171                let good_write_request = || BlockFifoRequest {
2172                    command: BlockFifoCommand {
2173                        opcode: BlockOpcode::Write.into_primitive(),
2174                        ..Default::default()
2175                    },
2176                    length: 1,
2177                    vmoid: vmo_id.id,
2178                    ..Default::default()
2179                };
2180
2181                assert_eq!(
2182                    test(
2183                        &mut fifo,
2184                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2185                    )
2186                    .await,
2187                    Err(zx::Status::IO)
2188                );
2189
2190                assert_eq!(
2191                    test(
2192                        &mut fifo,
2193                        BlockFifoRequest {
2194                            vmo_offset: 0xffff_ffff_ffff_ffff,
2195                            ..good_write_request()
2196                        }
2197                    )
2198                    .await,
2199                    Err(zx::Status::OUT_OF_RANGE)
2200                );
2201
2202                assert_eq!(
2203                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2204                    Err(zx::Status::INVALID_ARGS)
2205                );
2206
2207                // CLOSE VMO
2208
2209                assert_eq!(
2210                    test(
2211                        &mut fifo,
2212                        BlockFifoRequest {
2213                            command: BlockFifoCommand {
2214                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2215                                ..Default::default()
2216                            },
2217                            vmoid: vmo_id.id + 1,
2218                            ..Default::default()
2219                        }
2220                    )
2221                    .await,
2222                    Err(zx::Status::IO)
2223                );
2224
2225                std::mem::drop(proxy);
2226            }
2227        );
2228    }
2229
2230    #[fuchsia::test]
2231    async fn test_concurrent_requests() {
2232        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2233
2234        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2235        let waiting_readers_clone = waiting_readers.clone();
2236
2237        futures::join!(
2238            async move {
2239                let block_server = BlockServer::new(
2240                    BLOCK_SIZE,
2241                    Arc::new(MockInterface {
2242                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2243                            let (tx, rx) = oneshot::channel();
2244                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2245                            Box::pin(async move {
2246                                let _ = rx.await;
2247                                Ok(())
2248                            })
2249                        })),
2250                        ..MockInterface::default()
2251                    }),
2252                );
2253                block_server.handle_requests(stream).await.unwrap();
2254            },
2255            async move {
2256                let (session_proxy, server) = fidl::endpoints::create_proxy();
2257
2258                proxy.open_session(server).unwrap();
2259
2260                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2261                let vmo_id = session_proxy
2262                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2263                    .await
2264                    .unwrap()
2265                    .unwrap();
2266
2267                let mut fifo =
2268                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2269                let (mut reader, mut writer) = fifo.async_io();
2270
2271                writer
2272                    .write_entries(&BlockFifoRequest {
2273                        command: BlockFifoCommand {
2274                            opcode: BlockOpcode::Read.into_primitive(),
2275                            ..Default::default()
2276                        },
2277                        reqid: 1,
2278                        dev_offset: 1, // Intentionally use the same as `reqid`.
2279                        vmoid: vmo_id.id,
2280                        length: 1,
2281                        ..Default::default()
2282                    })
2283                    .await
2284                    .unwrap();
2285
2286                writer
2287                    .write_entries(&BlockFifoRequest {
2288                        command: BlockFifoCommand {
2289                            opcode: BlockOpcode::Read.into_primitive(),
2290                            ..Default::default()
2291                        },
2292                        reqid: 2,
2293                        dev_offset: 2,
2294                        vmoid: vmo_id.id,
2295                        length: 1,
2296                        ..Default::default()
2297                    })
2298                    .await
2299                    .unwrap();
2300
2301                // Wait till both those entries are pending.
2302                poll_fn(|cx: &mut Context<'_>| {
2303                    if waiting_readers.lock().len() == 2 {
2304                        Poll::Ready(())
2305                    } else {
2306                        // Yield to the executor.
2307                        cx.waker().wake_by_ref();
2308                        Poll::Pending
2309                    }
2310                })
2311                .await;
2312
2313                let mut response = BlockFifoResponse::default();
2314                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2315
2316                let (id, tx) = waiting_readers.lock().pop().unwrap();
2317                tx.send(()).unwrap();
2318
2319                reader.read_entries(&mut response).await.unwrap();
2320                assert_eq!(response.status, zx::sys::ZX_OK);
2321                assert_eq!(response.reqid, id);
2322
2323                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2324
2325                let (id, tx) = waiting_readers.lock().pop().unwrap();
2326                tx.send(()).unwrap();
2327
2328                reader.read_entries(&mut response).await.unwrap();
2329                assert_eq!(response.status, zx::sys::ZX_OK);
2330                assert_eq!(response.reqid, id);
2331            }
2332        );
2333    }
2334
2335    #[fuchsia::test]
2336    async fn test_session_close_is_synchronous() {
2337        use futures::{FutureExt as _, StreamExt as _};
2338
2339        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2340
2341        let (start_tx, mut start_rx) = futures::channel::mpsc::channel(1);
2342        let (finish_tx, finish_rx) = futures::channel::oneshot::channel();
2343        let finish_rx = Arc::new(Mutex::new(Some(finish_rx)));
2344
2345        futures::join!(
2346            async move {
2347                let block_server = BlockServer::new(
2348                    BLOCK_SIZE,
2349                    Arc::new(MockInterface {
2350                        read_hook: Some(Box::new(move |_, _, _, _| {
2351                            let mut start_tx = start_tx.clone();
2352                            let finish_rx = finish_rx.lock().take().unwrap();
2353                            Box::pin(async move {
2354                                start_tx.try_send(()).unwrap();
2355                                let _ = finish_rx.await;
2356                                Ok(())
2357                            })
2358                        })),
2359                        ..MockInterface::default()
2360                    }),
2361                );
2362                block_server.handle_requests(stream).await.unwrap();
2363            },
2364            async move {
2365                let (session_proxy, server) = fidl::endpoints::create_proxy();
2366                proxy.open_session(server).unwrap();
2367
2368                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2369                let vmo_id = session_proxy
2370                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2371                    .await
2372                    .unwrap()
2373                    .unwrap();
2374
2375                let mut fifo = fasync::Fifo::<BlockFifoResponse, BlockFifoRequest>::from_fifo(
2376                    session_proxy.get_fifo().await.unwrap().unwrap(),
2377                );
2378                let (_reader, mut writer) = fifo.async_io();
2379
2380                writer
2381                    .write_entries(&BlockFifoRequest {
2382                        command: BlockFifoCommand {
2383                            opcode: BlockOpcode::Read.into_primitive(),
2384                            ..Default::default()
2385                        },
2386                        reqid: 1,
2387                        vmoid: vmo_id.id,
2388                        length: 1,
2389                        ..Default::default()
2390                    })
2391                    .await
2392                    .unwrap();
2393
2394                // Wait for the read to actually start.
2395                start_rx.next().await.unwrap();
2396
2397                // The close request shouldn't complete yet because the read is still hanging.
2398                let mut close_fut = std::pin::pin!(session_proxy.close().fuse());
2399                let mut timer_fut = std::pin::pin!(
2400                    fasync::Timer::new(std::time::Duration::from_millis(100)).fuse()
2401                );
2402                futures::select! {
2403                    res = close_fut => panic!("close completed too early: {:?}", res),
2404                    _ = timer_fut => {}
2405                }
2406
2407                // Finish the pending request.
2408                finish_tx.send(()).unwrap();
2409
2410                // Verify that close() now completes.
2411                close_fut.await.unwrap().unwrap();
2412
2413                std::mem::drop(proxy);
2414            }
2415        );
2416    }
2417
2418    #[fuchsia::test]
2419    async fn test_groups() {
2420        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2421
2422        futures::join!(
2423            async move {
2424                let block_server = BlockServer::new(
2425                    BLOCK_SIZE,
2426                    Arc::new(MockInterface {
2427                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2428                        ..MockInterface::default()
2429                    }),
2430                );
2431                block_server.handle_requests(stream).await.unwrap();
2432            },
2433            async move {
2434                let (session_proxy, server) = fidl::endpoints::create_proxy();
2435
2436                proxy.open_session(server).unwrap();
2437
2438                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2439                let vmo_id = session_proxy
2440                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2441                    .await
2442                    .unwrap()
2443                    .unwrap();
2444
2445                let mut fifo =
2446                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2447                let (mut reader, mut writer) = fifo.async_io();
2448
2449                writer
2450                    .write_entries(&BlockFifoRequest {
2451                        command: BlockFifoCommand {
2452                            opcode: BlockOpcode::Read.into_primitive(),
2453                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2454                            ..Default::default()
2455                        },
2456                        group: 1,
2457                        vmoid: vmo_id.id,
2458                        length: 1,
2459                        ..Default::default()
2460                    })
2461                    .await
2462                    .unwrap();
2463
2464                writer
2465                    .write_entries(&BlockFifoRequest {
2466                        command: BlockFifoCommand {
2467                            opcode: BlockOpcode::Read.into_primitive(),
2468                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2469                            ..Default::default()
2470                        },
2471                        reqid: 2,
2472                        group: 1,
2473                        vmoid: vmo_id.id,
2474                        length: 1,
2475                        ..Default::default()
2476                    })
2477                    .await
2478                    .unwrap();
2479
2480                let mut response = BlockFifoResponse::default();
2481                reader.read_entries(&mut response).await.unwrap();
2482                assert_eq!(response.status, zx::sys::ZX_OK);
2483                assert_eq!(response.reqid, 2);
2484                assert_eq!(response.group, 1);
2485            }
2486        );
2487    }
2488
2489    #[fuchsia::test]
2490    async fn test_group_error() {
2491        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2492
2493        let counter = Arc::new(AtomicU64::new(0));
2494        let counter_clone = counter.clone();
2495
2496        futures::join!(
2497            async move {
2498                let block_server = BlockServer::new(
2499                    BLOCK_SIZE,
2500                    Arc::new(MockInterface {
2501                        read_hook: Some(Box::new(move |_, _, _, _| {
2502                            counter_clone.fetch_add(1, Ordering::Relaxed);
2503                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2504                        })),
2505                        ..MockInterface::default()
2506                    }),
2507                );
2508                block_server.handle_requests(stream).await.unwrap();
2509            },
2510            async move {
2511                let (session_proxy, server) = fidl::endpoints::create_proxy();
2512
2513                proxy.open_session(server).unwrap();
2514
2515                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2516                let vmo_id = session_proxy
2517                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2518                    .await
2519                    .unwrap()
2520                    .unwrap();
2521
2522                let mut fifo =
2523                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2524                let (mut reader, mut writer) = fifo.async_io();
2525
2526                writer
2527                    .write_entries(&BlockFifoRequest {
2528                        command: BlockFifoCommand {
2529                            opcode: BlockOpcode::Read.into_primitive(),
2530                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2531                            ..Default::default()
2532                        },
2533                        group: 1,
2534                        vmoid: vmo_id.id,
2535                        length: 1,
2536                        ..Default::default()
2537                    })
2538                    .await
2539                    .unwrap();
2540
2541                // Wait until processed.
2542                poll_fn(|cx: &mut Context<'_>| {
2543                    if counter.load(Ordering::Relaxed) == 1 {
2544                        Poll::Ready(())
2545                    } else {
2546                        // Yield to the executor.
2547                        cx.waker().wake_by_ref();
2548                        Poll::Pending
2549                    }
2550                })
2551                .await;
2552
2553                let mut response = BlockFifoResponse::default();
2554                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2555
2556                writer
2557                    .write_entries(&BlockFifoRequest {
2558                        command: BlockFifoCommand {
2559                            opcode: BlockOpcode::Read.into_primitive(),
2560                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2561                            ..Default::default()
2562                        },
2563                        group: 1,
2564                        vmoid: vmo_id.id,
2565                        length: 1,
2566                        ..Default::default()
2567                    })
2568                    .await
2569                    .unwrap();
2570
2571                writer
2572                    .write_entries(&BlockFifoRequest {
2573                        command: BlockFifoCommand {
2574                            opcode: BlockOpcode::Read.into_primitive(),
2575                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2576                            ..Default::default()
2577                        },
2578                        reqid: 2,
2579                        group: 1,
2580                        vmoid: vmo_id.id,
2581                        length: 1,
2582                        ..Default::default()
2583                    })
2584                    .await
2585                    .unwrap();
2586
2587                reader.read_entries(&mut response).await.unwrap();
2588                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2589                assert_eq!(response.reqid, 2);
2590                assert_eq!(response.group, 1);
2591
2592                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2593
2594                // Only the first request should have been processed.
2595                assert_eq!(counter.load(Ordering::Relaxed), 1);
2596            }
2597        );
2598    }
2599
2600    #[fuchsia::test]
2601    async fn test_group_with_two_lasts() {
2602        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2603
2604        let (tx, rx) = oneshot::channel();
2605
2606        futures::join!(
2607            async move {
2608                let rx = Mutex::new(Some(rx));
2609                let block_server = BlockServer::new(
2610                    BLOCK_SIZE,
2611                    Arc::new(MockInterface {
2612                        read_hook: Some(Box::new(move |_, _, _, _| {
2613                            let rx = rx.lock().take().unwrap();
2614                            Box::pin(async {
2615                                let _ = rx.await;
2616                                Ok(())
2617                            })
2618                        })),
2619                        ..MockInterface::default()
2620                    }),
2621                );
2622                block_server.handle_requests(stream).await.unwrap();
2623            },
2624            async move {
2625                let (session_proxy, server) = fidl::endpoints::create_proxy();
2626
2627                proxy.open_session(server).unwrap();
2628
2629                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2630                let vmo_id = session_proxy
2631                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2632                    .await
2633                    .unwrap()
2634                    .unwrap();
2635
2636                let mut fifo =
2637                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2638                let (mut reader, mut writer) = fifo.async_io();
2639
2640                writer
2641                    .write_entries(&BlockFifoRequest {
2642                        command: BlockFifoCommand {
2643                            opcode: BlockOpcode::Read.into_primitive(),
2644                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2645                            ..Default::default()
2646                        },
2647                        reqid: 1,
2648                        group: 1,
2649                        vmoid: vmo_id.id,
2650                        length: 1,
2651                        ..Default::default()
2652                    })
2653                    .await
2654                    .unwrap();
2655
2656                writer
2657                    .write_entries(&BlockFifoRequest {
2658                        command: BlockFifoCommand {
2659                            opcode: BlockOpcode::Read.into_primitive(),
2660                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2661                            ..Default::default()
2662                        },
2663                        reqid: 2,
2664                        group: 1,
2665                        vmoid: vmo_id.id,
2666                        length: 1,
2667                        ..Default::default()
2668                    })
2669                    .await
2670                    .unwrap();
2671
2672                // Send an independent request to flush through the fifo.
2673                writer
2674                    .write_entries(&BlockFifoRequest {
2675                        command: BlockFifoCommand {
2676                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2677                            ..Default::default()
2678                        },
2679                        reqid: 3,
2680                        vmoid: vmo_id.id,
2681                        ..Default::default()
2682                    })
2683                    .await
2684                    .unwrap();
2685
2686                // It should succeed.
2687                let mut response = BlockFifoResponse::default();
2688                reader.read_entries(&mut response).await.unwrap();
2689                assert_eq!(response.status, zx::sys::ZX_OK);
2690                assert_eq!(response.reqid, 3);
2691
2692                // Now release the original request.
2693                tx.send(()).unwrap();
2694
2695                // The response should be for the first message tagged as last, and it should be
2696                // an error because we sent two messages with the LAST marker.
2697                let mut response = BlockFifoResponse::default();
2698                reader.read_entries(&mut response).await.unwrap();
2699                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2700                assert_eq!(response.reqid, 1);
2701                assert_eq!(response.group, 1);
2702            }
2703        );
2704    }
2705
2706    #[fuchsia::test(allow_stalls = false)]
2707    async fn test_requests_dont_block_sessions() {
2708        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2709
2710        let (tx, rx) = oneshot::channel();
2711
2712        fasync::Task::local(async move {
2713            let rx = Mutex::new(Some(rx));
2714            let block_server = BlockServer::new(
2715                BLOCK_SIZE,
2716                Arc::new(MockInterface {
2717                    read_hook: Some(Box::new(move |_, _, _, _| {
2718                        let rx = rx.lock().take().unwrap();
2719                        Box::pin(async {
2720                            let _ = rx.await;
2721                            Ok(())
2722                        })
2723                    })),
2724                    ..MockInterface::default()
2725                }),
2726            );
2727            block_server.handle_requests(stream).await.unwrap();
2728        })
2729        .detach();
2730
2731        let mut fut = pin!(async {
2732            let (session_proxy, server) = fidl::endpoints::create_proxy();
2733
2734            proxy.open_session(server).unwrap();
2735
2736            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2737            let vmo_id = session_proxy
2738                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2739                .await
2740                .unwrap()
2741                .unwrap();
2742
2743            let mut fifo =
2744                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2745            let (mut reader, mut writer) = fifo.async_io();
2746
2747            writer
2748                .write_entries(&BlockFifoRequest {
2749                    command: BlockFifoCommand {
2750                        opcode: BlockOpcode::Read.into_primitive(),
2751                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2752                        ..Default::default()
2753                    },
2754                    reqid: 1,
2755                    group: 1,
2756                    vmoid: vmo_id.id,
2757                    length: 1,
2758                    ..Default::default()
2759                })
2760                .await
2761                .unwrap();
2762
2763            let mut response = BlockFifoResponse::default();
2764            reader.read_entries(&mut response).await.unwrap();
2765            assert_eq!(response.status, zx::sys::ZX_OK);
2766        });
2767
2768        // The response won't come back until we send on `tx`.
2769        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2770
2771        let mut fut2 = pin!(proxy.get_volume_info());
2772
2773        // get_volume_info is set up to stall forever.
2774        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2775
2776        // If we now free up the first future, it should resolve; the stalled call to
2777        // get_volume_info should not block the fifo response.
2778        let _ = tx.send(());
2779
2780        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2781    }
2782
2783    #[fuchsia::test]
2784    async fn test_request_flow_control() {
2785        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2786
2787        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2788        // server will block until that happens.
2789        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2790        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2791        let event_clone = event.clone();
2792        futures::join!(
2793            async move {
2794                let block_server = BlockServer::new(
2795                    BLOCK_SIZE,
2796                    Arc::new(MockInterface {
2797                        read_hook: Some(Box::new(move |_, _, _, _| {
2798                            let event_clone = event_clone.clone();
2799                            Box::pin(async move {
2800                                if !event_clone.1.load(Ordering::SeqCst) {
2801                                    event_clone.0.listen().await;
2802                                }
2803                                Ok(())
2804                            })
2805                        })),
2806                        ..MockInterface::default()
2807                    }),
2808                );
2809                block_server.handle_requests(stream).await.unwrap();
2810            },
2811            async move {
2812                let (session_proxy, server) = fidl::endpoints::create_proxy();
2813
2814                proxy.open_session(server).unwrap();
2815
2816                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2817                let vmo_id = session_proxy
2818                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2819                    .await
2820                    .unwrap()
2821                    .unwrap();
2822
2823                let mut fifo =
2824                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2825                let (mut reader, mut writer) = fifo.async_io();
2826
2827                for i in 0..MAX_REQUESTS {
2828                    writer
2829                        .write_entries(&BlockFifoRequest {
2830                            command: BlockFifoCommand {
2831                                opcode: BlockOpcode::Read.into_primitive(),
2832                                ..Default::default()
2833                            },
2834                            reqid: (i + 1) as u32,
2835                            dev_offset: i,
2836                            vmoid: vmo_id.id,
2837                            length: 1,
2838                            ..Default::default()
2839                        })
2840                        .await
2841                        .unwrap();
2842                }
2843                assert!(
2844                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2845                        command: BlockFifoCommand {
2846                            opcode: BlockOpcode::Read.into_primitive(),
2847                            ..Default::default()
2848                        },
2849                        reqid: u32::MAX,
2850                        dev_offset: MAX_REQUESTS,
2851                        vmoid: vmo_id.id,
2852                        length: 1,
2853                        ..Default::default()
2854                    })))
2855                    .is_pending()
2856                );
2857                // OK, let the server start to process.
2858                event.1.store(true, Ordering::SeqCst);
2859                event.0.notify(usize::MAX);
2860                // For each entry we read, make sure we can write a new one in.
2861                let mut finished_reqids = vec![];
2862                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2863                    let mut response = BlockFifoResponse::default();
2864                    reader.read_entries(&mut response).await.unwrap();
2865                    assert_eq!(response.status, zx::sys::ZX_OK);
2866                    finished_reqids.push(response.reqid);
2867                    writer
2868                        .write_entries(&BlockFifoRequest {
2869                            command: BlockFifoCommand {
2870                                opcode: BlockOpcode::Read.into_primitive(),
2871                                ..Default::default()
2872                            },
2873                            reqid: (i + 1) as u32,
2874                            dev_offset: i,
2875                            vmoid: vmo_id.id,
2876                            length: 1,
2877                            ..Default::default()
2878                        })
2879                        .await
2880                        .unwrap();
2881                }
2882                let mut response = BlockFifoResponse::default();
2883                for _ in 0..MAX_REQUESTS {
2884                    reader.read_entries(&mut response).await.unwrap();
2885                    assert_eq!(response.status, zx::sys::ZX_OK);
2886                    finished_reqids.push(response.reqid);
2887                }
2888                // Verify that we got a response for each request.  Note that we can't assume FIFO
2889                // ordering.
2890                finished_reqids.sort();
2891                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2892                let mut i = 1;
2893                for reqid in finished_reqids {
2894                    assert_eq!(reqid, i);
2895                    i += 1;
2896                }
2897            }
2898        );
2899    }
2900
2901    #[fuchsia::test]
2902    async fn test_passthrough_io_with_fixed_map() {
2903        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2904
2905        let expected_op = Arc::new(Mutex::new(None));
2906        let expected_op_clone = expected_op.clone();
2907        futures::join!(
2908            async {
2909                let block_server = BlockServer::new(
2910                    BLOCK_SIZE,
2911                    Arc::new(IoMockInterface {
2912                        return_errors: false,
2913                        do_checks: true,
2914                        expected_op: expected_op_clone,
2915                    }),
2916                );
2917                block_server.handle_requests(stream).await.unwrap();
2918            },
2919            async move {
2920                let (session_proxy, server) = fidl::endpoints::create_proxy();
2921
2922                let mapping = fblock::BlockOffsetMapping {
2923                    source_block_offset: 0,
2924                    target_block_offset: 10,
2925                    length: 20,
2926                };
2927                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2928
2929                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2930                let vmo_id = session_proxy
2931                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2932                    .await
2933                    .unwrap()
2934                    .unwrap();
2935
2936                let mut fifo =
2937                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2938                let (mut reader, mut writer) = fifo.async_io();
2939
2940                // READ
2941                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2942                writer
2943                    .write_entries(&BlockFifoRequest {
2944                        command: BlockFifoCommand {
2945                            opcode: BlockOpcode::Read.into_primitive(),
2946                            ..Default::default()
2947                        },
2948                        vmoid: vmo_id.id,
2949                        dev_offset: 1,
2950                        length: 2,
2951                        vmo_offset: 3,
2952                        ..Default::default()
2953                    })
2954                    .await
2955                    .unwrap();
2956
2957                let mut response = BlockFifoResponse::default();
2958                reader.read_entries(&mut response).await.unwrap();
2959                assert_eq!(response.status, zx::sys::ZX_OK);
2960
2961                // WRITE
2962                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2963                writer
2964                    .write_entries(&BlockFifoRequest {
2965                        command: BlockFifoCommand {
2966                            opcode: BlockOpcode::Write.into_primitive(),
2967                            ..Default::default()
2968                        },
2969                        vmoid: vmo_id.id,
2970                        dev_offset: 4,
2971                        length: 5,
2972                        vmo_offset: 6,
2973                        ..Default::default()
2974                    })
2975                    .await
2976                    .unwrap();
2977
2978                reader.read_entries(&mut response).await.unwrap();
2979                assert_eq!(response.status, zx::sys::ZX_OK);
2980
2981                // FLUSH
2982                *expected_op.lock() = Some(ExpectedOp::Flush);
2983                writer
2984                    .write_entries(&BlockFifoRequest {
2985                        command: BlockFifoCommand {
2986                            opcode: BlockOpcode::Flush.into_primitive(),
2987                            ..Default::default()
2988                        },
2989                        ..Default::default()
2990                    })
2991                    .await
2992                    .unwrap();
2993
2994                reader.read_entries(&mut response).await.unwrap();
2995                assert_eq!(response.status, zx::sys::ZX_OK);
2996
2997                // TRIM
2998                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2999                writer
3000                    .write_entries(&BlockFifoRequest {
3001                        command: BlockFifoCommand {
3002                            opcode: BlockOpcode::Trim.into_primitive(),
3003                            ..Default::default()
3004                        },
3005                        dev_offset: 7,
3006                        length: 3,
3007                        ..Default::default()
3008                    })
3009                    .await
3010                    .unwrap();
3011
3012                reader.read_entries(&mut response).await.unwrap();
3013                assert_eq!(response.status, zx::sys::ZX_OK);
3014
3015                // READ past window
3016                *expected_op.lock() = None;
3017                writer
3018                    .write_entries(&BlockFifoRequest {
3019                        command: BlockFifoCommand {
3020                            opcode: BlockOpcode::Read.into_primitive(),
3021                            ..Default::default()
3022                        },
3023                        vmoid: vmo_id.id,
3024                        dev_offset: 19,
3025                        length: 2,
3026                        vmo_offset: 3,
3027                        ..Default::default()
3028                    })
3029                    .await
3030                    .unwrap();
3031
3032                reader.read_entries(&mut response).await.unwrap();
3033                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3034
3035                std::mem::drop(proxy);
3036            }
3037        );
3038    }
3039
3040    #[fuchsia::test]
3041    fn operation_map() {
3042        const BLOCK_SIZE: u32 = 512;
3043
3044        #[track_caller]
3045        fn expect_map_result(
3046            mut operation: Operation,
3047            mapping: Option<BlockOffsetMapping>,
3048            max_blocks: Option<NonZero<u32>>,
3049            expected_operations: Vec<Operation>,
3050        ) {
3051            let mut ops = vec![];
3052            while let Some(remainder) =
3053                operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
3054            {
3055                ops.push(operation);
3056                operation = remainder;
3057            }
3058            ops.push(operation);
3059            assert_eq!(ops, expected_operations);
3060        }
3061
3062        // No limits
3063        expect_map_result(
3064            Operation::Read {
3065                device_block_offset: 10,
3066                block_count: 200,
3067                _unused: 0,
3068                vmo_offset: 0,
3069                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3070            },
3071            None,
3072            None,
3073            vec![Operation::Read {
3074                device_block_offset: 10,
3075                block_count: 200,
3076                _unused: 0,
3077                vmo_offset: 0,
3078                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3079            }],
3080        );
3081
3082        // Max block count
3083        expect_map_result(
3084            Operation::Read {
3085                device_block_offset: 10,
3086                block_count: 200,
3087                _unused: 0,
3088                vmo_offset: 0,
3089                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3090            },
3091            None,
3092            NonZero::new(120),
3093            vec![
3094                Operation::Read {
3095                    device_block_offset: 10,
3096                    block_count: 120,
3097                    _unused: 0,
3098                    vmo_offset: 0,
3099                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3100                },
3101                Operation::Read {
3102                    device_block_offset: 130,
3103                    block_count: 80,
3104                    _unused: 0,
3105                    vmo_offset: 120 * BLOCK_SIZE as u64,
3106                    options: ReadOptions {
3107                        // The DUN should be offset by the number of blocks in the first request.
3108                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3109                    },
3110                },
3111            ],
3112        );
3113        expect_map_result(
3114            Operation::Trim { device_block_offset: 10, block_count: 200 },
3115            None,
3116            NonZero::new(120),
3117            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3118        );
3119
3120        // Remapping + Max block count
3121        expect_map_result(
3122            Operation::Read {
3123                device_block_offset: 10,
3124                block_count: 200,
3125                _unused: 0,
3126                vmo_offset: 0,
3127                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3128            },
3129            Some(BlockOffsetMapping {
3130                source_block_offset: 10,
3131                target_block_offset: 100,
3132                length: 200,
3133            }),
3134            NonZero::new(120),
3135            vec![
3136                Operation::Read {
3137                    device_block_offset: 100,
3138                    block_count: 120,
3139                    _unused: 0,
3140                    vmo_offset: 0,
3141                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3142                },
3143                Operation::Read {
3144                    device_block_offset: 220,
3145                    block_count: 80,
3146                    _unused: 0,
3147                    vmo_offset: 120 * BLOCK_SIZE as u64,
3148                    options: ReadOptions {
3149                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3150                    },
3151                },
3152            ],
3153        );
3154        expect_map_result(
3155            Operation::Trim { device_block_offset: 10, block_count: 200 },
3156            Some(BlockOffsetMapping {
3157                source_block_offset: 10,
3158                target_block_offset: 100,
3159                length: 200,
3160            }),
3161            NonZero::new(120),
3162            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3163        );
3164    }
3165
3166    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3167    #[fuchsia::test]
3168    async fn test_pre_barrier_flush_failure() {
3169        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3170
3171        struct NoBarrierInterface;
3172        impl super::async_interface::Interface for NoBarrierInterface {
3173            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3174                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3175                    device_flags: fblock::DeviceFlag::empty(), // No BARRIER_SUPPORT
3176                    max_transfer_blocks: NonZero::new(100),
3177                    block_range: Some(0..100),
3178                    type_guid: [0; 16],
3179                    instance_guid: [0; 16],
3180                    name: "test".to_string(),
3181                    flags: 0,
3182                }))
3183            }
3184            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3185                Ok(())
3186            }
3187            async fn read(
3188                &self,
3189                _: u64,
3190                _: u32,
3191                _: &Arc<zx::Vmo>,
3192                _: u64,
3193                _: ReadOptions,
3194                _: TraceFlowId,
3195            ) -> Result<(), zx::Status> {
3196                unreachable!()
3197            }
3198            async fn write(
3199                &self,
3200                _: u64,
3201                _: u32,
3202                _: &Arc<zx::Vmo>,
3203                _: u64,
3204                _: WriteOptions,
3205                _: TraceFlowId,
3206            ) -> Result<(), zx::Status> {
3207                panic!("Write should not be called");
3208            }
3209            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3210                Err(zx::Status::IO)
3211            }
3212            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3213                unreachable!()
3214            }
3215        }
3216
3217        futures::join!(
3218            async move {
3219                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3220                block_server.handle_requests(stream).await.unwrap();
3221            },
3222            async move {
3223                let (session_proxy, server) = fidl::endpoints::create_proxy();
3224                proxy.open_session(server).unwrap();
3225                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3226                let vmo_id = session_proxy
3227                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3228                    .await
3229                    .unwrap()
3230                    .unwrap();
3231
3232                let mut fifo =
3233                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3234                let (mut reader, mut writer) = fifo.async_io();
3235
3236                writer
3237                    .write_entries(&BlockFifoRequest {
3238                        command: BlockFifoCommand {
3239                            opcode: BlockOpcode::Write.into_primitive(),
3240                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3241                            ..Default::default()
3242                        },
3243                        vmoid: vmo_id.id,
3244                        length: 1,
3245                        ..Default::default()
3246                    })
3247                    .await
3248                    .unwrap();
3249
3250                let mut response = BlockFifoResponse::default();
3251                reader.read_entries(&mut response).await.unwrap();
3252                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3253            }
3254        );
3255    }
3256
3257    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3258    // post-flush is not executed.
3259    #[fuchsia::test]
3260    async fn test_post_barrier_write_failure() {
3261        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3262
3263        struct NoBarrierInterface;
3264        impl super::async_interface::Interface for NoBarrierInterface {
3265            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3266                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3267                    device_flags: fblock::DeviceFlag::empty(), // No FUA_SUPPORT
3268                    max_transfer_blocks: NonZero::new(100),
3269                    block_range: Some(0..100),
3270                    type_guid: [0; 16],
3271                    instance_guid: [0; 16],
3272                    name: "test".to_string(),
3273                    flags: 0,
3274                }))
3275            }
3276            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3277                Ok(())
3278            }
3279            async fn read(
3280                &self,
3281                _: u64,
3282                _: u32,
3283                _: &Arc<zx::Vmo>,
3284                _: u64,
3285                _: ReadOptions,
3286                _: TraceFlowId,
3287            ) -> Result<(), zx::Status> {
3288                unreachable!()
3289            }
3290            async fn write(
3291                &self,
3292                _: u64,
3293                _: u32,
3294                _: &Arc<zx::Vmo>,
3295                _: u64,
3296                _: WriteOptions,
3297                _: TraceFlowId,
3298            ) -> Result<(), zx::Status> {
3299                Err(zx::Status::IO)
3300            }
3301            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3302                panic!("Flush should not be called")
3303            }
3304            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3305                unreachable!()
3306            }
3307        }
3308
3309        futures::join!(
3310            async move {
3311                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3312                block_server.handle_requests(stream).await.unwrap();
3313            },
3314            async move {
3315                let (session_proxy, server) = fidl::endpoints::create_proxy();
3316                proxy.open_session(server).unwrap();
3317                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3318                let vmo_id = session_proxy
3319                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3320                    .await
3321                    .unwrap()
3322                    .unwrap();
3323
3324                let mut fifo =
3325                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3326                let (mut reader, mut writer) = fifo.async_io();
3327
3328                writer
3329                    .write_entries(&BlockFifoRequest {
3330                        command: BlockFifoCommand {
3331                            opcode: BlockOpcode::Write.into_primitive(),
3332                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3333                            ..Default::default()
3334                        },
3335                        vmoid: vmo_id.id,
3336                        length: 1,
3337                        ..Default::default()
3338                    })
3339                    .await
3340                    .unwrap();
3341
3342                let mut response = BlockFifoResponse::default();
3343                reader.read_entries(&mut response).await.unwrap();
3344                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3345            }
3346        );
3347    }
3348}