Skip to main content

block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::{Borrow, Cow};
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23pub mod callback_interface;
24
25#[cfg(test)]
26mod decompression_tests;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    pub fn label(&self) -> &str {
40        match self {
41            Self::Block(BlockInfo { .. }) => "",
42            Self::Partition(PartitionInfo { name, .. }) => name,
43        }
44    }
45    pub fn device_flags(&self) -> fblock::DeviceFlag {
46        match self {
47            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
48            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
49        }
50    }
51
52    pub fn block_count(&self) -> Option<u64> {
53        match self {
54            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
55            Self::Partition(PartitionInfo { block_range, .. }) => {
56                block_range.as_ref().map(|range| range.end - range.start)
57            }
58        }
59    }
60
61    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
62        match self {
63            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
64            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
65                max_transfer_blocks.clone()
66            }
67        }
68    }
69
70    fn max_transfer_size(&self, block_size: u32) -> u32 {
71        if let Some(max_blocks) = self.max_transfer_blocks() {
72            max_blocks.get() * block_size
73        } else {
74            MAX_TRANSFER_UNBOUNDED
75        }
76    }
77}
78
79/// Information associated with non-partition block devices.
80#[derive(Clone, Default)]
81pub struct BlockInfo {
82    pub device_flags: fblock::DeviceFlag,
83    pub block_count: u64,
84    pub max_transfer_blocks: Option<NonZero<u32>>,
85}
86
87/// Information associated with a block device that is also a partition.
88#[derive(Clone, Default)]
89pub struct PartitionInfo {
90    /// The device flags reported by the underlying device.
91    pub device_flags: fblock::DeviceFlag,
92    pub max_transfer_blocks: Option<NonZero<u32>>,
93    /// If `block_range` is None, the partition is a volume and may not be contiguous.
94    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
95    /// slices and use that (along with the slice and block sizes) to determine the block count.
96    pub block_range: Option<Range<u64>>,
97    pub type_guid: [u8; 16],
98    pub instance_guid: [u8; 16],
99    pub name: String,
100    pub flags: u64,
101}
102
103/// We internally keep track of active requests, so that when the server is torn down, we can
104/// deallocate all of the resources for pending requests.
105struct ActiveRequest<S> {
106    session: S,
107    group_or_request: GroupOrRequest,
108    trace_flow_id: TraceFlowId,
109    _epoch_guard: EpochGuard<'static>,
110    status: zx::Status,
111    count: u32,
112    req_id: Option<u32>,
113    decompression_info: Option<DecompressionInfo>,
114}
115
116struct DecompressionInfo {
117    // This is the range of compressed bytes in receiving buffer.
118    compressed_range: Range<usize>,
119
120    // This is the range in the target VMO where we will write uncompressed bytes.
121    uncompressed_range: Range<u64>,
122
123    bytes_so_far: u64,
124    mapping: Arc<VmoMapping>,
125    buffer: Option<Buffer<'static>>,
126}
127
128impl DecompressionInfo {
129    /// Returns the uncompressed slice.
130    fn uncompressed_slice(&self) -> *mut [u8] {
131        std::ptr::slice_from_raw_parts_mut(
132            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
133            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
134        )
135    }
136}
137
138pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
139
140impl<S> Default for ActiveRequests<S> {
141    fn default() -> Self {
142        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
143    }
144}
145
146impl<S> ActiveRequests<S> {
147    fn complete_and_take_response(
148        &self,
149        request_id: RequestId,
150        status: zx::Status,
151    ) -> Option<(S, BlockFifoResponse)> {
152        self.0.lock().complete_and_take_response(request_id, status)
153    }
154
155    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
156        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
157    }
158}
159
160struct ActiveRequestsInner<S> {
161    requests: Slab<ActiveRequest<S>>,
162}
163
164// Keeps track of all the requests that are currently being processed
165impl<S> ActiveRequestsInner<S> {
166    /// Completes a request.
167    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
168        let group = &mut self.requests[request_id.0];
169
170        group.count = group.count.checked_sub(1).unwrap();
171        if status != zx::Status::OK && group.status == zx::Status::OK {
172            group.status = status
173        }
174
175        fuchsia_trace::duration!(
176            "storage",
177            "block_server::finish_transaction",
178            "request_id" => request_id.0,
179            "group_completed" => group.count == 0,
180            "status" => status.into_raw());
181        if let Some(trace_flow_id) = group.trace_flow_id {
182            fuchsia_trace::flow_step!(
183                "storage",
184                "block_server::finish_request",
185                trace_flow_id.get().into()
186            );
187        }
188
189        if group.count == 0
190            && group.status == zx::Status::OK
191            && let Some(info) = &mut group.decompression_info
192        {
193            thread_local! {
194                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
195                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
196            }
197            DECOMPRESSOR.with(|decompressor| {
198                // SAFETY: We verified `uncompressed_range` fits within our mapping.
199                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
200                let mut decompressor = decompressor.borrow_mut();
201                if let Err(error) = decompressor.decompress_to_buffer(
202                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
203                    target_slice,
204                ) {
205                    log::warn!(error:?; "Decompression error");
206                    group.status = zx::Status::IO_DATA_INTEGRITY;
207                };
208            });
209        }
210    }
211
212    /// Takes the response if all requests are finished.
213    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
214        let group = &self.requests[request_id.0];
215        match group.req_id {
216            Some(reqid) if group.count == 0 => {
217                let group = self.requests.remove(request_id.0);
218                Some((
219                    group.session,
220                    BlockFifoResponse {
221                        status: group.status.into_raw(),
222                        reqid,
223                        group: group.group_or_request.group_id().unwrap_or(0),
224                        ..Default::default()
225                    },
226                ))
227            }
228            _ => None,
229        }
230    }
231
232    /// Competes the request and returns a response if the request group is finished.
233    fn complete_and_take_response(
234        &mut self,
235        request_id: RequestId,
236        status: zx::Status,
237    ) -> Option<(S, BlockFifoResponse)> {
238        self.complete(request_id, status);
239        self.take_response(request_id)
240    }
241}
242
243/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
244/// cbindgen:no-export
245pub struct BlockServer<SM: SessionManager> {
246    block_size: u32,
247    orchestrator: Arc<SM::Orchestrator>,
248}
249
250/// A single entry in `[OffsetMap]`.
251#[derive(Debug)]
252pub struct BlockOffsetMapping {
253    source_block_offset: u64,
254    target_block_offset: u64,
255    length: u64,
256}
257
258impl BlockOffsetMapping {
259    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
260        blocks.0 >= self.source_block_offset
261            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
262    }
263}
264
265impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
266    type Error = zx::Status;
267
268    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
269        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
270        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271        Ok(Self {
272            source_block_offset: wire.source_block_offset,
273            target_block_offset: wire.target_block_offset,
274            length: wire.length,
275        })
276    }
277}
278
279/// Remaps the offset of block requests based on an internal map, and truncates long requests.
280pub struct OffsetMap {
281    mapping: Option<BlockOffsetMapping>,
282    max_transfer_blocks: Option<NonZero<u32>>,
283}
284
285impl OffsetMap {
286    /// An OffsetMap that remaps requests.
287    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
288        Self { mapping: Some(mapping), max_transfer_blocks }
289    }
290
291    /// An OffsetMap that just enforces maximum request sizes.
292    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
293        Self { mapping: None, max_transfer_blocks }
294    }
295
296    pub fn is_empty(&self) -> bool {
297        self.mapping.is_none()
298    }
299
300    fn mapping(&self) -> Option<&BlockOffsetMapping> {
301        self.mapping.as_ref()
302    }
303
304    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
305        self.max_transfer_blocks
306    }
307}
308
309// Methods take Arc<Self> rather than &self because of
310// https://github.com/rust-lang/rust/issues/42940.
311pub trait SessionManager: 'static {
312    /// The Orchestrator is an object that holds the `SessionManager` and any other state that needs
313    /// to be shared between sessions.  It is responsible for keeping the `SessionManager` alive.
314    /// We use this type instead of directly holding an Arc<SessionManager> in BlockServer, to avoid
315    /// nested Arcs in concrete implementations which need to keep additional state.
316    type Orchestrator: Borrow<Self> + Send + Sync;
317
318    const SUPPORTS_DECOMPRESSION: bool;
319
320    type Session;
321
322    fn on_attach_vmo(
323        orchestrator: Arc<Self::Orchestrator>,
324        vmo: &Arc<zx::Vmo>,
325    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
326
327    /// Creates a new session to handle `stream`.
328    /// The returned future should run until the session completes, for example when the client end
329    /// closes.
330    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
331    fn open_session(
332        orchestrator: Arc<Self::Orchestrator>,
333        stream: fblock::SessionRequestStream,
334        offset_map: OffsetMap,
335        block_size: u32,
336    ) -> impl Future<Output = Result<(), Error>> + Send;
337
338    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
339    fn get_info(&self) -> Cow<'_, DeviceInfo>;
340
341    /// Called to handle the GetVolumeInfo FIDL call.
342    fn get_volume_info(
343        &self,
344    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
345    {
346        async { Err(zx::Status::NOT_SUPPORTED) }
347    }
348
349    /// Called to handle the QuerySlices FIDL call.
350    fn query_slices(
351        &self,
352        _start_slices: &[u64],
353    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
354        async { Err(zx::Status::NOT_SUPPORTED) }
355    }
356
357    /// Called to handle the Shrink FIDL call.
358    fn extend(
359        &self,
360        _start_slice: u64,
361        _slice_count: u64,
362    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363        async { Err(zx::Status::NOT_SUPPORTED) }
364    }
365
366    /// Called to handle the Shrink FIDL call.
367    fn shrink(
368        &self,
369        _start_slice: u64,
370        _slice_count: u64,
371    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
372        async { Err(zx::Status::NOT_SUPPORTED) }
373    }
374
375    /// Returns the active requests.
376    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
377}
378
379/// A helper trait for converting various types into an `Orchestrator`.
380///
381/// This exists to simplify [`BlockServer::new`].
382pub trait IntoOrchestrator {
383    type SM: SessionManager;
384
385    fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
386}
387
388impl<SM: SessionManager> BlockServer<SM> {
389    pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
390        Self { block_size, orchestrator: orchestrator.into_orchestrator() }
391    }
392
393    pub fn session_manager(&self) -> &SM {
394        self.orchestrator.as_ref().borrow()
395    }
396
397    /// Called to process requests for fuchsia.storage.block.Block.
398    pub async fn handle_requests(
399        &self,
400        mut requests: fblock::BlockRequestStream,
401    ) -> Result<(), Error> {
402        let scope = fasync::Scope::new();
403        loop {
404            match requests.try_next().await {
405                Ok(Some(request)) => {
406                    if let Some(session) = self.handle_request(request).await? {
407                        scope.spawn(session.map(|_| ()));
408                    }
409                }
410                Ok(None) => break,
411                Err(err) => log::warn!(err:?; "Invalid request"),
412            }
413        }
414        scope.await;
415        Ok(())
416    }
417
418    /// Processes a Block request.  If a new session task is created in response to the request,
419    /// it is returned.
420    async fn handle_request(
421        &self,
422        request: fblock::BlockRequest,
423    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
424        match request {
425            fblock::BlockRequest::GetInfo { responder } => {
426                let info = self.device_info();
427                let max_transfer_size = info.max_transfer_size(self.block_size);
428                let (block_count, mut flags) = match info.as_ref() {
429                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
430                        (*block_count, *device_flags)
431                    }
432                    DeviceInfo::Partition(partition_info) => {
433                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
434                            range.end - range.start
435                        } else {
436                            let volume_info = self.session_manager().get_volume_info().await?;
437                            volume_info.0.slice_size * volume_info.1.partition_slice_count
438                                / self.block_size as u64
439                        };
440                        (block_count, partition_info.device_flags)
441                    }
442                };
443                if SM::SUPPORTS_DECOMPRESSION {
444                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
445                }
446                responder.send(Ok(&fblock::BlockInfo {
447                    block_count,
448                    block_size: self.block_size,
449                    max_transfer_size,
450                    flags,
451                }))?;
452            }
453            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
454                let info = self.device_info();
455                return Ok(Some(SM::open_session(
456                    self.orchestrator.clone(),
457                    session.into_stream(),
458                    OffsetMap::empty(info.max_transfer_blocks()),
459                    self.block_size,
460                )));
461            }
462            fblock::BlockRequest::OpenSessionWithOffsetMap {
463                session,
464                mapping,
465                control_handle: _,
466            } => {
467                let info = self.device_info();
468                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
469                    Ok(m) => m,
470                    Err(status) => {
471                        session.close_with_epitaph(status)?;
472                        return Ok(None);
473                    }
474                };
475                if let Some(max) = info.block_count() {
476                    if initial_mapping.target_block_offset + initial_mapping.length > max {
477                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
478                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
479                        return Ok(None);
480                    }
481                }
482                return Ok(Some(SM::open_session(
483                    self.orchestrator.clone(),
484                    session.into_stream(),
485                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
486                    self.block_size,
487                )));
488            }
489            fblock::BlockRequest::GetTypeGuid { responder } => {
490                let info = self.device_info();
491                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
492                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
493                    guid.value.copy_from_slice(&partition_info.type_guid);
494                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
495                } else {
496                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497                }
498            }
499            fblock::BlockRequest::GetInstanceGuid { responder } => {
500                let info = self.device_info();
501                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
502                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503                    guid.value.copy_from_slice(&partition_info.instance_guid);
504                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
505                } else {
506                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
507                }
508            }
509            fblock::BlockRequest::GetName { responder } => {
510                let info = self.device_info();
511                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
512                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
513                } else {
514                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
515                }
516            }
517            fblock::BlockRequest::GetMetadata { responder } => {
518                let info = self.device_info();
519                if let DeviceInfo::Partition(info) = info.as_ref() {
520                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
521                    type_guid.value.copy_from_slice(&info.type_guid);
522                    let mut instance_guid =
523                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
524                    instance_guid.value.copy_from_slice(&info.instance_guid);
525                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
526                        name: Some(info.name.clone()),
527                        type_guid: Some(type_guid),
528                        instance_guid: Some(instance_guid),
529                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
530                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
531                        flags: Some(info.flags),
532                        ..Default::default()
533                    }))?;
534                } else {
535                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
536                }
537            }
538            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
539                match self.session_manager().query_slices(&start_slices).await {
540                    Ok(mut results) => {
541                        let results_len = results.len();
542                        assert!(results_len <= 16);
543                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
544                        responder.send(
545                            zx::sys::ZX_OK,
546                            &results.try_into().unwrap(),
547                            results_len as u64,
548                        )?;
549                    }
550                    Err(s) => {
551                        responder.send(
552                            s.into_raw(),
553                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
554                            0,
555                        )?;
556                    }
557                }
558            }
559            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
560                match self.session_manager().get_volume_info().await {
561                    Ok((manager_info, volume_info)) => {
562                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
563                    }
564                    Err(s) => responder.send(s.into_raw(), None, None)?,
565                }
566            }
567            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
568                responder.send(
569                    zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
570                        .into_raw(),
571                )?;
572            }
573            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
574                responder.send(
575                    zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
576                        .into_raw(),
577                )?;
578            }
579            fblock::BlockRequest::Destroy { responder, .. } => {
580                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
581            }
582        }
583        Ok(None)
584    }
585
586    fn device_info(&self) -> Cow<'_, DeviceInfo> {
587        self.session_manager().get_info()
588    }
589}
590
591struct SessionHelper<SM: SessionManager> {
592    orchestrator: Arc<SM::Orchestrator>,
593    offset_map: OffsetMap,
594    block_size: u32,
595    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
596    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
597}
598
599struct VmoMapping {
600    base: usize,
601    size: usize,
602}
603
604impl VmoMapping {
605    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
606        let size = vmo.get_size().unwrap() as usize;
607        Ok(Arc::new(Self {
608            base: fuchsia_runtime::vmar_root_self()
609                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
610                .inspect_err(|error| {
611                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
612                })?,
613            size,
614        }))
615    }
616}
617
618impl Drop for VmoMapping {
619    fn drop(&mut self) {
620        // SAFETY: We mapped this in `VmoMapping::new`.
621        unsafe {
622            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
623        }
624    }
625}
626
627impl<SM: SessionManager> SessionHelper<SM> {
628    fn new(
629        orchestrator: Arc<SM::Orchestrator>,
630        offset_map: OffsetMap,
631        block_size: u32,
632    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
633        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
634        Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
635    }
636
637    fn session_manager(&self) -> &SM {
638        self.orchestrator.as_ref().borrow()
639    }
640
641    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
642        match request {
643            fblock::SessionRequest::GetFifo { responder } => {
644                let rights = zx::Rights::TRANSFER
645                    | zx::Rights::READ
646                    | zx::Rights::WRITE
647                    | zx::Rights::SIGNAL
648                    | zx::Rights::WAIT;
649                match self.peer_fifo.duplicate_handle(rights) {
650                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
651                    Err(s) => responder.send(Err(s.into_raw()))?,
652                }
653                Ok(())
654            }
655            fblock::SessionRequest::AttachVmo { vmo, responder } => {
656                let vmo = Arc::new(vmo);
657                let vmo_id = {
658                    let mut vmos = self.vmos.lock();
659                    if vmos.len() == u16::MAX as usize {
660                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
661                        return Ok(());
662                    } else {
663                        let vmo_id = match vmos.last_entry() {
664                            None => 1,
665                            Some(o) => {
666                                o.key().checked_add(1).unwrap_or_else(|| {
667                                    let mut vmo_id = 1;
668                                    // Find the first gap...
669                                    for (&id, _) in &*vmos {
670                                        if id > vmo_id {
671                                            break;
672                                        }
673                                        vmo_id = id + 1;
674                                    }
675                                    vmo_id
676                                })
677                            }
678                        };
679                        vmos.insert(vmo_id, (vmo.clone(), None));
680                        vmo_id
681                    }
682                };
683                SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
684                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
685                Ok(())
686            }
687            fblock::SessionRequest::Close { responder } => {
688                responder.send(Ok(()))?;
689                Err(anyhow!("Closed"))
690            }
691        }
692    }
693
694    /// Decodes `request`.
695    fn decode_fifo_request(
696        &self,
697        session: SM::Session,
698        request: &BlockFifoRequest,
699    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
700        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
701
702        let request_bytes = request.length as u64 * self.block_size as u64;
703
704        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
705            .ok_or(zx::Status::INVALID_ARGS)
706            .and_then(|code| {
707                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
708                    if code != BlockOpcode::Read {
709                        return Err(zx::Status::INVALID_ARGS);
710                    }
711                    if !SM::SUPPORTS_DECOMPRESSION {
712                        return Err(zx::Status::NOT_SUPPORTED);
713                    }
714                }
715                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
716                    if request.length == 0 {
717                        return Err(zx::Status::INVALID_ARGS);
718                    }
719                    // Make sure the end offsets won't wrap.
720                    if request.dev_offset.checked_add(request.length as u64).is_none()
721                        || (code != BlockOpcode::Trim
722                            && request_bytes.checked_add(request.vmo_offset).is_none())
723                    {
724                        return Err(zx::Status::OUT_OF_RANGE);
725                    }
726                }
727                Ok(match code {
728                    BlockOpcode::Read => Operation::Read {
729                        device_block_offset: request.dev_offset,
730                        block_count: request.length,
731                        _unused: 0,
732                        vmo_offset: request
733                            .vmo_offset
734                            .checked_mul(self.block_size as u64)
735                            .ok_or(zx::Status::OUT_OF_RANGE)?,
736                        options: ReadOptions {
737                            inline_crypto: InlineCryptoOptions {
738                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
739                                dun: request.dun,
740                                slot: request.slot,
741                            },
742                        },
743                    },
744                    BlockOpcode::Write => {
745                        let mut options = WriteOptions {
746                            inline_crypto: InlineCryptoOptions {
747                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
748                                dun: request.dun,
749                                slot: request.slot,
750                            },
751                            ..WriteOptions::default()
752                        };
753                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
754                            options.flags |= WriteFlags::FORCE_ACCESS;
755                        }
756                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
757                            options.flags |= WriteFlags::PRE_BARRIER;
758                        }
759                        Operation::Write {
760                            device_block_offset: request.dev_offset,
761                            block_count: request.length,
762                            _unused: 0,
763                            options,
764                            vmo_offset: request
765                                .vmo_offset
766                                .checked_mul(self.block_size as u64)
767                                .ok_or(zx::Status::OUT_OF_RANGE)?,
768                        }
769                    }
770                    BlockOpcode::Flush => Operation::Flush,
771                    BlockOpcode::Trim => Operation::Trim {
772                        device_block_offset: request.dev_offset,
773                        block_count: request.length,
774                    },
775                    BlockOpcode::CloseVmo => Operation::CloseVmo,
776                })
777            });
778
779        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
780            GroupOrRequest::Group(request.group)
781        } else {
782            GroupOrRequest::Request(request.reqid)
783        };
784
785        let mut active_requests = self.session_manager().active_requests().0.lock();
786        let mut request_id = None;
787
788        // Multiple Block I/O request may be sent as a group.
789        // Notes:
790        // - the group is identified by the group id in the request
791        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
792        //   flag is set.
793        // - when processing a request of a group fails, subsequent requests of that
794        //   group will not be processed.
795        // - decompression is a special case, see block-fifo.h for semantics.
796        //
797        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
798        if group_or_request.is_group() {
799            // Search for an existing entry that matches this group.  NOTE: This is a potentially
800            // expensive way to find a group (it's iterating over all slots in the active-requests
801            // slab).  This can be optimised easily should we need to.
802            for (key, group) in &mut active_requests.requests {
803                if group.group_or_request == group_or_request {
804                    if group.req_id.is_some() {
805                        // We have already received a request tagged as last.
806                        if group.status == zx::Status::OK {
807                            group.status = zx::Status::INVALID_ARGS;
808                        }
809                        // Ignore this request.
810                        return Err(None);
811                    }
812                    // See if this is a continuation of a decompressed read.
813                    if group.status == zx::Status::OK
814                        && let Some(info) = &mut group.decompression_info
815                    {
816                        if let Ok(Operation::Read {
817                            device_block_offset,
818                            mut block_count,
819                            options,
820                            vmo_offset: 0,
821                            ..
822                        }) = operation
823                        {
824                            let remaining_bytes = info
825                                .compressed_range
826                                .end
827                                .next_multiple_of(self.block_size as usize)
828                                as u64
829                                - info.bytes_so_far;
830                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
831                                || request.total_compressed_bytes != 0
832                                || request.uncompressed_bytes != 0
833                                || request.compressed_prefix_bytes != 0
834                                || (flags.contains(BlockIoFlag::GROUP_LAST)
835                                    && info.bytes_so_far + request_bytes
836                                        < info.compressed_range.end as u64)
837                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
838                                    && request_bytes >= remaining_bytes)
839                            {
840                                group.status = zx::Status::INVALID_ARGS;
841                            } else {
842                                // We are tolerant of `block_count` being more than we actually
843                                // need.  This can happen if the client is working with a larger
844                                // block size than the device block size.  For example, if Blobfs
845                                // has a 8192 byte block size, but the device might has a 512 byte
846                                // block size, it can ask for a multiple of 16 blocks, when fewer
847                                // than that might actually be required to hold the compressed data.
848                                // It is easier for us to tolerate this here than to get Blobfs to
849                                // change to pass only the blocks that are required.
850                                if request_bytes > remaining_bytes {
851                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
852                                }
853
854                                operation = Ok(Operation::ContinueDecompressedRead {
855                                    offset: info.bytes_so_far,
856                                    device_block_offset,
857                                    block_count,
858                                    options,
859                                });
860
861                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
862                            }
863                        } else {
864                            group.status = zx::Status::INVALID_ARGS;
865                        }
866                    }
867                    if flags.contains(BlockIoFlag::GROUP_LAST) {
868                        group.req_id = Some(request.reqid);
869                        // If the group has had an error, there is no point trying to issue this
870                        // request.
871                        if group.status != zx::Status::OK {
872                            operation = Err(group.status);
873                        }
874                    } else if group.status != zx::Status::OK {
875                        // The group has already encountered an error, so there is no point trying
876                        // to issue this request.
877                        return Err(None);
878                    }
879                    request_id = Some(RequestId(key));
880                    group.count += 1;
881                    break;
882                }
883            }
884        }
885
886        let is_single_request =
887            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
888
889        let mut decompression_info = None;
890        let vmo = match operation {
891            Ok(Operation::Read {
892                device_block_offset,
893                mut block_count,
894                options,
895                vmo_offset,
896                ..
897            }) => match self.vmos.lock().get_mut(&request.vmoid) {
898                Some((vmo, mapping)) => {
899                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
900                        let compressed_range = request.compressed_prefix_bytes as usize
901                            ..request.compressed_prefix_bytes as usize
902                                + request.total_compressed_bytes as usize;
903                        let required_buffer_size =
904                            compressed_range.end.next_multiple_of(self.block_size as usize);
905
906                        // Validate the initial decompression request.
907                        if compressed_range.start >= compressed_range.end
908                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
909                            || (is_single_request && request_bytes < compressed_range.end as u64)
910                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
911                        {
912                            Err(zx::Status::INVALID_ARGS)
913                        } else {
914                            // We are tolerant of `block_count` being more than we actually need.
915                            // This can happen if the client is working in a larger block size than
916                            // the device block size.  For example, Blobfs has a 8192 byte block
917                            // size, but the device might have a 512 byte block size.  It is easier
918                            // for us to tolerate this here than to get Blobfs to change to pass
919                            // only the blocks that are required.
920                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
921                                block_count =
922                                    (required_buffer_size / self.block_size as usize) as u32;
923                                required_buffer_size as u64
924                            } else {
925                                request_bytes
926                            };
927
928                            // To decompress, we need to have the target VMO mapped (cached).
929                            match mapping {
930                                Some(mapping) => Ok(mapping.clone()),
931                                None => {
932                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
933                                }
934                            }
935                            .and_then(|mapping| {
936                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
937                                // range.
938                                if vmo_offset
939                                    .checked_add(request.uncompressed_bytes as u64)
940                                    .is_some_and(|end| end <= mapping.size as u64)
941                                {
942                                    Ok(mapping)
943                                } else {
944                                    Err(zx::Status::OUT_OF_RANGE)
945                                }
946                            })
947                            .map(|mapping| {
948                                // Convert the operation into a `StartDecompressedRead`
949                                // operation. For non-fragmented requests, this will be the only
950                                // operation, but if it's a fragmented read,
951                                // `ContinueDecompressedRead` operations will follow.
952                                operation = Ok(Operation::StartDecompressedRead {
953                                    required_buffer_size,
954                                    device_block_offset,
955                                    block_count,
956                                    options,
957                                });
958                                // Record sufficient information so that we can decompress when all
959                                // the requests complete.
960                                decompression_info = Some(DecompressionInfo {
961                                    compressed_range,
962                                    bytes_so_far,
963                                    mapping,
964                                    uncompressed_range: vmo_offset
965                                        ..vmo_offset + request.uncompressed_bytes as u64,
966                                    buffer: None,
967                                });
968                                None
969                            })
970                        }
971                    } else {
972                        Ok(Some(vmo.clone()))
973                    }
974                }
975                None => Err(zx::Status::IO),
976            },
977            Ok(Operation::Write { .. }) => self
978                .vmos
979                .lock()
980                .get(&request.vmoid)
981                .cloned()
982                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
983            Ok(Operation::CloseVmo) => {
984                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
985                    let vmo_clone = vmo.clone();
986                    // Make sure the VMO is dropped after all current Epoch guards have been
987                    // dropped.
988                    Epoch::global().defer(move || drop(vmo_clone));
989                    Ok(Some(vmo))
990                })
991            }
992            _ => Ok(None),
993        }
994        .unwrap_or_else(|e| {
995            operation = Err(e);
996            None
997        });
998
999        let trace_flow_id = NonZero::new(request.trace_flow_id);
1000        let request_id = request_id.unwrap_or_else(|| {
1001            RequestId(active_requests.requests.insert(ActiveRequest {
1002                session,
1003                group_or_request,
1004                trace_flow_id,
1005                _epoch_guard: Epoch::global().guard(),
1006                status: zx::Status::OK,
1007                count: 1,
1008                req_id: is_single_request.then_some(request.reqid),
1009                decompression_info,
1010            }))
1011        });
1012
1013        Ok(DecodedRequest {
1014            request_id,
1015            trace_flow_id,
1016            operation: operation.map_err(|status| {
1017                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1018            })?,
1019            vmo,
1020        })
1021    }
1022
1023    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1024        std::mem::take(&mut *self.vmos.lock())
1025    }
1026
1027    /// Maps the request and returns the mapped request with an optional remainder.
1028    fn map_request(
1029        &self,
1030        mut request: DecodedRequest,
1031        active_request: &mut ActiveRequest<SM::Session>,
1032    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1033        if active_request.status != zx::Status::OK {
1034            return Err(zx::Status::BAD_STATE);
1035        }
1036        let mapping = self.offset_map.mapping();
1037        match (mapping, request.operation.blocks()) {
1038            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1039                return Err(zx::Status::OUT_OF_RANGE);
1040            }
1041            _ => {}
1042        }
1043        let remainder = request.operation.map(
1044            self.offset_map.mapping(),
1045            self.offset_map.max_transfer_blocks(),
1046            self.block_size,
1047        );
1048        if remainder.is_some() {
1049            active_request.count += 1;
1050        }
1051        static CACHE: AtomicU64 = AtomicU64::new(0);
1052        if let Some(context) =
1053            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1054        {
1055            use fuchsia_trace::ArgValue;
1056            let trace_args = [
1057                ArgValue::of("request_id", request.request_id.0),
1058                ArgValue::of("opcode", request.operation.trace_label()),
1059            ];
1060            let _scope =
1061                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1062            if let Some(trace_flow_id) = active_request.trace_flow_id {
1063                fuchsia_trace::flow_step(
1064                    &context,
1065                    "block_server::start_transaction",
1066                    trace_flow_id.get().into(),
1067                    &[],
1068                );
1069            }
1070        }
1071        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1072        Ok((request, remainder))
1073    }
1074
1075    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1076        self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1077    }
1078}
1079
1080#[repr(transparent)]
1081#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
1082pub struct RequestId(usize);
1083
1084#[derive(Clone, Debug)]
1085struct DecodedRequest {
1086    request_id: RequestId,
1087    trace_flow_id: TraceFlowId,
1088    operation: Operation,
1089    vmo: Option<Arc<zx::Vmo>>,
1090}
1091
1092/// cbindgen:no-export
1093pub type WriteFlags = block_protocol::WriteFlags;
1094pub type WriteOptions = block_protocol::WriteOptions;
1095pub type ReadOptions = block_protocol::ReadOptions;
1096pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1097
1098#[repr(C)]
1099#[derive(Clone, Debug, PartialEq, Eq)]
1100pub enum Operation {
1101    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1102    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1103    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1104    // write variant).
1105    Read {
1106        device_block_offset: u64,
1107        block_count: u32,
1108        _unused: u32,
1109        vmo_offset: u64,
1110        options: ReadOptions,
1111    },
1112    Write {
1113        device_block_offset: u64,
1114        block_count: u32,
1115        _unused: u32,
1116        vmo_offset: u64,
1117        options: WriteOptions,
1118    },
1119    Flush,
1120    Trim {
1121        device_block_offset: u64,
1122        block_count: u32,
1123    },
1124    /// This will never be seen by the C interface.
1125    CloseVmo,
1126    StartDecompressedRead {
1127        required_buffer_size: usize,
1128        device_block_offset: u64,
1129        block_count: u32,
1130        options: ReadOptions,
1131    },
1132    ContinueDecompressedRead {
1133        offset: u64,
1134        device_block_offset: u64,
1135        block_count: u32,
1136        options: ReadOptions,
1137    },
1138}
1139
1140impl Operation {
1141    fn trace_label(&self) -> &'static str {
1142        match self {
1143            Operation::Read { .. } => "read",
1144            Operation::Write { .. } => "write",
1145            Operation::Flush { .. } => "flush",
1146            Operation::Trim { .. } => "trim",
1147            Operation::CloseVmo { .. } => "close_vmo",
1148            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1149            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1150        }
1151    }
1152
1153    /// Returns (offset, length).
1154    fn blocks(&self) -> Option<(u64, u32)> {
1155        match self {
1156            Operation::Read { device_block_offset, block_count, .. }
1157            | Operation::Write { device_block_offset, block_count, .. }
1158            | Operation::Trim { device_block_offset, block_count, .. } => {
1159                Some((*device_block_offset, *block_count))
1160            }
1161            _ => None,
1162        }
1163    }
1164
1165    /// Returns mutable references to (offset, length).
1166    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1167        match self {
1168            Operation::Read { device_block_offset, block_count, .. }
1169            | Operation::Write { device_block_offset, block_count, .. }
1170            | Operation::Trim { device_block_offset, block_count, .. } => {
1171                Some((device_block_offset, block_count))
1172            }
1173            _ => None,
1174        }
1175    }
1176
1177    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1178    /// start of the operation.
1179    fn map(
1180        &mut self,
1181        mapping: Option<&BlockOffsetMapping>,
1182        max_blocks: Option<NonZero<u32>>,
1183        block_size: u32,
1184    ) -> Option<Self> {
1185        let mut max = match self {
1186            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1187            _ => None,
1188        };
1189        let (offset, length) = self.blocks_mut()?;
1190        let orig_offset = *offset;
1191        if let Some(mapping) = mapping {
1192            let delta = *offset - mapping.source_block_offset;
1193            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1194            *offset = mapping.target_block_offset + delta;
1195            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1196            max = match max {
1197                None => Some(mapping_max),
1198                Some(m) => Some(std::cmp::min(m, mapping_max)),
1199            };
1200        };
1201        if let Some(max) = max {
1202            if *length as u64 > max {
1203                let rem = (*length as u64 - max) as u32;
1204                *length = max as u32;
1205                return Some(match self {
1206                    Operation::Read {
1207                        device_block_offset: _,
1208                        block_count: _,
1209                        vmo_offset,
1210                        _unused,
1211                        options,
1212                    } => {
1213                        let mut options = *options;
1214                        options.inline_crypto.dun += max as u32;
1215                        Operation::Read {
1216                            device_block_offset: orig_offset + max,
1217                            block_count: rem,
1218                            vmo_offset: *vmo_offset + max * block_size as u64,
1219                            _unused: *_unused,
1220                            options: options,
1221                        }
1222                    }
1223                    Operation::Write {
1224                        device_block_offset: _,
1225                        block_count: _,
1226                        _unused,
1227                        vmo_offset,
1228                        options,
1229                    } => {
1230                        let mut options = *options;
1231                        options.inline_crypto.dun += max as u32;
1232                        Operation::Write {
1233                            device_block_offset: orig_offset + max,
1234                            block_count: rem,
1235                            _unused: *_unused,
1236                            vmo_offset: *vmo_offset + max * block_size as u64,
1237                            options: options,
1238                        }
1239                    }
1240                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1241                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1242                    }
1243                    _ => unreachable!(),
1244                });
1245            }
1246        }
1247        None
1248    }
1249
1250    /// Returns true if the specified write flags are set.
1251    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1252        if let Operation::Write { options, .. } = self {
1253            options.flags.contains(value)
1254        } else {
1255            false
1256        }
1257    }
1258
1259    /// Removes `value` from the request's write flags and returns true if the flag was set.
1260    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1261        if let Operation::Write { options, .. } = self {
1262            let result = options.flags.contains(value);
1263            options.flags.remove(value);
1264            result
1265        } else {
1266            false
1267        }
1268    }
1269}
1270
1271#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1272pub enum GroupOrRequest {
1273    Group(u16),
1274    Request(u32),
1275}
1276
1277impl GroupOrRequest {
1278    fn is_group(&self) -> bool {
1279        matches!(self, Self::Group(_))
1280    }
1281
1282    fn group_id(&self) -> Option<u16> {
1283        match self {
1284            Self::Group(id) => Some(*id),
1285            Self::Request(_) => None,
1286        }
1287    }
1288}
1289
1290#[cfg(test)]
1291mod tests {
1292    use super::{
1293        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1294        TraceFlowId,
1295    };
1296    use assert_matches::assert_matches;
1297    use block_protocol::{
1298        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1299        WriteFlags, WriteOptions,
1300    };
1301    use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1302    use fuchsia_sync::Mutex;
1303    use futures::FutureExt as _;
1304    use futures::channel::oneshot;
1305    use futures::future::BoxFuture;
1306    use std::borrow::Cow;
1307    use std::future::poll_fn;
1308    use std::num::NonZero;
1309    use std::pin::pin;
1310    use std::sync::Arc;
1311    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1312    use std::task::{Context, Poll};
1313    use zx::HandleBased as _;
1314    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1315
1316    #[derive(Default)]
1317    struct MockInterface {
1318        read_hook: Option<
1319            Box<
1320                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1321                    + Send
1322                    + Sync,
1323            >,
1324        >,
1325        write_hook:
1326            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1327        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1328    }
1329
1330    impl super::async_interface::Interface for MockInterface {
1331        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1332            Ok(())
1333        }
1334
1335        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1336            Cow::Owned(test_device_info())
1337        }
1338
1339        async fn read(
1340            &self,
1341            device_block_offset: u64,
1342            block_count: u32,
1343            vmo: &Arc<zx::Vmo>,
1344            vmo_offset: u64,
1345            _opts: ReadOptions,
1346            _trace_flow_id: TraceFlowId,
1347        ) -> Result<(), zx::Status> {
1348            if let Some(read_hook) = &self.read_hook {
1349                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1350            } else {
1351                unimplemented!();
1352            }
1353        }
1354
1355        async fn write(
1356            &self,
1357            device_block_offset: u64,
1358            _block_count: u32,
1359            _vmo: &Arc<zx::Vmo>,
1360            _vmo_offset: u64,
1361            opts: WriteOptions,
1362            _trace_flow_id: TraceFlowId,
1363        ) -> Result<(), zx::Status> {
1364            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1365                && let Some(barrier_hook) = &self.barrier_hook
1366            {
1367                barrier_hook()?;
1368            }
1369            if let Some(write_hook) = &self.write_hook {
1370                write_hook(device_block_offset).await
1371            } else {
1372                unimplemented!();
1373            }
1374        }
1375
1376        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1377            Ok(())
1378        }
1379
1380        async fn trim(
1381            &self,
1382            _device_block_offset: u64,
1383            _block_count: u32,
1384            _trace_flow_id: TraceFlowId,
1385        ) -> Result<(), zx::Status> {
1386            unreachable!();
1387        }
1388
1389        async fn get_volume_info(
1390            &self,
1391        ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1392            // Hang forever for the test_requests_dont_block_sessions test.
1393            let () = std::future::pending().await;
1394            unreachable!();
1395        }
1396    }
1397
1398    const BLOCK_SIZE: u32 = 512;
1399    const MAX_TRANSFER_BLOCKS: u32 = 10;
1400
1401    fn test_device_info() -> DeviceInfo {
1402        DeviceInfo::Partition(PartitionInfo {
1403            device_flags: fblock::DeviceFlag::READONLY
1404                | fblock::DeviceFlag::BARRIER_SUPPORT
1405                | fblock::DeviceFlag::FUA_SUPPORT,
1406            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1407            block_range: Some(0..100),
1408            type_guid: [1; 16],
1409            instance_guid: [2; 16],
1410            name: "foo".to_string(),
1411            flags: 0xabcd,
1412        })
1413    }
1414
1415    #[fuchsia::test]
1416    async fn test_barriers_ordering() {
1417        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1418        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1419        let barrier_called = Arc::new(AtomicBool::new(false));
1420
1421        futures::join!(
1422            async move {
1423                let barrier_called_clone = barrier_called.clone();
1424                let block_server = BlockServer::new(
1425                    BLOCK_SIZE,
1426                    Arc::new(MockInterface {
1427                        barrier_hook: Some(Box::new(move || {
1428                            barrier_called.store(true, Ordering::Relaxed);
1429                            Ok(())
1430                        })),
1431                        write_hook: Some(Box::new(move |device_block_offset| {
1432                            let barrier_called = barrier_called_clone.clone();
1433                            Box::pin(async move {
1434                                // The sleep allows the server to reorder the fifo requests.
1435                                if device_block_offset % 2 == 0 {
1436                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1437                                        zx::MonotonicDuration::from_millis(200),
1438                                    ))
1439                                    .await;
1440                                }
1441                                assert!(barrier_called.load(Ordering::Relaxed));
1442                                Ok(())
1443                            })
1444                        })),
1445                        ..MockInterface::default()
1446                    }),
1447                );
1448                block_server.handle_requests(stream).await.unwrap();
1449            },
1450            async move {
1451                let (session_proxy, server) = fidl::endpoints::create_proxy();
1452
1453                proxy.open_session(server).unwrap();
1454
1455                let vmo_id = session_proxy
1456                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1457                    .await
1458                    .unwrap()
1459                    .unwrap();
1460                assert_ne!(vmo_id.id, 0);
1461
1462                let mut fifo =
1463                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1464                let (mut reader, mut writer) = fifo.async_io();
1465
1466                writer
1467                    .write_entries(&BlockFifoRequest {
1468                        command: BlockFifoCommand {
1469                            opcode: BlockOpcode::Write.into_primitive(),
1470                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1471                            ..Default::default()
1472                        },
1473                        vmoid: vmo_id.id,
1474                        dev_offset: 0,
1475                        length: 5,
1476                        vmo_offset: 6,
1477                        ..Default::default()
1478                    })
1479                    .await
1480                    .unwrap();
1481
1482                for i in 0..10 {
1483                    writer
1484                        .write_entries(&BlockFifoRequest {
1485                            command: BlockFifoCommand {
1486                                opcode: BlockOpcode::Write.into_primitive(),
1487                                ..Default::default()
1488                            },
1489                            vmoid: vmo_id.id,
1490                            dev_offset: i + 1,
1491                            length: 5,
1492                            vmo_offset: 6,
1493                            ..Default::default()
1494                        })
1495                        .await
1496                        .unwrap();
1497                }
1498                for _ in 0..11 {
1499                    let mut response = BlockFifoResponse::default();
1500                    reader.read_entries(&mut response).await.unwrap();
1501                    assert_eq!(response.status, zx::sys::ZX_OK);
1502                }
1503
1504                std::mem::drop(proxy);
1505            }
1506        );
1507    }
1508
1509    #[fuchsia::test]
1510    async fn test_info() {
1511        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1512
1513        futures::join!(
1514            async {
1515                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1516                block_server.handle_requests(stream).await.unwrap();
1517            },
1518            async {
1519                let expected_info = test_device_info();
1520                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1521                    info
1522                } else {
1523                    unreachable!()
1524                };
1525
1526                let block_info = proxy.get_info().await.unwrap().unwrap();
1527                assert_eq!(
1528                    block_info.block_count,
1529                    partition_info.block_range.as_ref().unwrap().end
1530                        - partition_info.block_range.as_ref().unwrap().start
1531                );
1532                assert_eq!(
1533                    block_info.flags,
1534                    fblock::DeviceFlag::READONLY
1535                        | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1536                        | fblock::DeviceFlag::BARRIER_SUPPORT
1537                        | fblock::DeviceFlag::FUA_SUPPORT
1538                );
1539
1540                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1541
1542                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1543                assert_eq!(status, zx::sys::ZX_OK);
1544                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1545
1546                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1547                assert_eq!(status, zx::sys::ZX_OK);
1548                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1549
1550                let (status, name) = proxy.get_name().await.unwrap();
1551                assert_eq!(status, zx::sys::ZX_OK);
1552                assert_eq!(name.as_ref(), Some(&partition_info.name));
1553
1554                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1555                assert_eq!(metadata.name, name);
1556                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1557                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1558                assert_eq!(
1559                    metadata.start_block_offset,
1560                    Some(partition_info.block_range.as_ref().unwrap().start)
1561                );
1562                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1563                assert_eq!(metadata.flags, Some(partition_info.flags));
1564
1565                std::mem::drop(proxy);
1566            }
1567        );
1568    }
1569
1570    #[fuchsia::test]
1571    async fn test_attach_vmo() {
1572        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1573
1574        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1575        let koid = vmo.koid().unwrap();
1576
1577        futures::join!(
1578            async {
1579                let block_server = BlockServer::new(
1580                    BLOCK_SIZE,
1581                    Arc::new(MockInterface {
1582                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1583                            assert_eq!(vmo.koid().unwrap(), koid);
1584                            Box::pin(async { Ok(()) })
1585                        })),
1586                        ..MockInterface::default()
1587                    }),
1588                );
1589                block_server.handle_requests(stream).await.unwrap();
1590            },
1591            async move {
1592                let (session_proxy, server) = fidl::endpoints::create_proxy();
1593
1594                proxy.open_session(server).unwrap();
1595
1596                let vmo_id = session_proxy
1597                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1598                    .await
1599                    .unwrap()
1600                    .unwrap();
1601                assert_ne!(vmo_id.id, 0);
1602
1603                let mut fifo =
1604                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1605                let (mut reader, mut writer) = fifo.async_io();
1606
1607                // Keep attaching VMOs until we eventually hit the maximum.
1608                let mut count = 1;
1609                loop {
1610                    match session_proxy
1611                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1612                        .await
1613                        .unwrap()
1614                    {
1615                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1616                        Err(e) => {
1617                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1618                            break;
1619                        }
1620                    }
1621
1622                    // Only test every 10 to keep test time down.
1623                    if count % 10 == 0 {
1624                        writer
1625                            .write_entries(&BlockFifoRequest {
1626                                command: BlockFifoCommand {
1627                                    opcode: BlockOpcode::Read.into_primitive(),
1628                                    ..Default::default()
1629                                },
1630                                vmoid: vmo_id.id,
1631                                length: 1,
1632                                ..Default::default()
1633                            })
1634                            .await
1635                            .unwrap();
1636
1637                        let mut response = BlockFifoResponse::default();
1638                        reader.read_entries(&mut response).await.unwrap();
1639                        assert_eq!(response.status, zx::sys::ZX_OK);
1640                    }
1641
1642                    count += 1;
1643                }
1644
1645                assert_eq!(count, u16::MAX as u64);
1646
1647                // Detach the original VMO, and make sure we can then attach another one.
1648                writer
1649                    .write_entries(&BlockFifoRequest {
1650                        command: BlockFifoCommand {
1651                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1652                            ..Default::default()
1653                        },
1654                        vmoid: vmo_id.id,
1655                        ..Default::default()
1656                    })
1657                    .await
1658                    .unwrap();
1659
1660                let mut response = BlockFifoResponse::default();
1661                reader.read_entries(&mut response).await.unwrap();
1662                assert_eq!(response.status, zx::sys::ZX_OK);
1663
1664                let new_vmo_id = session_proxy
1665                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1666                    .await
1667                    .unwrap()
1668                    .unwrap();
1669                // It should reuse the same ID.
1670                assert_eq!(new_vmo_id.id, vmo_id.id);
1671
1672                std::mem::drop(proxy);
1673            }
1674        );
1675    }
1676
1677    #[fuchsia::test]
1678    async fn test_close() {
1679        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1680
1681        let mut server = std::pin::pin!(
1682            async {
1683                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1684                block_server.handle_requests(stream).await.unwrap();
1685            }
1686            .fuse()
1687        );
1688
1689        let mut client = std::pin::pin!(
1690            async {
1691                let (session_proxy, server) = fidl::endpoints::create_proxy();
1692
1693                proxy.open_session(server).unwrap();
1694
1695                // Dropping the proxy should not cause the session to terminate because the session is
1696                // still live.
1697                std::mem::drop(proxy);
1698
1699                session_proxy.close().await.unwrap().unwrap();
1700
1701                // Keep the session alive.  Calling `close` should cause the server to terminate.
1702                let _: () = std::future::pending().await;
1703            }
1704            .fuse()
1705        );
1706
1707        futures::select!(
1708            _ = server => {}
1709            _ = client => unreachable!(),
1710        );
1711    }
1712
1713    #[derive(Default)]
1714    struct IoMockInterface {
1715        do_checks: bool,
1716        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1717        return_errors: bool,
1718    }
1719
1720    #[derive(Debug)]
1721    enum ExpectedOp {
1722        Read(u64, u32, u64),
1723        Write(u64, u32, u64),
1724        Trim(u64, u32),
1725        Flush,
1726    }
1727
1728    impl super::async_interface::Interface for IoMockInterface {
1729        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1730            Ok(())
1731        }
1732
1733        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1734            Cow::Owned(test_device_info())
1735        }
1736
1737        async fn read(
1738            &self,
1739            device_block_offset: u64,
1740            block_count: u32,
1741            _vmo: &Arc<zx::Vmo>,
1742            vmo_offset: u64,
1743            _opts: ReadOptions,
1744            _trace_flow_id: TraceFlowId,
1745        ) -> Result<(), zx::Status> {
1746            if self.return_errors {
1747                Err(zx::Status::INTERNAL)
1748            } else {
1749                if self.do_checks {
1750                    assert_matches!(
1751                        self.expected_op.lock().take(),
1752                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1753                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1754                        "Read {device_block_offset} {block_count} {vmo_offset}"
1755                    );
1756                }
1757                Ok(())
1758            }
1759        }
1760
1761        async fn write(
1762            &self,
1763            device_block_offset: u64,
1764            block_count: u32,
1765            _vmo: &Arc<zx::Vmo>,
1766            vmo_offset: u64,
1767            _write_opts: WriteOptions,
1768            _trace_flow_id: TraceFlowId,
1769        ) -> Result<(), zx::Status> {
1770            if self.return_errors {
1771                Err(zx::Status::NOT_SUPPORTED)
1772            } else {
1773                if self.do_checks {
1774                    assert_matches!(
1775                        self.expected_op.lock().take(),
1776                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1777                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1778                        "Write {device_block_offset} {block_count} {vmo_offset}"
1779                    );
1780                }
1781                Ok(())
1782            }
1783        }
1784
1785        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1786            if self.return_errors {
1787                Err(zx::Status::NO_RESOURCES)
1788            } else {
1789                if self.do_checks {
1790                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1791                }
1792                Ok(())
1793            }
1794        }
1795
1796        async fn trim(
1797            &self,
1798            device_block_offset: u64,
1799            block_count: u32,
1800            _trace_flow_id: TraceFlowId,
1801        ) -> Result<(), zx::Status> {
1802            if self.return_errors {
1803                Err(zx::Status::NO_MEMORY)
1804            } else {
1805                if self.do_checks {
1806                    assert_matches!(
1807                        self.expected_op.lock().take(),
1808                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1809                            block_count == b,
1810                        "Trim {device_block_offset} {block_count}"
1811                    );
1812                }
1813                Ok(())
1814            }
1815        }
1816    }
1817
1818    #[fuchsia::test]
1819    async fn test_io() {
1820        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1821
1822        let expected_op = Arc::new(Mutex::new(None));
1823        let expected_op_clone = expected_op.clone();
1824
1825        let server = async {
1826            let block_server = BlockServer::new(
1827                BLOCK_SIZE,
1828                Arc::new(IoMockInterface {
1829                    return_errors: false,
1830                    do_checks: true,
1831                    expected_op: expected_op_clone,
1832                }),
1833            );
1834            block_server.handle_requests(stream).await.unwrap();
1835        };
1836
1837        let client = async move {
1838            let (session_proxy, server) = fidl::endpoints::create_proxy();
1839
1840            proxy.open_session(server).unwrap();
1841
1842            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1843            let vmo_id = session_proxy
1844                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1845                .await
1846                .unwrap()
1847                .unwrap();
1848
1849            let mut fifo =
1850                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1851            let (mut reader, mut writer) = fifo.async_io();
1852
1853            // READ
1854            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1855            writer
1856                .write_entries(&BlockFifoRequest {
1857                    command: BlockFifoCommand {
1858                        opcode: BlockOpcode::Read.into_primitive(),
1859                        ..Default::default()
1860                    },
1861                    vmoid: vmo_id.id,
1862                    dev_offset: 1,
1863                    length: 2,
1864                    vmo_offset: 3,
1865                    ..Default::default()
1866                })
1867                .await
1868                .unwrap();
1869
1870            let mut response = BlockFifoResponse::default();
1871            reader.read_entries(&mut response).await.unwrap();
1872            assert_eq!(response.status, zx::sys::ZX_OK);
1873
1874            // WRITE
1875            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1876            writer
1877                .write_entries(&BlockFifoRequest {
1878                    command: BlockFifoCommand {
1879                        opcode: BlockOpcode::Write.into_primitive(),
1880                        ..Default::default()
1881                    },
1882                    vmoid: vmo_id.id,
1883                    dev_offset: 4,
1884                    length: 5,
1885                    vmo_offset: 6,
1886                    ..Default::default()
1887                })
1888                .await
1889                .unwrap();
1890
1891            let mut response = BlockFifoResponse::default();
1892            reader.read_entries(&mut response).await.unwrap();
1893            assert_eq!(response.status, zx::sys::ZX_OK);
1894
1895            // FLUSH
1896            *expected_op.lock() = Some(ExpectedOp::Flush);
1897            writer
1898                .write_entries(&BlockFifoRequest {
1899                    command: BlockFifoCommand {
1900                        opcode: BlockOpcode::Flush.into_primitive(),
1901                        ..Default::default()
1902                    },
1903                    ..Default::default()
1904                })
1905                .await
1906                .unwrap();
1907
1908            reader.read_entries(&mut response).await.unwrap();
1909            assert_eq!(response.status, zx::sys::ZX_OK);
1910
1911            // TRIM
1912            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1913            writer
1914                .write_entries(&BlockFifoRequest {
1915                    command: BlockFifoCommand {
1916                        opcode: BlockOpcode::Trim.into_primitive(),
1917                        ..Default::default()
1918                    },
1919                    dev_offset: 7,
1920                    length: 8,
1921                    ..Default::default()
1922                })
1923                .await
1924                .unwrap();
1925
1926            reader.read_entries(&mut response).await.unwrap();
1927            assert_eq!(response.status, zx::sys::ZX_OK);
1928
1929            std::mem::drop(proxy);
1930        };
1931
1932        futures::join!(server, client);
1933    }
1934
1935    #[fuchsia::test]
1936    async fn test_io_errors() {
1937        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1938
1939        futures::join!(
1940            async {
1941                let block_server = BlockServer::new(
1942                    BLOCK_SIZE,
1943                    Arc::new(IoMockInterface {
1944                        return_errors: true,
1945                        do_checks: false,
1946                        expected_op: Arc::new(Mutex::new(None)),
1947                    }),
1948                );
1949                block_server.handle_requests(stream).await.unwrap();
1950            },
1951            async move {
1952                let (session_proxy, server) = fidl::endpoints::create_proxy();
1953
1954                proxy.open_session(server).unwrap();
1955
1956                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1957                let vmo_id = session_proxy
1958                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1959                    .await
1960                    .unwrap()
1961                    .unwrap();
1962
1963                let mut fifo =
1964                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1965                let (mut reader, mut writer) = fifo.async_io();
1966
1967                // READ
1968                writer
1969                    .write_entries(&BlockFifoRequest {
1970                        command: BlockFifoCommand {
1971                            opcode: BlockOpcode::Read.into_primitive(),
1972                            ..Default::default()
1973                        },
1974                        vmoid: vmo_id.id,
1975                        length: 1,
1976                        reqid: 1,
1977                        ..Default::default()
1978                    })
1979                    .await
1980                    .unwrap();
1981
1982                let mut response = BlockFifoResponse::default();
1983                reader.read_entries(&mut response).await.unwrap();
1984                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1985
1986                // WRITE
1987                writer
1988                    .write_entries(&BlockFifoRequest {
1989                        command: BlockFifoCommand {
1990                            opcode: BlockOpcode::Write.into_primitive(),
1991                            ..Default::default()
1992                        },
1993                        vmoid: vmo_id.id,
1994                        length: 1,
1995                        reqid: 2,
1996                        ..Default::default()
1997                    })
1998                    .await
1999                    .unwrap();
2000
2001                reader.read_entries(&mut response).await.unwrap();
2002                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2003
2004                // FLUSH
2005                writer
2006                    .write_entries(&BlockFifoRequest {
2007                        command: BlockFifoCommand {
2008                            opcode: BlockOpcode::Flush.into_primitive(),
2009                            ..Default::default()
2010                        },
2011                        reqid: 3,
2012                        ..Default::default()
2013                    })
2014                    .await
2015                    .unwrap();
2016
2017                reader.read_entries(&mut response).await.unwrap();
2018                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2019
2020                // TRIM
2021                writer
2022                    .write_entries(&BlockFifoRequest {
2023                        command: BlockFifoCommand {
2024                            opcode: BlockOpcode::Trim.into_primitive(),
2025                            ..Default::default()
2026                        },
2027                        reqid: 4,
2028                        length: 1,
2029                        ..Default::default()
2030                    })
2031                    .await
2032                    .unwrap();
2033
2034                reader.read_entries(&mut response).await.unwrap();
2035                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2036
2037                std::mem::drop(proxy);
2038            }
2039        );
2040    }
2041
2042    #[fuchsia::test]
2043    async fn test_invalid_args() {
2044        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2045
2046        futures::join!(
2047            async {
2048                let block_server = BlockServer::new(
2049                    BLOCK_SIZE,
2050                    Arc::new(IoMockInterface {
2051                        return_errors: false,
2052                        do_checks: false,
2053                        expected_op: Arc::new(Mutex::new(None)),
2054                    }),
2055                );
2056                block_server.handle_requests(stream).await.unwrap();
2057            },
2058            async move {
2059                let (session_proxy, server) = fidl::endpoints::create_proxy();
2060
2061                proxy.open_session(server).unwrap();
2062
2063                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2064                let vmo_id = session_proxy
2065                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2066                    .await
2067                    .unwrap()
2068                    .unwrap();
2069
2070                let mut fifo =
2071                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2072
2073                async fn test(
2074                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2075                    request: BlockFifoRequest,
2076                ) -> Result<(), zx::Status> {
2077                    let (mut reader, mut writer) = fifo.async_io();
2078                    writer.write_entries(&request).await.unwrap();
2079                    let mut response = BlockFifoResponse::default();
2080                    reader.read_entries(&mut response).await.unwrap();
2081                    zx::Status::ok(response.status)
2082                }
2083
2084                // READ
2085
2086                let good_read_request = || BlockFifoRequest {
2087                    command: BlockFifoCommand {
2088                        opcode: BlockOpcode::Read.into_primitive(),
2089                        ..Default::default()
2090                    },
2091                    length: 1,
2092                    vmoid: vmo_id.id,
2093                    ..Default::default()
2094                };
2095
2096                assert_eq!(
2097                    test(
2098                        &mut fifo,
2099                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2100                    )
2101                    .await,
2102                    Err(zx::Status::IO)
2103                );
2104
2105                assert_eq!(
2106                    test(
2107                        &mut fifo,
2108                        BlockFifoRequest {
2109                            vmo_offset: 0xffff_ffff_ffff_ffff,
2110                            ..good_read_request()
2111                        }
2112                    )
2113                    .await,
2114                    Err(zx::Status::OUT_OF_RANGE)
2115                );
2116
2117                assert_eq!(
2118                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2119                    Err(zx::Status::INVALID_ARGS)
2120                );
2121
2122                // WRITE
2123
2124                let good_write_request = || BlockFifoRequest {
2125                    command: BlockFifoCommand {
2126                        opcode: BlockOpcode::Write.into_primitive(),
2127                        ..Default::default()
2128                    },
2129                    length: 1,
2130                    vmoid: vmo_id.id,
2131                    ..Default::default()
2132                };
2133
2134                assert_eq!(
2135                    test(
2136                        &mut fifo,
2137                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2138                    )
2139                    .await,
2140                    Err(zx::Status::IO)
2141                );
2142
2143                assert_eq!(
2144                    test(
2145                        &mut fifo,
2146                        BlockFifoRequest {
2147                            vmo_offset: 0xffff_ffff_ffff_ffff,
2148                            ..good_write_request()
2149                        }
2150                    )
2151                    .await,
2152                    Err(zx::Status::OUT_OF_RANGE)
2153                );
2154
2155                assert_eq!(
2156                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2157                    Err(zx::Status::INVALID_ARGS)
2158                );
2159
2160                // CLOSE VMO
2161
2162                assert_eq!(
2163                    test(
2164                        &mut fifo,
2165                        BlockFifoRequest {
2166                            command: BlockFifoCommand {
2167                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2168                                ..Default::default()
2169                            },
2170                            vmoid: vmo_id.id + 1,
2171                            ..Default::default()
2172                        }
2173                    )
2174                    .await,
2175                    Err(zx::Status::IO)
2176                );
2177
2178                std::mem::drop(proxy);
2179            }
2180        );
2181    }
2182
2183    #[fuchsia::test]
2184    async fn test_concurrent_requests() {
2185        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2186
2187        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2188        let waiting_readers_clone = waiting_readers.clone();
2189
2190        futures::join!(
2191            async move {
2192                let block_server = BlockServer::new(
2193                    BLOCK_SIZE,
2194                    Arc::new(MockInterface {
2195                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2196                            let (tx, rx) = oneshot::channel();
2197                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2198                            Box::pin(async move {
2199                                let _ = rx.await;
2200                                Ok(())
2201                            })
2202                        })),
2203                        ..MockInterface::default()
2204                    }),
2205                );
2206                block_server.handle_requests(stream).await.unwrap();
2207            },
2208            async move {
2209                let (session_proxy, server) = fidl::endpoints::create_proxy();
2210
2211                proxy.open_session(server).unwrap();
2212
2213                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2214                let vmo_id = session_proxy
2215                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2216                    .await
2217                    .unwrap()
2218                    .unwrap();
2219
2220                let mut fifo =
2221                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2222                let (mut reader, mut writer) = fifo.async_io();
2223
2224                writer
2225                    .write_entries(&BlockFifoRequest {
2226                        command: BlockFifoCommand {
2227                            opcode: BlockOpcode::Read.into_primitive(),
2228                            ..Default::default()
2229                        },
2230                        reqid: 1,
2231                        dev_offset: 1, // Intentionally use the same as `reqid`.
2232                        vmoid: vmo_id.id,
2233                        length: 1,
2234                        ..Default::default()
2235                    })
2236                    .await
2237                    .unwrap();
2238
2239                writer
2240                    .write_entries(&BlockFifoRequest {
2241                        command: BlockFifoCommand {
2242                            opcode: BlockOpcode::Read.into_primitive(),
2243                            ..Default::default()
2244                        },
2245                        reqid: 2,
2246                        dev_offset: 2,
2247                        vmoid: vmo_id.id,
2248                        length: 1,
2249                        ..Default::default()
2250                    })
2251                    .await
2252                    .unwrap();
2253
2254                // Wait till both those entries are pending.
2255                poll_fn(|cx: &mut Context<'_>| {
2256                    if waiting_readers.lock().len() == 2 {
2257                        Poll::Ready(())
2258                    } else {
2259                        // Yield to the executor.
2260                        cx.waker().wake_by_ref();
2261                        Poll::Pending
2262                    }
2263                })
2264                .await;
2265
2266                let mut response = BlockFifoResponse::default();
2267                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2268
2269                let (id, tx) = waiting_readers.lock().pop().unwrap();
2270                tx.send(()).unwrap();
2271
2272                reader.read_entries(&mut response).await.unwrap();
2273                assert_eq!(response.status, zx::sys::ZX_OK);
2274                assert_eq!(response.reqid, id);
2275
2276                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2277
2278                let (id, tx) = waiting_readers.lock().pop().unwrap();
2279                tx.send(()).unwrap();
2280
2281                reader.read_entries(&mut response).await.unwrap();
2282                assert_eq!(response.status, zx::sys::ZX_OK);
2283                assert_eq!(response.reqid, id);
2284            }
2285        );
2286    }
2287
2288    #[fuchsia::test]
2289    async fn test_groups() {
2290        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2291
2292        futures::join!(
2293            async move {
2294                let block_server = BlockServer::new(
2295                    BLOCK_SIZE,
2296                    Arc::new(MockInterface {
2297                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2298                        ..MockInterface::default()
2299                    }),
2300                );
2301                block_server.handle_requests(stream).await.unwrap();
2302            },
2303            async move {
2304                let (session_proxy, server) = fidl::endpoints::create_proxy();
2305
2306                proxy.open_session(server).unwrap();
2307
2308                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2309                let vmo_id = session_proxy
2310                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2311                    .await
2312                    .unwrap()
2313                    .unwrap();
2314
2315                let mut fifo =
2316                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2317                let (mut reader, mut writer) = fifo.async_io();
2318
2319                writer
2320                    .write_entries(&BlockFifoRequest {
2321                        command: BlockFifoCommand {
2322                            opcode: BlockOpcode::Read.into_primitive(),
2323                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2324                            ..Default::default()
2325                        },
2326                        group: 1,
2327                        vmoid: vmo_id.id,
2328                        length: 1,
2329                        ..Default::default()
2330                    })
2331                    .await
2332                    .unwrap();
2333
2334                writer
2335                    .write_entries(&BlockFifoRequest {
2336                        command: BlockFifoCommand {
2337                            opcode: BlockOpcode::Read.into_primitive(),
2338                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2339                            ..Default::default()
2340                        },
2341                        reqid: 2,
2342                        group: 1,
2343                        vmoid: vmo_id.id,
2344                        length: 1,
2345                        ..Default::default()
2346                    })
2347                    .await
2348                    .unwrap();
2349
2350                let mut response = BlockFifoResponse::default();
2351                reader.read_entries(&mut response).await.unwrap();
2352                assert_eq!(response.status, zx::sys::ZX_OK);
2353                assert_eq!(response.reqid, 2);
2354                assert_eq!(response.group, 1);
2355            }
2356        );
2357    }
2358
2359    #[fuchsia::test]
2360    async fn test_group_error() {
2361        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2362
2363        let counter = Arc::new(AtomicU64::new(0));
2364        let counter_clone = counter.clone();
2365
2366        futures::join!(
2367            async move {
2368                let block_server = BlockServer::new(
2369                    BLOCK_SIZE,
2370                    Arc::new(MockInterface {
2371                        read_hook: Some(Box::new(move |_, _, _, _| {
2372                            counter_clone.fetch_add(1, Ordering::Relaxed);
2373                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2374                        })),
2375                        ..MockInterface::default()
2376                    }),
2377                );
2378                block_server.handle_requests(stream).await.unwrap();
2379            },
2380            async move {
2381                let (session_proxy, server) = fidl::endpoints::create_proxy();
2382
2383                proxy.open_session(server).unwrap();
2384
2385                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2386                let vmo_id = session_proxy
2387                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2388                    .await
2389                    .unwrap()
2390                    .unwrap();
2391
2392                let mut fifo =
2393                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2394                let (mut reader, mut writer) = fifo.async_io();
2395
2396                writer
2397                    .write_entries(&BlockFifoRequest {
2398                        command: BlockFifoCommand {
2399                            opcode: BlockOpcode::Read.into_primitive(),
2400                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2401                            ..Default::default()
2402                        },
2403                        group: 1,
2404                        vmoid: vmo_id.id,
2405                        length: 1,
2406                        ..Default::default()
2407                    })
2408                    .await
2409                    .unwrap();
2410
2411                // Wait until processed.
2412                poll_fn(|cx: &mut Context<'_>| {
2413                    if counter.load(Ordering::Relaxed) == 1 {
2414                        Poll::Ready(())
2415                    } else {
2416                        // Yield to the executor.
2417                        cx.waker().wake_by_ref();
2418                        Poll::Pending
2419                    }
2420                })
2421                .await;
2422
2423                let mut response = BlockFifoResponse::default();
2424                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2425
2426                writer
2427                    .write_entries(&BlockFifoRequest {
2428                        command: BlockFifoCommand {
2429                            opcode: BlockOpcode::Read.into_primitive(),
2430                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2431                            ..Default::default()
2432                        },
2433                        group: 1,
2434                        vmoid: vmo_id.id,
2435                        length: 1,
2436                        ..Default::default()
2437                    })
2438                    .await
2439                    .unwrap();
2440
2441                writer
2442                    .write_entries(&BlockFifoRequest {
2443                        command: BlockFifoCommand {
2444                            opcode: BlockOpcode::Read.into_primitive(),
2445                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2446                            ..Default::default()
2447                        },
2448                        reqid: 2,
2449                        group: 1,
2450                        vmoid: vmo_id.id,
2451                        length: 1,
2452                        ..Default::default()
2453                    })
2454                    .await
2455                    .unwrap();
2456
2457                reader.read_entries(&mut response).await.unwrap();
2458                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2459                assert_eq!(response.reqid, 2);
2460                assert_eq!(response.group, 1);
2461
2462                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2463
2464                // Only the first request should have been processed.
2465                assert_eq!(counter.load(Ordering::Relaxed), 1);
2466            }
2467        );
2468    }
2469
2470    #[fuchsia::test]
2471    async fn test_group_with_two_lasts() {
2472        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2473
2474        let (tx, rx) = oneshot::channel();
2475
2476        futures::join!(
2477            async move {
2478                let rx = Mutex::new(Some(rx));
2479                let block_server = BlockServer::new(
2480                    BLOCK_SIZE,
2481                    Arc::new(MockInterface {
2482                        read_hook: Some(Box::new(move |_, _, _, _| {
2483                            let rx = rx.lock().take().unwrap();
2484                            Box::pin(async {
2485                                let _ = rx.await;
2486                                Ok(())
2487                            })
2488                        })),
2489                        ..MockInterface::default()
2490                    }),
2491                );
2492                block_server.handle_requests(stream).await.unwrap();
2493            },
2494            async move {
2495                let (session_proxy, server) = fidl::endpoints::create_proxy();
2496
2497                proxy.open_session(server).unwrap();
2498
2499                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2500                let vmo_id = session_proxy
2501                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2502                    .await
2503                    .unwrap()
2504                    .unwrap();
2505
2506                let mut fifo =
2507                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2508                let (mut reader, mut writer) = fifo.async_io();
2509
2510                writer
2511                    .write_entries(&BlockFifoRequest {
2512                        command: BlockFifoCommand {
2513                            opcode: BlockOpcode::Read.into_primitive(),
2514                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2515                            ..Default::default()
2516                        },
2517                        reqid: 1,
2518                        group: 1,
2519                        vmoid: vmo_id.id,
2520                        length: 1,
2521                        ..Default::default()
2522                    })
2523                    .await
2524                    .unwrap();
2525
2526                writer
2527                    .write_entries(&BlockFifoRequest {
2528                        command: BlockFifoCommand {
2529                            opcode: BlockOpcode::Read.into_primitive(),
2530                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2531                            ..Default::default()
2532                        },
2533                        reqid: 2,
2534                        group: 1,
2535                        vmoid: vmo_id.id,
2536                        length: 1,
2537                        ..Default::default()
2538                    })
2539                    .await
2540                    .unwrap();
2541
2542                // Send an independent request to flush through the fifo.
2543                writer
2544                    .write_entries(&BlockFifoRequest {
2545                        command: BlockFifoCommand {
2546                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2547                            ..Default::default()
2548                        },
2549                        reqid: 3,
2550                        vmoid: vmo_id.id,
2551                        ..Default::default()
2552                    })
2553                    .await
2554                    .unwrap();
2555
2556                // It should succeed.
2557                let mut response = BlockFifoResponse::default();
2558                reader.read_entries(&mut response).await.unwrap();
2559                assert_eq!(response.status, zx::sys::ZX_OK);
2560                assert_eq!(response.reqid, 3);
2561
2562                // Now release the original request.
2563                tx.send(()).unwrap();
2564
2565                // The response should be for the first message tagged as last, and it should be
2566                // an error because we sent two messages with the LAST marker.
2567                let mut response = BlockFifoResponse::default();
2568                reader.read_entries(&mut response).await.unwrap();
2569                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2570                assert_eq!(response.reqid, 1);
2571                assert_eq!(response.group, 1);
2572            }
2573        );
2574    }
2575
2576    #[fuchsia::test(allow_stalls = false)]
2577    async fn test_requests_dont_block_sessions() {
2578        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2579
2580        let (tx, rx) = oneshot::channel();
2581
2582        fasync::Task::local(async move {
2583            let rx = Mutex::new(Some(rx));
2584            let block_server = BlockServer::new(
2585                BLOCK_SIZE,
2586                Arc::new(MockInterface {
2587                    read_hook: Some(Box::new(move |_, _, _, _| {
2588                        let rx = rx.lock().take().unwrap();
2589                        Box::pin(async {
2590                            let _ = rx.await;
2591                            Ok(())
2592                        })
2593                    })),
2594                    ..MockInterface::default()
2595                }),
2596            );
2597            block_server.handle_requests(stream).await.unwrap();
2598        })
2599        .detach();
2600
2601        let mut fut = pin!(async {
2602            let (session_proxy, server) = fidl::endpoints::create_proxy();
2603
2604            proxy.open_session(server).unwrap();
2605
2606            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2607            let vmo_id = session_proxy
2608                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2609                .await
2610                .unwrap()
2611                .unwrap();
2612
2613            let mut fifo =
2614                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2615            let (mut reader, mut writer) = fifo.async_io();
2616
2617            writer
2618                .write_entries(&BlockFifoRequest {
2619                    command: BlockFifoCommand {
2620                        opcode: BlockOpcode::Read.into_primitive(),
2621                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2622                        ..Default::default()
2623                    },
2624                    reqid: 1,
2625                    group: 1,
2626                    vmoid: vmo_id.id,
2627                    length: 1,
2628                    ..Default::default()
2629                })
2630                .await
2631                .unwrap();
2632
2633            let mut response = BlockFifoResponse::default();
2634            reader.read_entries(&mut response).await.unwrap();
2635            assert_eq!(response.status, zx::sys::ZX_OK);
2636        });
2637
2638        // The response won't come back until we send on `tx`.
2639        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2640
2641        let mut fut2 = pin!(proxy.get_volume_info());
2642
2643        // get_volume_info is set up to stall forever.
2644        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2645
2646        // If we now free up the first future, it should resolve; the stalled call to
2647        // get_volume_info should not block the fifo response.
2648        let _ = tx.send(());
2649
2650        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2651    }
2652
2653    #[fuchsia::test]
2654    async fn test_request_flow_control() {
2655        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2656
2657        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2658        // server will block until that happens.
2659        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2660        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2661        let event_clone = event.clone();
2662        futures::join!(
2663            async move {
2664                let block_server = BlockServer::new(
2665                    BLOCK_SIZE,
2666                    Arc::new(MockInterface {
2667                        read_hook: Some(Box::new(move |_, _, _, _| {
2668                            let event_clone = event_clone.clone();
2669                            Box::pin(async move {
2670                                if !event_clone.1.load(Ordering::SeqCst) {
2671                                    event_clone.0.listen().await;
2672                                }
2673                                Ok(())
2674                            })
2675                        })),
2676                        ..MockInterface::default()
2677                    }),
2678                );
2679                block_server.handle_requests(stream).await.unwrap();
2680            },
2681            async move {
2682                let (session_proxy, server) = fidl::endpoints::create_proxy();
2683
2684                proxy.open_session(server).unwrap();
2685
2686                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2687                let vmo_id = session_proxy
2688                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2689                    .await
2690                    .unwrap()
2691                    .unwrap();
2692
2693                let mut fifo =
2694                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2695                let (mut reader, mut writer) = fifo.async_io();
2696
2697                for i in 0..MAX_REQUESTS {
2698                    writer
2699                        .write_entries(&BlockFifoRequest {
2700                            command: BlockFifoCommand {
2701                                opcode: BlockOpcode::Read.into_primitive(),
2702                                ..Default::default()
2703                            },
2704                            reqid: (i + 1) as u32,
2705                            dev_offset: i,
2706                            vmoid: vmo_id.id,
2707                            length: 1,
2708                            ..Default::default()
2709                        })
2710                        .await
2711                        .unwrap();
2712                }
2713                assert!(
2714                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2715                        command: BlockFifoCommand {
2716                            opcode: BlockOpcode::Read.into_primitive(),
2717                            ..Default::default()
2718                        },
2719                        reqid: u32::MAX,
2720                        dev_offset: MAX_REQUESTS,
2721                        vmoid: vmo_id.id,
2722                        length: 1,
2723                        ..Default::default()
2724                    })))
2725                    .is_pending()
2726                );
2727                // OK, let the server start to process.
2728                event.1.store(true, Ordering::SeqCst);
2729                event.0.notify(usize::MAX);
2730                // For each entry we read, make sure we can write a new one in.
2731                let mut finished_reqids = vec![];
2732                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2733                    let mut response = BlockFifoResponse::default();
2734                    reader.read_entries(&mut response).await.unwrap();
2735                    assert_eq!(response.status, zx::sys::ZX_OK);
2736                    finished_reqids.push(response.reqid);
2737                    writer
2738                        .write_entries(&BlockFifoRequest {
2739                            command: BlockFifoCommand {
2740                                opcode: BlockOpcode::Read.into_primitive(),
2741                                ..Default::default()
2742                            },
2743                            reqid: (i + 1) as u32,
2744                            dev_offset: i,
2745                            vmoid: vmo_id.id,
2746                            length: 1,
2747                            ..Default::default()
2748                        })
2749                        .await
2750                        .unwrap();
2751                }
2752                let mut response = BlockFifoResponse::default();
2753                for _ in 0..MAX_REQUESTS {
2754                    reader.read_entries(&mut response).await.unwrap();
2755                    assert_eq!(response.status, zx::sys::ZX_OK);
2756                    finished_reqids.push(response.reqid);
2757                }
2758                // Verify that we got a response for each request.  Note that we can't assume FIFO
2759                // ordering.
2760                finished_reqids.sort();
2761                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2762                let mut i = 1;
2763                for reqid in finished_reqids {
2764                    assert_eq!(reqid, i);
2765                    i += 1;
2766                }
2767            }
2768        );
2769    }
2770
2771    #[fuchsia::test]
2772    async fn test_passthrough_io_with_fixed_map() {
2773        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2774
2775        let expected_op = Arc::new(Mutex::new(None));
2776        let expected_op_clone = expected_op.clone();
2777        futures::join!(
2778            async {
2779                let block_server = BlockServer::new(
2780                    BLOCK_SIZE,
2781                    Arc::new(IoMockInterface {
2782                        return_errors: false,
2783                        do_checks: true,
2784                        expected_op: expected_op_clone,
2785                    }),
2786                );
2787                block_server.handle_requests(stream).await.unwrap();
2788            },
2789            async move {
2790                let (session_proxy, server) = fidl::endpoints::create_proxy();
2791
2792                let mapping = fblock::BlockOffsetMapping {
2793                    source_block_offset: 0,
2794                    target_block_offset: 10,
2795                    length: 20,
2796                };
2797                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2798
2799                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2800                let vmo_id = session_proxy
2801                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2802                    .await
2803                    .unwrap()
2804                    .unwrap();
2805
2806                let mut fifo =
2807                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2808                let (mut reader, mut writer) = fifo.async_io();
2809
2810                // READ
2811                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2812                writer
2813                    .write_entries(&BlockFifoRequest {
2814                        command: BlockFifoCommand {
2815                            opcode: BlockOpcode::Read.into_primitive(),
2816                            ..Default::default()
2817                        },
2818                        vmoid: vmo_id.id,
2819                        dev_offset: 1,
2820                        length: 2,
2821                        vmo_offset: 3,
2822                        ..Default::default()
2823                    })
2824                    .await
2825                    .unwrap();
2826
2827                let mut response = BlockFifoResponse::default();
2828                reader.read_entries(&mut response).await.unwrap();
2829                assert_eq!(response.status, zx::sys::ZX_OK);
2830
2831                // WRITE
2832                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2833                writer
2834                    .write_entries(&BlockFifoRequest {
2835                        command: BlockFifoCommand {
2836                            opcode: BlockOpcode::Write.into_primitive(),
2837                            ..Default::default()
2838                        },
2839                        vmoid: vmo_id.id,
2840                        dev_offset: 4,
2841                        length: 5,
2842                        vmo_offset: 6,
2843                        ..Default::default()
2844                    })
2845                    .await
2846                    .unwrap();
2847
2848                reader.read_entries(&mut response).await.unwrap();
2849                assert_eq!(response.status, zx::sys::ZX_OK);
2850
2851                // FLUSH
2852                *expected_op.lock() = Some(ExpectedOp::Flush);
2853                writer
2854                    .write_entries(&BlockFifoRequest {
2855                        command: BlockFifoCommand {
2856                            opcode: BlockOpcode::Flush.into_primitive(),
2857                            ..Default::default()
2858                        },
2859                        ..Default::default()
2860                    })
2861                    .await
2862                    .unwrap();
2863
2864                reader.read_entries(&mut response).await.unwrap();
2865                assert_eq!(response.status, zx::sys::ZX_OK);
2866
2867                // TRIM
2868                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2869                writer
2870                    .write_entries(&BlockFifoRequest {
2871                        command: BlockFifoCommand {
2872                            opcode: BlockOpcode::Trim.into_primitive(),
2873                            ..Default::default()
2874                        },
2875                        dev_offset: 7,
2876                        length: 3,
2877                        ..Default::default()
2878                    })
2879                    .await
2880                    .unwrap();
2881
2882                reader.read_entries(&mut response).await.unwrap();
2883                assert_eq!(response.status, zx::sys::ZX_OK);
2884
2885                // READ past window
2886                *expected_op.lock() = None;
2887                writer
2888                    .write_entries(&BlockFifoRequest {
2889                        command: BlockFifoCommand {
2890                            opcode: BlockOpcode::Read.into_primitive(),
2891                            ..Default::default()
2892                        },
2893                        vmoid: vmo_id.id,
2894                        dev_offset: 19,
2895                        length: 2,
2896                        vmo_offset: 3,
2897                        ..Default::default()
2898                    })
2899                    .await
2900                    .unwrap();
2901
2902                reader.read_entries(&mut response).await.unwrap();
2903                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2904
2905                std::mem::drop(proxy);
2906            }
2907        );
2908    }
2909
2910    #[fuchsia::test]
2911    fn operation_map() {
2912        const BLOCK_SIZE: u32 = 512;
2913
2914        #[track_caller]
2915        fn expect_map_result(
2916            mut operation: Operation,
2917            mapping: Option<BlockOffsetMapping>,
2918            max_blocks: Option<NonZero<u32>>,
2919            expected_operations: Vec<Operation>,
2920        ) {
2921            let mut ops = vec![];
2922            while let Some(remainder) =
2923                operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2924            {
2925                ops.push(operation);
2926                operation = remainder;
2927            }
2928            ops.push(operation);
2929            assert_eq!(ops, expected_operations);
2930        }
2931
2932        // No limits
2933        expect_map_result(
2934            Operation::Read {
2935                device_block_offset: 10,
2936                block_count: 200,
2937                _unused: 0,
2938                vmo_offset: 0,
2939                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2940            },
2941            None,
2942            None,
2943            vec![Operation::Read {
2944                device_block_offset: 10,
2945                block_count: 200,
2946                _unused: 0,
2947                vmo_offset: 0,
2948                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2949            }],
2950        );
2951
2952        // Max block count
2953        expect_map_result(
2954            Operation::Read {
2955                device_block_offset: 10,
2956                block_count: 200,
2957                _unused: 0,
2958                vmo_offset: 0,
2959                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2960            },
2961            None,
2962            NonZero::new(120),
2963            vec![
2964                Operation::Read {
2965                    device_block_offset: 10,
2966                    block_count: 120,
2967                    _unused: 0,
2968                    vmo_offset: 0,
2969                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2970                },
2971                Operation::Read {
2972                    device_block_offset: 130,
2973                    block_count: 80,
2974                    _unused: 0,
2975                    vmo_offset: 120 * BLOCK_SIZE as u64,
2976                    options: ReadOptions {
2977                        // The DUN should be offset by the number of blocks in the first request.
2978                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
2979                    },
2980                },
2981            ],
2982        );
2983        expect_map_result(
2984            Operation::Trim { device_block_offset: 10, block_count: 200 },
2985            None,
2986            NonZero::new(120),
2987            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2988        );
2989
2990        // Remapping + Max block count
2991        expect_map_result(
2992            Operation::Read {
2993                device_block_offset: 10,
2994                block_count: 200,
2995                _unused: 0,
2996                vmo_offset: 0,
2997                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2998            },
2999            Some(BlockOffsetMapping {
3000                source_block_offset: 10,
3001                target_block_offset: 100,
3002                length: 200,
3003            }),
3004            NonZero::new(120),
3005            vec![
3006                Operation::Read {
3007                    device_block_offset: 100,
3008                    block_count: 120,
3009                    _unused: 0,
3010                    vmo_offset: 0,
3011                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3012                },
3013                Operation::Read {
3014                    device_block_offset: 220,
3015                    block_count: 80,
3016                    _unused: 0,
3017                    vmo_offset: 120 * BLOCK_SIZE as u64,
3018                    options: ReadOptions {
3019                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3020                    },
3021                },
3022            ],
3023        );
3024        expect_map_result(
3025            Operation::Trim { device_block_offset: 10, block_count: 200 },
3026            Some(BlockOffsetMapping {
3027                source_block_offset: 10,
3028                target_block_offset: 100,
3029                length: 200,
3030            }),
3031            NonZero::new(120),
3032            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3033        );
3034    }
3035
3036    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3037    #[fuchsia::test]
3038    async fn test_pre_barrier_flush_failure() {
3039        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3040
3041        struct NoBarrierInterface;
3042        impl super::async_interface::Interface for NoBarrierInterface {
3043            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3044                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3045                    device_flags: fblock::DeviceFlag::empty(), // No BARRIER_SUPPORT
3046                    max_transfer_blocks: NonZero::new(100),
3047                    block_range: Some(0..100),
3048                    type_guid: [0; 16],
3049                    instance_guid: [0; 16],
3050                    name: "test".to_string(),
3051                    flags: 0,
3052                }))
3053            }
3054            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3055                Ok(())
3056            }
3057            async fn read(
3058                &self,
3059                _: u64,
3060                _: u32,
3061                _: &Arc<zx::Vmo>,
3062                _: u64,
3063                _: ReadOptions,
3064                _: TraceFlowId,
3065            ) -> Result<(), zx::Status> {
3066                unreachable!()
3067            }
3068            async fn write(
3069                &self,
3070                _: u64,
3071                _: u32,
3072                _: &Arc<zx::Vmo>,
3073                _: u64,
3074                _: WriteOptions,
3075                _: TraceFlowId,
3076            ) -> Result<(), zx::Status> {
3077                panic!("Write should not be called");
3078            }
3079            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3080                Err(zx::Status::IO)
3081            }
3082            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3083                unreachable!()
3084            }
3085        }
3086
3087        futures::join!(
3088            async move {
3089                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3090                block_server.handle_requests(stream).await.unwrap();
3091            },
3092            async move {
3093                let (session_proxy, server) = fidl::endpoints::create_proxy();
3094                proxy.open_session(server).unwrap();
3095                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3096                let vmo_id = session_proxy
3097                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3098                    .await
3099                    .unwrap()
3100                    .unwrap();
3101
3102                let mut fifo =
3103                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3104                let (mut reader, mut writer) = fifo.async_io();
3105
3106                writer
3107                    .write_entries(&BlockFifoRequest {
3108                        command: BlockFifoCommand {
3109                            opcode: BlockOpcode::Write.into_primitive(),
3110                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3111                            ..Default::default()
3112                        },
3113                        vmoid: vmo_id.id,
3114                        length: 1,
3115                        ..Default::default()
3116                    })
3117                    .await
3118                    .unwrap();
3119
3120                let mut response = BlockFifoResponse::default();
3121                reader.read_entries(&mut response).await.unwrap();
3122                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3123            }
3124        );
3125    }
3126
3127    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3128    // post-flush is not executed.
3129    #[fuchsia::test]
3130    async fn test_post_barrier_write_failure() {
3131        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3132
3133        struct NoBarrierInterface;
3134        impl super::async_interface::Interface for NoBarrierInterface {
3135            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3136                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3137                    device_flags: fblock::DeviceFlag::empty(), // No FUA_SUPPORT
3138                    max_transfer_blocks: NonZero::new(100),
3139                    block_range: Some(0..100),
3140                    type_guid: [0; 16],
3141                    instance_guid: [0; 16],
3142                    name: "test".to_string(),
3143                    flags: 0,
3144                }))
3145            }
3146            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3147                Ok(())
3148            }
3149            async fn read(
3150                &self,
3151                _: u64,
3152                _: u32,
3153                _: &Arc<zx::Vmo>,
3154                _: u64,
3155                _: ReadOptions,
3156                _: TraceFlowId,
3157            ) -> Result<(), zx::Status> {
3158                unreachable!()
3159            }
3160            async fn write(
3161                &self,
3162                _: u64,
3163                _: u32,
3164                _: &Arc<zx::Vmo>,
3165                _: u64,
3166                _: WriteOptions,
3167                _: TraceFlowId,
3168            ) -> Result<(), zx::Status> {
3169                Err(zx::Status::IO)
3170            }
3171            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3172                panic!("Flush should not be called")
3173            }
3174            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3175                unreachable!()
3176            }
3177        }
3178
3179        futures::join!(
3180            async move {
3181                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3182                block_server.handle_requests(stream).await.unwrap();
3183            },
3184            async move {
3185                let (session_proxy, server) = fidl::endpoints::create_proxy();
3186                proxy.open_session(server).unwrap();
3187                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3188                let vmo_id = session_proxy
3189                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3190                    .await
3191                    .unwrap()
3192                    .unwrap();
3193
3194                let mut fifo =
3195                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3196                let (mut reader, mut writer) = fifo.async_io();
3197
3198                writer
3199                    .write_entries(&BlockFifoRequest {
3200                        command: BlockFifoCommand {
3201                            opcode: BlockOpcode::Write.into_primitive(),
3202                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3203                            ..Default::default()
3204                        },
3205                        vmoid: vmo_id.id,
3206                        length: 1,
3207                        ..Default::default()
3208                    })
3209                    .await
3210                    .unwrap();
3211
3212                let mut response = BlockFifoResponse::default();
3213                reader.read_entries(&mut response).await.unwrap();
3214                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3215            }
3216        );
3217    }
3218}