block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
35#[derive(Clone)]
36pub enum DeviceInfo {
37    Block(BlockInfo),
38    Partition(PartitionInfo),
39}
40
41impl DeviceInfo {
42    pub fn device_flags(&self) -> fblock::Flag {
43        match self {
44            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
45            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
46        }
47    }
48
49    pub fn block_count(&self) -> Option<u64> {
50        match self {
51            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
52            Self::Partition(PartitionInfo { block_range, .. }) => {
53                block_range.as_ref().map(|range| range.end - range.start)
54            }
55        }
56    }
57
58    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
59        match self {
60            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
61            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
62                max_transfer_blocks.clone()
63            }
64        }
65    }
66
67    fn max_transfer_size(&self, block_size: u32) -> u32 {
68        if let Some(max_blocks) = self.max_transfer_blocks() {
69            max_blocks.get() * block_size
70        } else {
71            MAX_TRANSFER_UNBOUNDED
72        }
73    }
74}
75
76/// Information associated with non-partition block devices.
77#[derive(Clone, Default)]
78pub struct BlockInfo {
79    pub device_flags: fblock::Flag,
80    pub block_count: u64,
81    pub max_transfer_blocks: Option<NonZero<u32>>,
82}
83
84/// Information associated with a block device that is also a partition.
85#[derive(Clone, Default)]
86pub struct PartitionInfo {
87    /// The device flags reported by the underlying device.
88    pub device_flags: fblock::Flag,
89    pub max_transfer_blocks: Option<NonZero<u32>>,
90    /// If `block_range` is None, the partition is a volume and may not be contiguous.
91    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
92    /// slices and use that (along with the slice and block sizes) to determine the block count.
93    pub block_range: Option<Range<u64>>,
94    pub type_guid: [u8; 16],
95    pub instance_guid: [u8; 16],
96    pub name: String,
97    pub flags: u64,
98}
99
100/// We internally keep track of active requests, so that when the server is torn down, we can
101/// deallocate all of the resources for pending requests.
102struct ActiveRequest<S> {
103    session: S,
104    group_or_request: GroupOrRequest,
105    trace_flow_id: TraceFlowId,
106    _epoch_guard: EpochGuard<'static>,
107    status: zx::Status,
108    count: u32,
109    req_id: Option<u32>,
110    decompression_info: Option<DecompressionInfo>,
111}
112
113struct DecompressionInfo {
114    // This is the range of compressed bytes in receiving buffer.
115    compressed_range: Range<usize>,
116
117    // This is the range in the target VMO where we will write uncompressed bytes.
118    uncompressed_range: Range<u64>,
119
120    bytes_so_far: u64,
121    mapping: Arc<VmoMapping>,
122    buffer: Option<Buffer<'static>>,
123}
124
125impl DecompressionInfo {
126    /// Returns the uncompressed slice.
127    fn uncompressed_slice(&self) -> *mut [u8] {
128        std::ptr::slice_from_raw_parts_mut(
129            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
130            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
131        )
132    }
133}
134
135pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
136
137impl<S> Default for ActiveRequests<S> {
138    fn default() -> Self {
139        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
140    }
141}
142
143impl<S> ActiveRequests<S> {
144    fn complete_and_take_response(
145        &self,
146        request_id: RequestId,
147        status: zx::Status,
148    ) -> Option<(S, BlockFifoResponse)> {
149        self.0.lock().complete_and_take_response(request_id, status)
150    }
151
152    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
153        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
154    }
155}
156
157struct ActiveRequestsInner<S> {
158    requests: Slab<ActiveRequest<S>>,
159}
160
161// Keeps track of all the requests that are currently being processed
162impl<S> ActiveRequestsInner<S> {
163    /// Completes a request.
164    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
165        let group = &mut self.requests[request_id.0];
166
167        group.count = group.count.checked_sub(1).unwrap();
168        if status != zx::Status::OK && group.status == zx::Status::OK {
169            group.status = status
170        }
171
172        fuchsia_trace::duration!(
173            "storage",
174            "block_server::finish_transaction",
175            "request_id" => request_id.0,
176            "group_completed" => group.count == 0,
177            "status" => status.into_raw());
178        if let Some(trace_flow_id) = group.trace_flow_id {
179            fuchsia_trace::flow_step!(
180                "storage",
181                "block_server::finish_request",
182                trace_flow_id.get().into()
183            );
184        }
185
186        if group.count == 0
187            && group.status == zx::Status::OK
188            && let Some(info) = &mut group.decompression_info
189        {
190            thread_local! {
191                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
192                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
193            }
194            DECOMPRESSOR.with(|decompressor| {
195                // SAFETY: We verified `uncompressed_range` fits within our mapping.
196                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
197                let mut decompressor = decompressor.borrow_mut();
198                if let Err(error) = decompressor.decompress_to_buffer(
199                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
200                    target_slice,
201                ) {
202                    log::warn!(error:?; "Decompression error");
203                    group.status = zx::Status::IO_DATA_INTEGRITY;
204                };
205            });
206        }
207    }
208
209    /// Takes the response if all requests are finished.
210    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
211        let group = &self.requests[request_id.0];
212        match group.req_id {
213            Some(reqid) if group.count == 0 => {
214                let group = self.requests.remove(request_id.0);
215                Some((
216                    group.session,
217                    BlockFifoResponse {
218                        status: group.status.into_raw(),
219                        reqid,
220                        group: group.group_or_request.group_id().unwrap_or(0),
221                        ..Default::default()
222                    },
223                ))
224            }
225            _ => None,
226        }
227    }
228
229    /// Competes the request and returns a response if the request group is finished.
230    fn complete_and_take_response(
231        &mut self,
232        request_id: RequestId,
233        status: zx::Status,
234    ) -> Option<(S, BlockFifoResponse)> {
235        self.complete(request_id, status);
236        self.take_response(request_id)
237    }
238}
239
240/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
241/// cbindgen:no-export
242pub struct BlockServer<SM> {
243    block_size: u32,
244    session_manager: Arc<SM>,
245}
246
247/// A single entry in `[OffsetMap]`.
248#[derive(Debug)]
249pub struct BlockOffsetMapping {
250    source_block_offset: u64,
251    target_block_offset: u64,
252    length: u64,
253}
254
255impl BlockOffsetMapping {
256    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
257        blocks.0 >= self.source_block_offset
258            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
259    }
260}
261
262impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
263    type Error = zx::Status;
264
265    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
266        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
267        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
268        Ok(Self {
269            source_block_offset: wire.source_block_offset,
270            target_block_offset: wire.target_block_offset,
271            length: wire.length,
272        })
273    }
274}
275
276/// Remaps the offset of block requests based on an internal map, and truncates long requests.
277pub struct OffsetMap {
278    mapping: Option<BlockOffsetMapping>,
279    max_transfer_blocks: Option<NonZero<u32>>,
280}
281
282impl OffsetMap {
283    /// An OffsetMap that remaps requests.
284    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
285        Self { mapping: Some(mapping), max_transfer_blocks }
286    }
287
288    /// An OffsetMap that just enforces maximum request sizes.
289    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
290        Self { mapping: None, max_transfer_blocks }
291    }
292
293    pub fn is_empty(&self) -> bool {
294        self.mapping.is_none()
295    }
296
297    fn mapping(&self) -> Option<&BlockOffsetMapping> {
298        self.mapping.as_ref()
299    }
300
301    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
302        self.max_transfer_blocks
303    }
304}
305
306// Methods take Arc<Self> rather than &self because of
307// https://github.com/rust-lang/rust/issues/42940.
308pub trait SessionManager: 'static {
309    const SUPPORTS_DECOMPRESSION: bool;
310
311    type Session;
312
313    fn on_attach_vmo(
314        self: Arc<Self>,
315        vmo: &Arc<zx::Vmo>,
316    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
317
318    /// Creates a new session to handle `stream`.
319    /// The returned future should run until the session completes, for example when the client end
320    /// closes.
321    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
322    fn open_session(
323        self: Arc<Self>,
324        stream: fblock::SessionRequestStream,
325        offset_map: OffsetMap,
326        block_size: u32,
327    ) -> impl Future<Output = Result<(), Error>> + Send;
328
329    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
330    fn get_info(&self) -> Cow<'_, DeviceInfo>;
331
332    /// Called to handle the GetVolumeInfo FIDL call.
333    fn get_volume_info(
334        &self,
335    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
336    {
337        async { Err(zx::Status::NOT_SUPPORTED) }
338    }
339
340    /// Called to handle the QuerySlices FIDL call.
341    fn query_slices(
342        &self,
343        _start_slices: &[u64],
344    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
345        async { Err(zx::Status::NOT_SUPPORTED) }
346    }
347
348    /// Called to handle the Shrink FIDL call.
349    fn extend(
350        &self,
351        _start_slice: u64,
352        _slice_count: u64,
353    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
354        async { Err(zx::Status::NOT_SUPPORTED) }
355    }
356
357    /// Called to handle the Shrink FIDL call.
358    fn shrink(
359        &self,
360        _start_slice: u64,
361        _slice_count: u64,
362    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363        async { Err(zx::Status::NOT_SUPPORTED) }
364    }
365
366    /// Returns the active requests.
367    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
368}
369
370pub trait IntoSessionManager {
371    type SM: SessionManager;
372
373    fn into_session_manager(self) -> Arc<Self::SM>;
374}
375
376impl<SM: SessionManager> BlockServer<SM> {
377    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
378        Self { block_size, session_manager: session_manager.into_session_manager() }
379    }
380
381    pub fn session_manager(&self) -> &SM {
382        self.session_manager.as_ref()
383    }
384
385    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
386    pub async fn handle_requests(
387        &self,
388        mut requests: fvolume::VolumeRequestStream,
389    ) -> Result<(), Error> {
390        let scope = fasync::Scope::new();
391        loop {
392            match requests.try_next().await {
393                Ok(Some(request)) => {
394                    if let Some(session) = self.handle_request(request).await? {
395                        scope.spawn(session.map(|_| ()));
396                    }
397                }
398                Ok(None) => break,
399                Err(err) => log::warn!(err:?; "Invalid request"),
400            }
401        }
402        scope.await;
403        Ok(())
404    }
405
406    /// Processes a partition request.  If a new session task is created in response to the request,
407    /// it is returned.
408    async fn handle_request(
409        &self,
410        request: fvolume::VolumeRequest,
411    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
412        match request {
413            fvolume::VolumeRequest::GetInfo { responder } => {
414                let info = self.device_info();
415                let max_transfer_size = info.max_transfer_size(self.block_size);
416                let (block_count, mut flags) = match info.as_ref() {
417                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
418                        (*block_count, *device_flags)
419                    }
420                    DeviceInfo::Partition(partition_info) => {
421                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
422                            range.end - range.start
423                        } else {
424                            let volume_info = self.session_manager.get_volume_info().await?;
425                            volume_info.0.slice_size * volume_info.1.partition_slice_count
426                                / self.block_size as u64
427                        };
428                        (block_count, partition_info.device_flags)
429                    }
430                };
431                if SM::SUPPORTS_DECOMPRESSION {
432                    flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
433                }
434                responder.send(Ok(&fblock::BlockInfo {
435                    block_count,
436                    block_size: self.block_size,
437                    max_transfer_size,
438                    flags,
439                }))?;
440            }
441            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
442                let info = self.device_info();
443                return Ok(Some(self.session_manager.clone().open_session(
444                    session.into_stream(),
445                    OffsetMap::empty(info.max_transfer_blocks()),
446                    self.block_size,
447                )));
448            }
449            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
450                session,
451                mapping,
452                control_handle: _,
453            } => {
454                let info = self.device_info();
455                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
456                    Ok(m) => m,
457                    Err(status) => {
458                        session.close_with_epitaph(status)?;
459                        return Ok(None);
460                    }
461                };
462                if let Some(max) = info.block_count() {
463                    if initial_mapping.target_block_offset + initial_mapping.length > max {
464                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
465                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
466                        return Ok(None);
467                    }
468                }
469                return Ok(Some(self.session_manager.clone().open_session(
470                    session.into_stream(),
471                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
472                    self.block_size,
473                )));
474            }
475            fvolume::VolumeRequest::GetTypeGuid { responder } => {
476                let info = self.device_info();
477                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
478                    let mut guid =
479                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
480                    guid.value.copy_from_slice(&partition_info.type_guid);
481                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
482                } else {
483                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
484                }
485            }
486            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
487                let info = self.device_info();
488                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
489                    let mut guid =
490                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
491                    guid.value.copy_from_slice(&partition_info.instance_guid);
492                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
493                } else {
494                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
495                }
496            }
497            fvolume::VolumeRequest::GetName { responder } => {
498                let info = self.device_info();
499                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
500                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
501                } else {
502                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
503                }
504            }
505            fvolume::VolumeRequest::GetMetadata { responder } => {
506                let info = self.device_info();
507                if let DeviceInfo::Partition(info) = info.as_ref() {
508                    let mut type_guid =
509                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
510                    type_guid.value.copy_from_slice(&info.type_guid);
511                    let mut instance_guid =
512                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
513                    instance_guid.value.copy_from_slice(&info.instance_guid);
514                    responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
515                        name: Some(info.name.clone()),
516                        type_guid: Some(type_guid),
517                        instance_guid: Some(instance_guid),
518                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
519                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
520                        flags: Some(info.flags),
521                        ..Default::default()
522                    }))?;
523                } else {
524                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
525                }
526            }
527            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
528                match self.session_manager.query_slices(&start_slices).await {
529                    Ok(mut results) => {
530                        let results_len = results.len();
531                        assert!(results_len <= 16);
532                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
533                        responder.send(
534                            zx::sys::ZX_OK,
535                            &results.try_into().unwrap(),
536                            results_len as u64,
537                        )?;
538                    }
539                    Err(s) => {
540                        responder.send(
541                            s.into_raw(),
542                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
543                            0,
544                        )?;
545                    }
546                }
547            }
548            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
549                match self.session_manager.get_volume_info().await {
550                    Ok((manager_info, volume_info)) => {
551                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
552                    }
553                    Err(s) => responder.send(s.into_raw(), None, None)?,
554                }
555            }
556            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
557                responder.send(
558                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
559                        .into_raw(),
560                )?;
561            }
562            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
563                responder.send(
564                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
565                        .into_raw(),
566                )?;
567            }
568            fvolume::VolumeRequest::Destroy { responder, .. } => {
569                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
570            }
571        }
572        Ok(None)
573    }
574
575    fn device_info(&self) -> Cow<'_, DeviceInfo> {
576        self.session_manager.get_info()
577    }
578}
579
580struct SessionHelper<SM: SessionManager> {
581    session_manager: Arc<SM>,
582    offset_map: OffsetMap,
583    block_size: u32,
584    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
585    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
586}
587
588struct VmoMapping {
589    base: usize,
590    size: usize,
591}
592
593impl VmoMapping {
594    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
595        let size = vmo.get_size().unwrap() as usize;
596        Ok(Arc::new(Self {
597            base: fuchsia_runtime::vmar_root_self()
598                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
599                .inspect_err(|error| {
600                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
601                })?,
602            size,
603        }))
604    }
605}
606
607impl Drop for VmoMapping {
608    fn drop(&mut self) {
609        // SAFETY: We mapped this in `VmoMapping::new`.
610        unsafe {
611            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
612        }
613    }
614}
615
616impl<SM: SessionManager> SessionHelper<SM> {
617    fn new(
618        session_manager: Arc<SM>,
619        offset_map: OffsetMap,
620        block_size: u32,
621    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
622        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
623        Ok((
624            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
625            fifo,
626        ))
627    }
628
629    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
630        match request {
631            fblock::SessionRequest::GetFifo { responder } => {
632                let rights = zx::Rights::TRANSFER
633                    | zx::Rights::READ
634                    | zx::Rights::WRITE
635                    | zx::Rights::SIGNAL
636                    | zx::Rights::WAIT;
637                match self.peer_fifo.duplicate_handle(rights) {
638                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
639                    Err(s) => responder.send(Err(s.into_raw()))?,
640                }
641                Ok(())
642            }
643            fblock::SessionRequest::AttachVmo { vmo, responder } => {
644                let vmo = Arc::new(vmo);
645                let vmo_id = {
646                    let mut vmos = self.vmos.lock();
647                    if vmos.len() == u16::MAX as usize {
648                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
649                        return Ok(());
650                    } else {
651                        let vmo_id = match vmos.last_entry() {
652                            None => 1,
653                            Some(o) => {
654                                o.key().checked_add(1).unwrap_or_else(|| {
655                                    let mut vmo_id = 1;
656                                    // Find the first gap...
657                                    for (&id, _) in &*vmos {
658                                        if id > vmo_id {
659                                            break;
660                                        }
661                                        vmo_id = id + 1;
662                                    }
663                                    vmo_id
664                                })
665                            }
666                        };
667                        vmos.insert(vmo_id, (vmo.clone(), None));
668                        vmo_id
669                    }
670                };
671                self.session_manager.clone().on_attach_vmo(&vmo).await?;
672                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
673                Ok(())
674            }
675            fblock::SessionRequest::Close { responder } => {
676                responder.send(Ok(()))?;
677                Err(anyhow!("Closed"))
678            }
679        }
680    }
681
682    /// Decodes `request`.
683    fn decode_fifo_request(
684        &self,
685        session: SM::Session,
686        request: &BlockFifoRequest,
687    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
688        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
689
690        let request_bytes = request.length as u64 * self.block_size as u64;
691
692        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
693            .ok_or(zx::Status::INVALID_ARGS)
694            .and_then(|code| {
695                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
696                    if code != BlockOpcode::Read {
697                        return Err(zx::Status::INVALID_ARGS);
698                    }
699                    if !SM::SUPPORTS_DECOMPRESSION {
700                        return Err(zx::Status::NOT_SUPPORTED);
701                    }
702                }
703                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
704                    if request.length == 0 {
705                        return Err(zx::Status::INVALID_ARGS);
706                    }
707                    // Make sure the end offsets won't wrap.
708                    if request.dev_offset.checked_add(request.length as u64).is_none()
709                        || (code != BlockOpcode::Trim
710                            && request_bytes.checked_add(request.vmo_offset).is_none())
711                    {
712                        return Err(zx::Status::OUT_OF_RANGE);
713                    }
714                }
715                Ok(match code {
716                    BlockOpcode::Read => Operation::Read {
717                        device_block_offset: request.dev_offset,
718                        block_count: request.length,
719                        _unused: 0,
720                        vmo_offset: request
721                            .vmo_offset
722                            .checked_mul(self.block_size as u64)
723                            .ok_or(zx::Status::OUT_OF_RANGE)?,
724                        options: ReadOptions {
725                            inline_crypto_options: InlineCryptoOptions {
726                                dun: request.dun,
727                                slot: request.slot,
728                            },
729                        },
730                    },
731                    BlockOpcode::Write => {
732                        let mut options = WriteOptions {
733                            inline_crypto_options: InlineCryptoOptions {
734                                dun: request.dun,
735                                slot: request.slot,
736                            },
737                            ..WriteOptions::default()
738                        };
739                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
740                            options.flags |= WriteFlags::FORCE_ACCESS;
741                        }
742                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
743                            options.flags |= WriteFlags::PRE_BARRIER;
744                        }
745                        Operation::Write {
746                            device_block_offset: request.dev_offset,
747                            block_count: request.length,
748                            _unused: 0,
749                            options,
750                            vmo_offset: request
751                                .vmo_offset
752                                .checked_mul(self.block_size as u64)
753                                .ok_or(zx::Status::OUT_OF_RANGE)?,
754                        }
755                    }
756                    BlockOpcode::Flush => Operation::Flush,
757                    BlockOpcode::Trim => Operation::Trim {
758                        device_block_offset: request.dev_offset,
759                        block_count: request.length,
760                    },
761                    BlockOpcode::CloseVmo => Operation::CloseVmo,
762                })
763            });
764
765        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
766            GroupOrRequest::Group(request.group)
767        } else {
768            GroupOrRequest::Request(request.reqid)
769        };
770
771        let mut active_requests = self.session_manager.active_requests().0.lock();
772        let mut request_id = None;
773
774        // Multiple Block I/O request may be sent as a group.
775        // Notes:
776        // - the group is identified by the group id in the request
777        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
778        //   flag is set.
779        // - when processing a request of a group fails, subsequent requests of that
780        //   group will not be processed.
781        // - decompression is a special case, see block-fifo.h for semantics.
782        //
783        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
784        if group_or_request.is_group() {
785            // Search for an existing entry that matches this group.  NOTE: This is a potentially
786            // expensive way to find a group (it's iterating over all slots in the active-requests
787            // slab).  This can be optimised easily should we need to.
788            for (key, group) in &mut active_requests.requests {
789                if group.group_or_request == group_or_request {
790                    if group.req_id.is_some() {
791                        // We have already received a request tagged as last.
792                        if group.status == zx::Status::OK {
793                            group.status = zx::Status::INVALID_ARGS;
794                        }
795                        // Ignore this request.
796                        return Err(None);
797                    }
798                    // See if this is a continuation of a decompressed read.
799                    if group.status == zx::Status::OK
800                        && let Some(info) = &mut group.decompression_info
801                    {
802                        if let Ok(Operation::Read {
803                            device_block_offset,
804                            mut block_count,
805                            options,
806                            vmo_offset: 0,
807                            ..
808                        }) = operation
809                        {
810                            let remaining_bytes = info
811                                .compressed_range
812                                .end
813                                .next_multiple_of(self.block_size as usize)
814                                as u64
815                                - info.bytes_so_far;
816                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
817                                || request.total_compressed_bytes != 0
818                                || request.uncompressed_bytes != 0
819                                || request.compressed_prefix_bytes != 0
820                                || (flags.contains(BlockIoFlag::GROUP_LAST)
821                                    && info.bytes_so_far + request_bytes
822                                        < info.compressed_range.end as u64)
823                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
824                                    && request_bytes >= remaining_bytes)
825                            {
826                                group.status = zx::Status::INVALID_ARGS;
827                            } else {
828                                // We are tolerant of `block_count` being more than we actually
829                                // need.  This can happen if the client is working with a larger
830                                // block size than the device block size.  For example, if Blobfs
831                                // has a 8192 byte block size, but the device might has a 512 byte
832                                // block size, it can ask for a multiple of 16 blocks, when fewer
833                                // than that might actually be required to hold the compressed data.
834                                // It is easier for us to tolerate this here than to get Blobfs to
835                                // change to pass only the blocks that are required.
836                                if request_bytes > remaining_bytes {
837                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
838                                }
839
840                                operation = Ok(Operation::ContinueDecompressedRead {
841                                    offset: info.bytes_so_far,
842                                    device_block_offset,
843                                    block_count,
844                                    options,
845                                });
846
847                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
848                            }
849                        } else {
850                            group.status = zx::Status::INVALID_ARGS;
851                        }
852                    }
853                    if flags.contains(BlockIoFlag::GROUP_LAST) {
854                        group.req_id = Some(request.reqid);
855                        // If the group has had an error, there is no point trying to issue this
856                        // request.
857                        if group.status != zx::Status::OK {
858                            operation = Err(group.status);
859                        }
860                    } else if group.status != zx::Status::OK {
861                        // The group has already encountered an error, so there is no point trying
862                        // to issue this request.
863                        return Err(None);
864                    }
865                    request_id = Some(RequestId(key));
866                    group.count += 1;
867                    break;
868                }
869            }
870        }
871
872        let is_single_request =
873            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
874
875        let mut decompression_info = None;
876        let vmo = match operation {
877            Ok(Operation::Read {
878                device_block_offset,
879                mut block_count,
880                options,
881                vmo_offset,
882                ..
883            }) => match self.vmos.lock().get_mut(&request.vmoid) {
884                Some((vmo, mapping)) => {
885                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
886                        let compressed_range = request.compressed_prefix_bytes as usize
887                            ..request.compressed_prefix_bytes as usize
888                                + request.total_compressed_bytes as usize;
889                        let required_buffer_size =
890                            compressed_range.end.next_multiple_of(self.block_size as usize);
891
892                        // Validate the initial decompression request.
893                        if compressed_range.start >= compressed_range.end
894                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
895                            || (is_single_request && request_bytes < compressed_range.end as u64)
896                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
897                        {
898                            Err(zx::Status::INVALID_ARGS)
899                        } else {
900                            // We are tolerant of `block_count` being more than we actually need.
901                            // This can happen if the client is working in a larger block size than
902                            // the device block size.  For example, Blobfs has a 8192 byte block
903                            // size, but the device might have a 512 byte block size.  It is easier
904                            // for us to tolerate this here than to get Blobfs to change to pass
905                            // only the blocks that are required.
906                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
907                                block_count =
908                                    (required_buffer_size / self.block_size as usize) as u32;
909                                required_buffer_size as u64
910                            } else {
911                                request_bytes
912                            };
913
914                            // To decompress, we need to have the target VMO mapped (cached).
915                            match mapping {
916                                Some(mapping) => Ok(mapping.clone()),
917                                None => {
918                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
919                                }
920                            }
921                            .and_then(|mapping| {
922                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
923                                // range.
924                                if vmo_offset
925                                    .checked_add(request.uncompressed_bytes as u64)
926                                    .is_some_and(|end| end <= mapping.size as u64)
927                                {
928                                    Ok(mapping)
929                                } else {
930                                    Err(zx::Status::OUT_OF_RANGE)
931                                }
932                            })
933                            .map(|mapping| {
934                                // Convert the operation into a `StartDecompressedRead`
935                                // operation. For non-fragmented requests, this will be the only
936                                // operation, but if it's a fragmented read,
937                                // `ContinueDecompressedRead` operations will follow.
938                                operation = Ok(Operation::StartDecompressedRead {
939                                    required_buffer_size,
940                                    device_block_offset,
941                                    block_count,
942                                    options,
943                                });
944                                // Record sufficient information so that we can decompress when all
945                                // the requests complete.
946                                decompression_info = Some(DecompressionInfo {
947                                    compressed_range,
948                                    bytes_so_far,
949                                    mapping,
950                                    uncompressed_range: vmo_offset
951                                        ..vmo_offset + request.uncompressed_bytes as u64,
952                                    buffer: None,
953                                });
954                                None
955                            })
956                        }
957                    } else {
958                        Ok(Some(vmo.clone()))
959                    }
960                }
961                None => Err(zx::Status::IO),
962            },
963            Ok(Operation::Write { .. }) => self
964                .vmos
965                .lock()
966                .get(&request.vmoid)
967                .cloned()
968                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
969            Ok(Operation::CloseVmo) => {
970                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
971                    let vmo_clone = vmo.clone();
972                    // Make sure the VMO is dropped after all current Epoch guards have been
973                    // dropped.
974                    Epoch::global().defer(move || drop(vmo_clone));
975                    Ok(Some(vmo))
976                })
977            }
978            _ => Ok(None),
979        }
980        .unwrap_or_else(|e| {
981            operation = Err(e);
982            None
983        });
984
985        let trace_flow_id = NonZero::new(request.trace_flow_id);
986        let request_id = request_id.unwrap_or_else(|| {
987            RequestId(active_requests.requests.insert(ActiveRequest {
988                session,
989                group_or_request,
990                trace_flow_id,
991                _epoch_guard: Epoch::global().guard(),
992                status: zx::Status::OK,
993                count: 1,
994                req_id: is_single_request.then_some(request.reqid),
995                decompression_info,
996            }))
997        });
998
999        Ok(DecodedRequest {
1000            request_id,
1001            trace_flow_id,
1002            operation: operation.map_err(|status| {
1003                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1004            })?,
1005            vmo,
1006        })
1007    }
1008
1009    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1010        std::mem::take(&mut *self.vmos.lock())
1011    }
1012
1013    /// Maps the request and returns the mapped request with an optional remainder.
1014    fn map_request(
1015        &self,
1016        mut request: DecodedRequest,
1017        active_request: &mut ActiveRequest<SM::Session>,
1018    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1019        if active_request.status != zx::Status::OK {
1020            return Err(zx::Status::BAD_STATE);
1021        }
1022        let mapping = self.offset_map.mapping();
1023        match (mapping, request.operation.blocks()) {
1024            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1025                return Err(zx::Status::OUT_OF_RANGE);
1026            }
1027            _ => {}
1028        }
1029        let remainder = request.operation.map(
1030            self.offset_map.mapping(),
1031            self.offset_map.max_transfer_blocks(),
1032            self.block_size,
1033        );
1034        if remainder.is_some() {
1035            active_request.count += 1;
1036        }
1037        static CACHE: AtomicU64 = AtomicU64::new(0);
1038        if let Some(context) =
1039            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1040        {
1041            use fuchsia_trace::ArgValue;
1042            let trace_args = [
1043                ArgValue::of("request_id", request.request_id.0),
1044                ArgValue::of("opcode", request.operation.trace_label()),
1045            ];
1046            let _scope =
1047                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1048            if let Some(trace_flow_id) = active_request.trace_flow_id {
1049                fuchsia_trace::flow_step(
1050                    &context,
1051                    "block_server::start_transaction",
1052                    trace_flow_id.get().into(),
1053                    &[],
1054                );
1055            }
1056        }
1057        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1058        Ok((request, remainder))
1059    }
1060
1061    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1062        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1063    }
1064}
1065
1066#[repr(transparent)]
1067#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1068pub struct RequestId(usize);
1069
1070#[derive(Clone, Debug)]
1071struct DecodedRequest {
1072    request_id: RequestId,
1073    trace_flow_id: TraceFlowId,
1074    operation: Operation,
1075    vmo: Option<Arc<zx::Vmo>>,
1076}
1077
1078/// cbindgen:no-export
1079pub type WriteFlags = block_protocol::WriteFlags;
1080pub type WriteOptions = block_protocol::WriteOptions;
1081pub type ReadOptions = block_protocol::ReadOptions;
1082
1083#[repr(C)]
1084#[derive(Clone, Debug, PartialEq, Eq)]
1085pub enum Operation {
1086    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1087    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1088    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1089    // write variant).
1090    Read {
1091        device_block_offset: u64,
1092        block_count: u32,
1093        _unused: u32,
1094        vmo_offset: u64,
1095        options: ReadOptions,
1096    },
1097    Write {
1098        device_block_offset: u64,
1099        block_count: u32,
1100        _unused: u32,
1101        vmo_offset: u64,
1102        options: WriteOptions,
1103    },
1104    Flush,
1105    Trim {
1106        device_block_offset: u64,
1107        block_count: u32,
1108    },
1109    /// This will never be seen by the C interface.
1110    CloseVmo,
1111    StartDecompressedRead {
1112        required_buffer_size: usize,
1113        device_block_offset: u64,
1114        block_count: u32,
1115        options: ReadOptions,
1116    },
1117    ContinueDecompressedRead {
1118        offset: u64,
1119        device_block_offset: u64,
1120        block_count: u32,
1121        options: ReadOptions,
1122    },
1123}
1124
1125impl Operation {
1126    fn trace_label(&self) -> &'static str {
1127        match self {
1128            Operation::Read { .. } => "read",
1129            Operation::Write { .. } => "write",
1130            Operation::Flush { .. } => "flush",
1131            Operation::Trim { .. } => "trim",
1132            Operation::CloseVmo { .. } => "close_vmo",
1133            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1134            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1135        }
1136    }
1137
1138    /// Returns (offset, length).
1139    fn blocks(&self) -> Option<(u64, u32)> {
1140        match self {
1141            Operation::Read { device_block_offset, block_count, .. }
1142            | Operation::Write { device_block_offset, block_count, .. }
1143            | Operation::Trim { device_block_offset, block_count, .. } => {
1144                Some((*device_block_offset, *block_count))
1145            }
1146            _ => None,
1147        }
1148    }
1149
1150    /// Returns mutable references to (offset, length).
1151    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1152        match self {
1153            Operation::Read { device_block_offset, block_count, .. }
1154            | Operation::Write { device_block_offset, block_count, .. }
1155            | Operation::Trim { device_block_offset, block_count, .. } => {
1156                Some((device_block_offset, block_count))
1157            }
1158            _ => None,
1159        }
1160    }
1161
1162    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1163    /// start of the operation.
1164    fn map(
1165        &mut self,
1166        mapping: Option<&BlockOffsetMapping>,
1167        max_blocks: Option<NonZero<u32>>,
1168        block_size: u32,
1169    ) -> Option<Self> {
1170        let mut max = match self {
1171            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1172            _ => None,
1173        };
1174        let (offset, length) = self.blocks_mut()?;
1175        let orig_offset = *offset;
1176        if let Some(mapping) = mapping {
1177            let delta = *offset - mapping.source_block_offset;
1178            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1179            *offset = mapping.target_block_offset + delta;
1180            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1181            max = match max {
1182                None => Some(mapping_max),
1183                Some(m) => Some(std::cmp::min(m, mapping_max)),
1184            };
1185        };
1186        if let Some(max) = max {
1187            if *length as u64 > max {
1188                let rem = (*length as u64 - max) as u32;
1189                *length = max as u32;
1190                return Some(match self {
1191                    Operation::Read {
1192                        device_block_offset: _,
1193                        block_count: _,
1194                        vmo_offset,
1195                        _unused,
1196                        options,
1197                    } => Operation::Read {
1198                        device_block_offset: orig_offset + max,
1199                        block_count: rem,
1200                        vmo_offset: *vmo_offset + max * block_size as u64,
1201                        _unused: *_unused,
1202                        options: *options,
1203                    },
1204                    Operation::Write {
1205                        device_block_offset: _,
1206                        block_count: _,
1207                        _unused,
1208                        vmo_offset,
1209                        options,
1210                    } => Operation::Write {
1211                        device_block_offset: orig_offset + max,
1212                        block_count: rem,
1213                        _unused: *_unused,
1214                        vmo_offset: *vmo_offset + max * block_size as u64,
1215                        options: *options,
1216                    },
1217                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1218                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1219                    }
1220                    _ => unreachable!(),
1221                });
1222            }
1223        }
1224        None
1225    }
1226
1227    /// Returns true if the specified write flags are set.
1228    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1229        if let Operation::Write { options, .. } = self {
1230            options.flags.contains(value)
1231        } else {
1232            false
1233        }
1234    }
1235
1236    /// Removes `value` from the request's write flags and returns true if the flag was set.
1237    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1238        if let Operation::Write { options, .. } = self {
1239            let result = options.flags.contains(value);
1240            options.flags.remove(value);
1241            result
1242        } else {
1243            false
1244        }
1245    }
1246}
1247
1248#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1249pub enum GroupOrRequest {
1250    Group(u16),
1251    Request(u32),
1252}
1253
1254impl GroupOrRequest {
1255    fn is_group(&self) -> bool {
1256        matches!(self, Self::Group(_))
1257    }
1258
1259    fn group_id(&self) -> Option<u16> {
1260        match self {
1261            Self::Group(id) => Some(*id),
1262            Self::Request(_) => None,
1263        }
1264    }
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269    use super::{
1270        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1271        TraceFlowId,
1272    };
1273    use assert_matches::assert_matches;
1274    use block_protocol::{
1275        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1276        WriteOptions,
1277    };
1278    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1279    use fuchsia_sync::Mutex;
1280    use futures::FutureExt as _;
1281    use futures::channel::oneshot;
1282    use futures::future::BoxFuture;
1283    use std::borrow::Cow;
1284    use std::future::poll_fn;
1285    use std::num::NonZero;
1286    use std::pin::pin;
1287    use std::sync::Arc;
1288    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1289    use std::task::{Context, Poll};
1290    use zx::{AsHandleRef as _, HandleBased as _};
1291    use {
1292        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1293        fuchsia_async as fasync,
1294    };
1295
1296    #[derive(Default)]
1297    struct MockInterface {
1298        read_hook: Option<
1299            Box<
1300                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1301                    + Send
1302                    + Sync,
1303            >,
1304        >,
1305        write_hook:
1306            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1307        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1308    }
1309
1310    impl super::async_interface::Interface for MockInterface {
1311        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1312            Ok(())
1313        }
1314
1315        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1316            Cow::Owned(test_device_info())
1317        }
1318
1319        async fn read(
1320            &self,
1321            device_block_offset: u64,
1322            block_count: u32,
1323            vmo: &Arc<zx::Vmo>,
1324            vmo_offset: u64,
1325            _opts: ReadOptions,
1326            _trace_flow_id: TraceFlowId,
1327        ) -> Result<(), zx::Status> {
1328            if let Some(read_hook) = &self.read_hook {
1329                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1330            } else {
1331                unimplemented!();
1332            }
1333        }
1334
1335        async fn write(
1336            &self,
1337            device_block_offset: u64,
1338            _block_count: u32,
1339            _vmo: &Arc<zx::Vmo>,
1340            _vmo_offset: u64,
1341            opts: WriteOptions,
1342            _trace_flow_id: TraceFlowId,
1343        ) -> Result<(), zx::Status> {
1344            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1345                && let Some(barrier_hook) = &self.barrier_hook
1346            {
1347                barrier_hook()?;
1348            }
1349            if let Some(write_hook) = &self.write_hook {
1350                write_hook(device_block_offset).await
1351            } else {
1352                unimplemented!();
1353            }
1354        }
1355
1356        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1357            Ok(())
1358        }
1359
1360        async fn trim(
1361            &self,
1362            _device_block_offset: u64,
1363            _block_count: u32,
1364            _trace_flow_id: TraceFlowId,
1365        ) -> Result<(), zx::Status> {
1366            unreachable!();
1367        }
1368
1369        async fn get_volume_info(
1370            &self,
1371        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1372            // Hang forever for the test_requests_dont_block_sessions test.
1373            let () = std::future::pending().await;
1374            unreachable!();
1375        }
1376    }
1377
1378    const BLOCK_SIZE: u32 = 512;
1379    const MAX_TRANSFER_BLOCKS: u32 = 10;
1380
1381    fn test_device_info() -> DeviceInfo {
1382        DeviceInfo::Partition(PartitionInfo {
1383            device_flags: fblock::Flag::READONLY
1384                | fblock::Flag::BARRIER_SUPPORT
1385                | fblock::Flag::FUA_SUPPORT,
1386            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1387            block_range: Some(0..100),
1388            type_guid: [1; 16],
1389            instance_guid: [2; 16],
1390            name: "foo".to_string(),
1391            flags: 0xabcd,
1392        })
1393    }
1394
1395    #[fuchsia::test]
1396    async fn test_barriers_ordering() {
1397        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1398        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1399        let barrier_called = Arc::new(AtomicBool::new(false));
1400
1401        futures::join!(
1402            async move {
1403                let barrier_called_clone = barrier_called.clone();
1404                let block_server = BlockServer::new(
1405                    BLOCK_SIZE,
1406                    Arc::new(MockInterface {
1407                        barrier_hook: Some(Box::new(move || {
1408                            barrier_called.store(true, Ordering::Relaxed);
1409                            Ok(())
1410                        })),
1411                        write_hook: Some(Box::new(move |device_block_offset| {
1412                            let barrier_called = barrier_called_clone.clone();
1413                            Box::pin(async move {
1414                                // The sleep allows the server to reorder the fifo requests.
1415                                if device_block_offset % 2 == 0 {
1416                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1417                                        zx::MonotonicDuration::from_millis(200),
1418                                    ))
1419                                    .await;
1420                                }
1421                                assert!(barrier_called.load(Ordering::Relaxed));
1422                                Ok(())
1423                            })
1424                        })),
1425                        ..MockInterface::default()
1426                    }),
1427                );
1428                block_server.handle_requests(stream).await.unwrap();
1429            },
1430            async move {
1431                let (session_proxy, server) = fidl::endpoints::create_proxy();
1432
1433                proxy.open_session(server).unwrap();
1434
1435                let vmo_id = session_proxy
1436                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1437                    .await
1438                    .unwrap()
1439                    .unwrap();
1440                assert_ne!(vmo_id.id, 0);
1441
1442                let mut fifo =
1443                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1444                let (mut reader, mut writer) = fifo.async_io();
1445
1446                writer
1447                    .write_entries(&BlockFifoRequest {
1448                        command: BlockFifoCommand {
1449                            opcode: BlockOpcode::Write.into_primitive(),
1450                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1451                            ..Default::default()
1452                        },
1453                        vmoid: vmo_id.id,
1454                        dev_offset: 0,
1455                        length: 5,
1456                        vmo_offset: 6,
1457                        ..Default::default()
1458                    })
1459                    .await
1460                    .unwrap();
1461
1462                for i in 0..10 {
1463                    writer
1464                        .write_entries(&BlockFifoRequest {
1465                            command: BlockFifoCommand {
1466                                opcode: BlockOpcode::Write.into_primitive(),
1467                                ..Default::default()
1468                            },
1469                            vmoid: vmo_id.id,
1470                            dev_offset: i + 1,
1471                            length: 5,
1472                            vmo_offset: 6,
1473                            ..Default::default()
1474                        })
1475                        .await
1476                        .unwrap();
1477                }
1478                for _ in 0..11 {
1479                    let mut response = BlockFifoResponse::default();
1480                    reader.read_entries(&mut response).await.unwrap();
1481                    assert_eq!(response.status, zx::sys::ZX_OK);
1482                }
1483
1484                std::mem::drop(proxy);
1485            }
1486        );
1487    }
1488
1489    #[fuchsia::test]
1490    async fn test_info() {
1491        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1492
1493        futures::join!(
1494            async {
1495                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1496                block_server.handle_requests(stream).await.unwrap();
1497            },
1498            async {
1499                let expected_info = test_device_info();
1500                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1501                    info
1502                } else {
1503                    unreachable!()
1504                };
1505
1506                let block_info = proxy.get_info().await.unwrap().unwrap();
1507                assert_eq!(
1508                    block_info.block_count,
1509                    partition_info.block_range.as_ref().unwrap().end
1510                        - partition_info.block_range.as_ref().unwrap().start
1511                );
1512                assert_eq!(
1513                    block_info.flags,
1514                    fblock::Flag::READONLY
1515                        | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1516                        | fblock::Flag::BARRIER_SUPPORT
1517                        | fblock::Flag::FUA_SUPPORT
1518                );
1519
1520                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1521
1522                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1523                assert_eq!(status, zx::sys::ZX_OK);
1524                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1525
1526                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1527                assert_eq!(status, zx::sys::ZX_OK);
1528                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1529
1530                let (status, name) = proxy.get_name().await.unwrap();
1531                assert_eq!(status, zx::sys::ZX_OK);
1532                assert_eq!(name.as_ref(), Some(&partition_info.name));
1533
1534                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1535                assert_eq!(metadata.name, name);
1536                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1537                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1538                assert_eq!(
1539                    metadata.start_block_offset,
1540                    Some(partition_info.block_range.as_ref().unwrap().start)
1541                );
1542                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1543                assert_eq!(metadata.flags, Some(partition_info.flags));
1544
1545                std::mem::drop(proxy);
1546            }
1547        );
1548    }
1549
1550    #[fuchsia::test]
1551    async fn test_attach_vmo() {
1552        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1553
1554        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1555        let koid = vmo.get_koid().unwrap();
1556
1557        futures::join!(
1558            async {
1559                let block_server = BlockServer::new(
1560                    BLOCK_SIZE,
1561                    Arc::new(MockInterface {
1562                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1563                            assert_eq!(vmo.get_koid().unwrap(), koid);
1564                            Box::pin(async { Ok(()) })
1565                        })),
1566                        ..MockInterface::default()
1567                    }),
1568                );
1569                block_server.handle_requests(stream).await.unwrap();
1570            },
1571            async move {
1572                let (session_proxy, server) = fidl::endpoints::create_proxy();
1573
1574                proxy.open_session(server).unwrap();
1575
1576                let vmo_id = session_proxy
1577                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1578                    .await
1579                    .unwrap()
1580                    .unwrap();
1581                assert_ne!(vmo_id.id, 0);
1582
1583                let mut fifo =
1584                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1585                let (mut reader, mut writer) = fifo.async_io();
1586
1587                // Keep attaching VMOs until we eventually hit the maximum.
1588                let mut count = 1;
1589                loop {
1590                    match session_proxy
1591                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1592                        .await
1593                        .unwrap()
1594                    {
1595                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1596                        Err(e) => {
1597                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1598                            break;
1599                        }
1600                    }
1601
1602                    // Only test every 10 to keep test time down.
1603                    if count % 10 == 0 {
1604                        writer
1605                            .write_entries(&BlockFifoRequest {
1606                                command: BlockFifoCommand {
1607                                    opcode: BlockOpcode::Read.into_primitive(),
1608                                    ..Default::default()
1609                                },
1610                                vmoid: vmo_id.id,
1611                                length: 1,
1612                                ..Default::default()
1613                            })
1614                            .await
1615                            .unwrap();
1616
1617                        let mut response = BlockFifoResponse::default();
1618                        reader.read_entries(&mut response).await.unwrap();
1619                        assert_eq!(response.status, zx::sys::ZX_OK);
1620                    }
1621
1622                    count += 1;
1623                }
1624
1625                assert_eq!(count, u16::MAX as u64);
1626
1627                // Detach the original VMO, and make sure we can then attach another one.
1628                writer
1629                    .write_entries(&BlockFifoRequest {
1630                        command: BlockFifoCommand {
1631                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1632                            ..Default::default()
1633                        },
1634                        vmoid: vmo_id.id,
1635                        ..Default::default()
1636                    })
1637                    .await
1638                    .unwrap();
1639
1640                let mut response = BlockFifoResponse::default();
1641                reader.read_entries(&mut response).await.unwrap();
1642                assert_eq!(response.status, zx::sys::ZX_OK);
1643
1644                let new_vmo_id = session_proxy
1645                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1646                    .await
1647                    .unwrap()
1648                    .unwrap();
1649                // It should reuse the same ID.
1650                assert_eq!(new_vmo_id.id, vmo_id.id);
1651
1652                std::mem::drop(proxy);
1653            }
1654        );
1655    }
1656
1657    #[fuchsia::test]
1658    async fn test_close() {
1659        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1660
1661        let mut server = std::pin::pin!(
1662            async {
1663                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1664                block_server.handle_requests(stream).await.unwrap();
1665            }
1666            .fuse()
1667        );
1668
1669        let mut client = std::pin::pin!(
1670            async {
1671                let (session_proxy, server) = fidl::endpoints::create_proxy();
1672
1673                proxy.open_session(server).unwrap();
1674
1675                // Dropping the proxy should not cause the session to terminate because the session is
1676                // still live.
1677                std::mem::drop(proxy);
1678
1679                session_proxy.close().await.unwrap().unwrap();
1680
1681                // Keep the session alive.  Calling `close` should cause the server to terminate.
1682                let _: () = std::future::pending().await;
1683            }
1684            .fuse()
1685        );
1686
1687        futures::select!(
1688            _ = server => {}
1689            _ = client => unreachable!(),
1690        );
1691    }
1692
1693    #[derive(Default)]
1694    struct IoMockInterface {
1695        do_checks: bool,
1696        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1697        return_errors: bool,
1698    }
1699
1700    #[derive(Debug)]
1701    enum ExpectedOp {
1702        Read(u64, u32, u64),
1703        Write(u64, u32, u64),
1704        Trim(u64, u32),
1705        Flush,
1706    }
1707
1708    impl super::async_interface::Interface for IoMockInterface {
1709        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1710            Ok(())
1711        }
1712
1713        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1714            Cow::Owned(test_device_info())
1715        }
1716
1717        async fn read(
1718            &self,
1719            device_block_offset: u64,
1720            block_count: u32,
1721            _vmo: &Arc<zx::Vmo>,
1722            vmo_offset: u64,
1723            _opts: ReadOptions,
1724            _trace_flow_id: TraceFlowId,
1725        ) -> Result<(), zx::Status> {
1726            if self.return_errors {
1727                Err(zx::Status::INTERNAL)
1728            } else {
1729                if self.do_checks {
1730                    assert_matches!(
1731                        self.expected_op.lock().take(),
1732                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1733                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1734                        "Read {device_block_offset} {block_count} {vmo_offset}"
1735                    );
1736                }
1737                Ok(())
1738            }
1739        }
1740
1741        async fn write(
1742            &self,
1743            device_block_offset: u64,
1744            block_count: u32,
1745            _vmo: &Arc<zx::Vmo>,
1746            vmo_offset: u64,
1747            _write_opts: WriteOptions,
1748            _trace_flow_id: TraceFlowId,
1749        ) -> Result<(), zx::Status> {
1750            if self.return_errors {
1751                Err(zx::Status::NOT_SUPPORTED)
1752            } else {
1753                if self.do_checks {
1754                    assert_matches!(
1755                        self.expected_op.lock().take(),
1756                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1757                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1758                        "Write {device_block_offset} {block_count} {vmo_offset}"
1759                    );
1760                }
1761                Ok(())
1762            }
1763        }
1764
1765        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1766            if self.return_errors {
1767                Err(zx::Status::NO_RESOURCES)
1768            } else {
1769                if self.do_checks {
1770                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1771                }
1772                Ok(())
1773            }
1774        }
1775
1776        async fn trim(
1777            &self,
1778            device_block_offset: u64,
1779            block_count: u32,
1780            _trace_flow_id: TraceFlowId,
1781        ) -> Result<(), zx::Status> {
1782            if self.return_errors {
1783                Err(zx::Status::NO_MEMORY)
1784            } else {
1785                if self.do_checks {
1786                    assert_matches!(
1787                        self.expected_op.lock().take(),
1788                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1789                            block_count == b,
1790                        "Trim {device_block_offset} {block_count}"
1791                    );
1792                }
1793                Ok(())
1794            }
1795        }
1796    }
1797
1798    #[fuchsia::test]
1799    async fn test_io() {
1800        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1801
1802        let expected_op = Arc::new(Mutex::new(None));
1803        let expected_op_clone = expected_op.clone();
1804
1805        let server = async {
1806            let block_server = BlockServer::new(
1807                BLOCK_SIZE,
1808                Arc::new(IoMockInterface {
1809                    return_errors: false,
1810                    do_checks: true,
1811                    expected_op: expected_op_clone,
1812                }),
1813            );
1814            block_server.handle_requests(stream).await.unwrap();
1815        };
1816
1817        let client = async move {
1818            let (session_proxy, server) = fidl::endpoints::create_proxy();
1819
1820            proxy.open_session(server).unwrap();
1821
1822            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1823            let vmo_id = session_proxy
1824                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1825                .await
1826                .unwrap()
1827                .unwrap();
1828
1829            let mut fifo =
1830                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1831            let (mut reader, mut writer) = fifo.async_io();
1832
1833            // READ
1834            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1835            writer
1836                .write_entries(&BlockFifoRequest {
1837                    command: BlockFifoCommand {
1838                        opcode: BlockOpcode::Read.into_primitive(),
1839                        ..Default::default()
1840                    },
1841                    vmoid: vmo_id.id,
1842                    dev_offset: 1,
1843                    length: 2,
1844                    vmo_offset: 3,
1845                    ..Default::default()
1846                })
1847                .await
1848                .unwrap();
1849
1850            let mut response = BlockFifoResponse::default();
1851            reader.read_entries(&mut response).await.unwrap();
1852            assert_eq!(response.status, zx::sys::ZX_OK);
1853
1854            // WRITE
1855            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1856            writer
1857                .write_entries(&BlockFifoRequest {
1858                    command: BlockFifoCommand {
1859                        opcode: BlockOpcode::Write.into_primitive(),
1860                        ..Default::default()
1861                    },
1862                    vmoid: vmo_id.id,
1863                    dev_offset: 4,
1864                    length: 5,
1865                    vmo_offset: 6,
1866                    ..Default::default()
1867                })
1868                .await
1869                .unwrap();
1870
1871            let mut response = BlockFifoResponse::default();
1872            reader.read_entries(&mut response).await.unwrap();
1873            assert_eq!(response.status, zx::sys::ZX_OK);
1874
1875            // FLUSH
1876            *expected_op.lock() = Some(ExpectedOp::Flush);
1877            writer
1878                .write_entries(&BlockFifoRequest {
1879                    command: BlockFifoCommand {
1880                        opcode: BlockOpcode::Flush.into_primitive(),
1881                        ..Default::default()
1882                    },
1883                    ..Default::default()
1884                })
1885                .await
1886                .unwrap();
1887
1888            reader.read_entries(&mut response).await.unwrap();
1889            assert_eq!(response.status, zx::sys::ZX_OK);
1890
1891            // TRIM
1892            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1893            writer
1894                .write_entries(&BlockFifoRequest {
1895                    command: BlockFifoCommand {
1896                        opcode: BlockOpcode::Trim.into_primitive(),
1897                        ..Default::default()
1898                    },
1899                    dev_offset: 7,
1900                    length: 8,
1901                    ..Default::default()
1902                })
1903                .await
1904                .unwrap();
1905
1906            reader.read_entries(&mut response).await.unwrap();
1907            assert_eq!(response.status, zx::sys::ZX_OK);
1908
1909            std::mem::drop(proxy);
1910        };
1911
1912        futures::join!(server, client);
1913    }
1914
1915    #[fuchsia::test]
1916    async fn test_io_errors() {
1917        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1918
1919        futures::join!(
1920            async {
1921                let block_server = BlockServer::new(
1922                    BLOCK_SIZE,
1923                    Arc::new(IoMockInterface {
1924                        return_errors: true,
1925                        do_checks: false,
1926                        expected_op: Arc::new(Mutex::new(None)),
1927                    }),
1928                );
1929                block_server.handle_requests(stream).await.unwrap();
1930            },
1931            async move {
1932                let (session_proxy, server) = fidl::endpoints::create_proxy();
1933
1934                proxy.open_session(server).unwrap();
1935
1936                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1937                let vmo_id = session_proxy
1938                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1939                    .await
1940                    .unwrap()
1941                    .unwrap();
1942
1943                let mut fifo =
1944                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1945                let (mut reader, mut writer) = fifo.async_io();
1946
1947                // READ
1948                writer
1949                    .write_entries(&BlockFifoRequest {
1950                        command: BlockFifoCommand {
1951                            opcode: BlockOpcode::Read.into_primitive(),
1952                            ..Default::default()
1953                        },
1954                        vmoid: vmo_id.id,
1955                        length: 1,
1956                        reqid: 1,
1957                        ..Default::default()
1958                    })
1959                    .await
1960                    .unwrap();
1961
1962                let mut response = BlockFifoResponse::default();
1963                reader.read_entries(&mut response).await.unwrap();
1964                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1965
1966                // WRITE
1967                writer
1968                    .write_entries(&BlockFifoRequest {
1969                        command: BlockFifoCommand {
1970                            opcode: BlockOpcode::Write.into_primitive(),
1971                            ..Default::default()
1972                        },
1973                        vmoid: vmo_id.id,
1974                        length: 1,
1975                        reqid: 2,
1976                        ..Default::default()
1977                    })
1978                    .await
1979                    .unwrap();
1980
1981                reader.read_entries(&mut response).await.unwrap();
1982                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1983
1984                // FLUSH
1985                writer
1986                    .write_entries(&BlockFifoRequest {
1987                        command: BlockFifoCommand {
1988                            opcode: BlockOpcode::Flush.into_primitive(),
1989                            ..Default::default()
1990                        },
1991                        reqid: 3,
1992                        ..Default::default()
1993                    })
1994                    .await
1995                    .unwrap();
1996
1997                reader.read_entries(&mut response).await.unwrap();
1998                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1999
2000                // TRIM
2001                writer
2002                    .write_entries(&BlockFifoRequest {
2003                        command: BlockFifoCommand {
2004                            opcode: BlockOpcode::Trim.into_primitive(),
2005                            ..Default::default()
2006                        },
2007                        reqid: 4,
2008                        length: 1,
2009                        ..Default::default()
2010                    })
2011                    .await
2012                    .unwrap();
2013
2014                reader.read_entries(&mut response).await.unwrap();
2015                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2016
2017                std::mem::drop(proxy);
2018            }
2019        );
2020    }
2021
2022    #[fuchsia::test]
2023    async fn test_invalid_args() {
2024        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2025
2026        futures::join!(
2027            async {
2028                let block_server = BlockServer::new(
2029                    BLOCK_SIZE,
2030                    Arc::new(IoMockInterface {
2031                        return_errors: false,
2032                        do_checks: false,
2033                        expected_op: Arc::new(Mutex::new(None)),
2034                    }),
2035                );
2036                block_server.handle_requests(stream).await.unwrap();
2037            },
2038            async move {
2039                let (session_proxy, server) = fidl::endpoints::create_proxy();
2040
2041                proxy.open_session(server).unwrap();
2042
2043                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2044                let vmo_id = session_proxy
2045                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2046                    .await
2047                    .unwrap()
2048                    .unwrap();
2049
2050                let mut fifo =
2051                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2052
2053                async fn test(
2054                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2055                    request: BlockFifoRequest,
2056                ) -> Result<(), zx::Status> {
2057                    let (mut reader, mut writer) = fifo.async_io();
2058                    writer.write_entries(&request).await.unwrap();
2059                    let mut response = BlockFifoResponse::default();
2060                    reader.read_entries(&mut response).await.unwrap();
2061                    zx::Status::ok(response.status)
2062                }
2063
2064                // READ
2065
2066                let good_read_request = || BlockFifoRequest {
2067                    command: BlockFifoCommand {
2068                        opcode: BlockOpcode::Read.into_primitive(),
2069                        ..Default::default()
2070                    },
2071                    length: 1,
2072                    vmoid: vmo_id.id,
2073                    ..Default::default()
2074                };
2075
2076                assert_eq!(
2077                    test(
2078                        &mut fifo,
2079                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2080                    )
2081                    .await,
2082                    Err(zx::Status::IO)
2083                );
2084
2085                assert_eq!(
2086                    test(
2087                        &mut fifo,
2088                        BlockFifoRequest {
2089                            vmo_offset: 0xffff_ffff_ffff_ffff,
2090                            ..good_read_request()
2091                        }
2092                    )
2093                    .await,
2094                    Err(zx::Status::OUT_OF_RANGE)
2095                );
2096
2097                assert_eq!(
2098                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2099                    Err(zx::Status::INVALID_ARGS)
2100                );
2101
2102                // WRITE
2103
2104                let good_write_request = || BlockFifoRequest {
2105                    command: BlockFifoCommand {
2106                        opcode: BlockOpcode::Write.into_primitive(),
2107                        ..Default::default()
2108                    },
2109                    length: 1,
2110                    vmoid: vmo_id.id,
2111                    ..Default::default()
2112                };
2113
2114                assert_eq!(
2115                    test(
2116                        &mut fifo,
2117                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2118                    )
2119                    .await,
2120                    Err(zx::Status::IO)
2121                );
2122
2123                assert_eq!(
2124                    test(
2125                        &mut fifo,
2126                        BlockFifoRequest {
2127                            vmo_offset: 0xffff_ffff_ffff_ffff,
2128                            ..good_write_request()
2129                        }
2130                    )
2131                    .await,
2132                    Err(zx::Status::OUT_OF_RANGE)
2133                );
2134
2135                assert_eq!(
2136                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2137                    Err(zx::Status::INVALID_ARGS)
2138                );
2139
2140                // CLOSE VMO
2141
2142                assert_eq!(
2143                    test(
2144                        &mut fifo,
2145                        BlockFifoRequest {
2146                            command: BlockFifoCommand {
2147                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2148                                ..Default::default()
2149                            },
2150                            vmoid: vmo_id.id + 1,
2151                            ..Default::default()
2152                        }
2153                    )
2154                    .await,
2155                    Err(zx::Status::IO)
2156                );
2157
2158                std::mem::drop(proxy);
2159            }
2160        );
2161    }
2162
2163    #[fuchsia::test]
2164    async fn test_concurrent_requests() {
2165        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2166
2167        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2168        let waiting_readers_clone = waiting_readers.clone();
2169
2170        futures::join!(
2171            async move {
2172                let block_server = BlockServer::new(
2173                    BLOCK_SIZE,
2174                    Arc::new(MockInterface {
2175                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2176                            let (tx, rx) = oneshot::channel();
2177                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2178                            Box::pin(async move {
2179                                let _ = rx.await;
2180                                Ok(())
2181                            })
2182                        })),
2183                        ..MockInterface::default()
2184                    }),
2185                );
2186                block_server.handle_requests(stream).await.unwrap();
2187            },
2188            async move {
2189                let (session_proxy, server) = fidl::endpoints::create_proxy();
2190
2191                proxy.open_session(server).unwrap();
2192
2193                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2194                let vmo_id = session_proxy
2195                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2196                    .await
2197                    .unwrap()
2198                    .unwrap();
2199
2200                let mut fifo =
2201                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2202                let (mut reader, mut writer) = fifo.async_io();
2203
2204                writer
2205                    .write_entries(&BlockFifoRequest {
2206                        command: BlockFifoCommand {
2207                            opcode: BlockOpcode::Read.into_primitive(),
2208                            ..Default::default()
2209                        },
2210                        reqid: 1,
2211                        dev_offset: 1, // Intentionally use the same as `reqid`.
2212                        vmoid: vmo_id.id,
2213                        length: 1,
2214                        ..Default::default()
2215                    })
2216                    .await
2217                    .unwrap();
2218
2219                writer
2220                    .write_entries(&BlockFifoRequest {
2221                        command: BlockFifoCommand {
2222                            opcode: BlockOpcode::Read.into_primitive(),
2223                            ..Default::default()
2224                        },
2225                        reqid: 2,
2226                        dev_offset: 2,
2227                        vmoid: vmo_id.id,
2228                        length: 1,
2229                        ..Default::default()
2230                    })
2231                    .await
2232                    .unwrap();
2233
2234                // Wait till both those entries are pending.
2235                poll_fn(|cx: &mut Context<'_>| {
2236                    if waiting_readers.lock().len() == 2 {
2237                        Poll::Ready(())
2238                    } else {
2239                        // Yield to the executor.
2240                        cx.waker().wake_by_ref();
2241                        Poll::Pending
2242                    }
2243                })
2244                .await;
2245
2246                let mut response = BlockFifoResponse::default();
2247                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2248
2249                let (id, tx) = waiting_readers.lock().pop().unwrap();
2250                tx.send(()).unwrap();
2251
2252                reader.read_entries(&mut response).await.unwrap();
2253                assert_eq!(response.status, zx::sys::ZX_OK);
2254                assert_eq!(response.reqid, id);
2255
2256                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2257
2258                let (id, tx) = waiting_readers.lock().pop().unwrap();
2259                tx.send(()).unwrap();
2260
2261                reader.read_entries(&mut response).await.unwrap();
2262                assert_eq!(response.status, zx::sys::ZX_OK);
2263                assert_eq!(response.reqid, id);
2264            }
2265        );
2266    }
2267
2268    #[fuchsia::test]
2269    async fn test_groups() {
2270        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2271
2272        futures::join!(
2273            async move {
2274                let block_server = BlockServer::new(
2275                    BLOCK_SIZE,
2276                    Arc::new(MockInterface {
2277                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2278                        ..MockInterface::default()
2279                    }),
2280                );
2281                block_server.handle_requests(stream).await.unwrap();
2282            },
2283            async move {
2284                let (session_proxy, server) = fidl::endpoints::create_proxy();
2285
2286                proxy.open_session(server).unwrap();
2287
2288                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2289                let vmo_id = session_proxy
2290                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2291                    .await
2292                    .unwrap()
2293                    .unwrap();
2294
2295                let mut fifo =
2296                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2297                let (mut reader, mut writer) = fifo.async_io();
2298
2299                writer
2300                    .write_entries(&BlockFifoRequest {
2301                        command: BlockFifoCommand {
2302                            opcode: BlockOpcode::Read.into_primitive(),
2303                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2304                            ..Default::default()
2305                        },
2306                        group: 1,
2307                        vmoid: vmo_id.id,
2308                        length: 1,
2309                        ..Default::default()
2310                    })
2311                    .await
2312                    .unwrap();
2313
2314                writer
2315                    .write_entries(&BlockFifoRequest {
2316                        command: BlockFifoCommand {
2317                            opcode: BlockOpcode::Read.into_primitive(),
2318                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2319                            ..Default::default()
2320                        },
2321                        reqid: 2,
2322                        group: 1,
2323                        vmoid: vmo_id.id,
2324                        length: 1,
2325                        ..Default::default()
2326                    })
2327                    .await
2328                    .unwrap();
2329
2330                let mut response = BlockFifoResponse::default();
2331                reader.read_entries(&mut response).await.unwrap();
2332                assert_eq!(response.status, zx::sys::ZX_OK);
2333                assert_eq!(response.reqid, 2);
2334                assert_eq!(response.group, 1);
2335            }
2336        );
2337    }
2338
2339    #[fuchsia::test]
2340    async fn test_group_error() {
2341        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2342
2343        let counter = Arc::new(AtomicU64::new(0));
2344        let counter_clone = counter.clone();
2345
2346        futures::join!(
2347            async move {
2348                let block_server = BlockServer::new(
2349                    BLOCK_SIZE,
2350                    Arc::new(MockInterface {
2351                        read_hook: Some(Box::new(move |_, _, _, _| {
2352                            counter_clone.fetch_add(1, Ordering::Relaxed);
2353                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2354                        })),
2355                        ..MockInterface::default()
2356                    }),
2357                );
2358                block_server.handle_requests(stream).await.unwrap();
2359            },
2360            async move {
2361                let (session_proxy, server) = fidl::endpoints::create_proxy();
2362
2363                proxy.open_session(server).unwrap();
2364
2365                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2366                let vmo_id = session_proxy
2367                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2368                    .await
2369                    .unwrap()
2370                    .unwrap();
2371
2372                let mut fifo =
2373                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2374                let (mut reader, mut writer) = fifo.async_io();
2375
2376                writer
2377                    .write_entries(&BlockFifoRequest {
2378                        command: BlockFifoCommand {
2379                            opcode: BlockOpcode::Read.into_primitive(),
2380                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2381                            ..Default::default()
2382                        },
2383                        group: 1,
2384                        vmoid: vmo_id.id,
2385                        length: 1,
2386                        ..Default::default()
2387                    })
2388                    .await
2389                    .unwrap();
2390
2391                // Wait until processed.
2392                poll_fn(|cx: &mut Context<'_>| {
2393                    if counter.load(Ordering::Relaxed) == 1 {
2394                        Poll::Ready(())
2395                    } else {
2396                        // Yield to the executor.
2397                        cx.waker().wake_by_ref();
2398                        Poll::Pending
2399                    }
2400                })
2401                .await;
2402
2403                let mut response = BlockFifoResponse::default();
2404                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2405
2406                writer
2407                    .write_entries(&BlockFifoRequest {
2408                        command: BlockFifoCommand {
2409                            opcode: BlockOpcode::Read.into_primitive(),
2410                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2411                            ..Default::default()
2412                        },
2413                        group: 1,
2414                        vmoid: vmo_id.id,
2415                        length: 1,
2416                        ..Default::default()
2417                    })
2418                    .await
2419                    .unwrap();
2420
2421                writer
2422                    .write_entries(&BlockFifoRequest {
2423                        command: BlockFifoCommand {
2424                            opcode: BlockOpcode::Read.into_primitive(),
2425                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2426                            ..Default::default()
2427                        },
2428                        reqid: 2,
2429                        group: 1,
2430                        vmoid: vmo_id.id,
2431                        length: 1,
2432                        ..Default::default()
2433                    })
2434                    .await
2435                    .unwrap();
2436
2437                reader.read_entries(&mut response).await.unwrap();
2438                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2439                assert_eq!(response.reqid, 2);
2440                assert_eq!(response.group, 1);
2441
2442                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2443
2444                // Only the first request should have been processed.
2445                assert_eq!(counter.load(Ordering::Relaxed), 1);
2446            }
2447        );
2448    }
2449
2450    #[fuchsia::test]
2451    async fn test_group_with_two_lasts() {
2452        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2453
2454        let (tx, rx) = oneshot::channel();
2455
2456        futures::join!(
2457            async move {
2458                let rx = Mutex::new(Some(rx));
2459                let block_server = BlockServer::new(
2460                    BLOCK_SIZE,
2461                    Arc::new(MockInterface {
2462                        read_hook: Some(Box::new(move |_, _, _, _| {
2463                            let rx = rx.lock().take().unwrap();
2464                            Box::pin(async {
2465                                let _ = rx.await;
2466                                Ok(())
2467                            })
2468                        })),
2469                        ..MockInterface::default()
2470                    }),
2471                );
2472                block_server.handle_requests(stream).await.unwrap();
2473            },
2474            async move {
2475                let (session_proxy, server) = fidl::endpoints::create_proxy();
2476
2477                proxy.open_session(server).unwrap();
2478
2479                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2480                let vmo_id = session_proxy
2481                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2482                    .await
2483                    .unwrap()
2484                    .unwrap();
2485
2486                let mut fifo =
2487                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2488                let (mut reader, mut writer) = fifo.async_io();
2489
2490                writer
2491                    .write_entries(&BlockFifoRequest {
2492                        command: BlockFifoCommand {
2493                            opcode: BlockOpcode::Read.into_primitive(),
2494                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2495                            ..Default::default()
2496                        },
2497                        reqid: 1,
2498                        group: 1,
2499                        vmoid: vmo_id.id,
2500                        length: 1,
2501                        ..Default::default()
2502                    })
2503                    .await
2504                    .unwrap();
2505
2506                writer
2507                    .write_entries(&BlockFifoRequest {
2508                        command: BlockFifoCommand {
2509                            opcode: BlockOpcode::Read.into_primitive(),
2510                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2511                            ..Default::default()
2512                        },
2513                        reqid: 2,
2514                        group: 1,
2515                        vmoid: vmo_id.id,
2516                        length: 1,
2517                        ..Default::default()
2518                    })
2519                    .await
2520                    .unwrap();
2521
2522                // Send an independent request to flush through the fifo.
2523                writer
2524                    .write_entries(&BlockFifoRequest {
2525                        command: BlockFifoCommand {
2526                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2527                            ..Default::default()
2528                        },
2529                        reqid: 3,
2530                        vmoid: vmo_id.id,
2531                        ..Default::default()
2532                    })
2533                    .await
2534                    .unwrap();
2535
2536                // It should succeed.
2537                let mut response = BlockFifoResponse::default();
2538                reader.read_entries(&mut response).await.unwrap();
2539                assert_eq!(response.status, zx::sys::ZX_OK);
2540                assert_eq!(response.reqid, 3);
2541
2542                // Now release the original request.
2543                tx.send(()).unwrap();
2544
2545                // The response should be for the first message tagged as last, and it should be
2546                // an error because we sent two messages with the LAST marker.
2547                let mut response = BlockFifoResponse::default();
2548                reader.read_entries(&mut response).await.unwrap();
2549                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2550                assert_eq!(response.reqid, 1);
2551                assert_eq!(response.group, 1);
2552            }
2553        );
2554    }
2555
2556    #[fuchsia::test(allow_stalls = false)]
2557    async fn test_requests_dont_block_sessions() {
2558        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2559
2560        let (tx, rx) = oneshot::channel();
2561
2562        fasync::Task::local(async move {
2563            let rx = Mutex::new(Some(rx));
2564            let block_server = BlockServer::new(
2565                BLOCK_SIZE,
2566                Arc::new(MockInterface {
2567                    read_hook: Some(Box::new(move |_, _, _, _| {
2568                        let rx = rx.lock().take().unwrap();
2569                        Box::pin(async {
2570                            let _ = rx.await;
2571                            Ok(())
2572                        })
2573                    })),
2574                    ..MockInterface::default()
2575                }),
2576            );
2577            block_server.handle_requests(stream).await.unwrap();
2578        })
2579        .detach();
2580
2581        let mut fut = pin!(async {
2582            let (session_proxy, server) = fidl::endpoints::create_proxy();
2583
2584            proxy.open_session(server).unwrap();
2585
2586            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2587            let vmo_id = session_proxy
2588                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2589                .await
2590                .unwrap()
2591                .unwrap();
2592
2593            let mut fifo =
2594                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2595            let (mut reader, mut writer) = fifo.async_io();
2596
2597            writer
2598                .write_entries(&BlockFifoRequest {
2599                    command: BlockFifoCommand {
2600                        opcode: BlockOpcode::Read.into_primitive(),
2601                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2602                        ..Default::default()
2603                    },
2604                    reqid: 1,
2605                    group: 1,
2606                    vmoid: vmo_id.id,
2607                    length: 1,
2608                    ..Default::default()
2609                })
2610                .await
2611                .unwrap();
2612
2613            let mut response = BlockFifoResponse::default();
2614            reader.read_entries(&mut response).await.unwrap();
2615            assert_eq!(response.status, zx::sys::ZX_OK);
2616        });
2617
2618        // The response won't come back until we send on `tx`.
2619        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2620
2621        let mut fut2 = pin!(proxy.get_volume_info());
2622
2623        // get_volume_info is set up to stall forever.
2624        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2625
2626        // If we now free up the first future, it should resolve; the stalled call to
2627        // get_volume_info should not block the fifo response.
2628        let _ = tx.send(());
2629
2630        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2631    }
2632
2633    #[fuchsia::test]
2634    async fn test_request_flow_control() {
2635        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2636
2637        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2638        // server will block until that happens.
2639        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2640        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2641        let event_clone = event.clone();
2642        futures::join!(
2643            async move {
2644                let block_server = BlockServer::new(
2645                    BLOCK_SIZE,
2646                    Arc::new(MockInterface {
2647                        read_hook: Some(Box::new(move |_, _, _, _| {
2648                            let event_clone = event_clone.clone();
2649                            Box::pin(async move {
2650                                if !event_clone.1.load(Ordering::SeqCst) {
2651                                    event_clone.0.listen().await;
2652                                }
2653                                Ok(())
2654                            })
2655                        })),
2656                        ..MockInterface::default()
2657                    }),
2658                );
2659                block_server.handle_requests(stream).await.unwrap();
2660            },
2661            async move {
2662                let (session_proxy, server) = fidl::endpoints::create_proxy();
2663
2664                proxy.open_session(server).unwrap();
2665
2666                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2667                let vmo_id = session_proxy
2668                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2669                    .await
2670                    .unwrap()
2671                    .unwrap();
2672
2673                let mut fifo =
2674                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2675                let (mut reader, mut writer) = fifo.async_io();
2676
2677                for i in 0..MAX_REQUESTS {
2678                    writer
2679                        .write_entries(&BlockFifoRequest {
2680                            command: BlockFifoCommand {
2681                                opcode: BlockOpcode::Read.into_primitive(),
2682                                ..Default::default()
2683                            },
2684                            reqid: (i + 1) as u32,
2685                            dev_offset: i,
2686                            vmoid: vmo_id.id,
2687                            length: 1,
2688                            ..Default::default()
2689                        })
2690                        .await
2691                        .unwrap();
2692                }
2693                assert!(
2694                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2695                        command: BlockFifoCommand {
2696                            opcode: BlockOpcode::Read.into_primitive(),
2697                            ..Default::default()
2698                        },
2699                        reqid: u32::MAX,
2700                        dev_offset: MAX_REQUESTS,
2701                        vmoid: vmo_id.id,
2702                        length: 1,
2703                        ..Default::default()
2704                    })))
2705                    .is_pending()
2706                );
2707                // OK, let the server start to process.
2708                event.1.store(true, Ordering::SeqCst);
2709                event.0.notify(usize::MAX);
2710                // For each entry we read, make sure we can write a new one in.
2711                let mut finished_reqids = vec![];
2712                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2713                    let mut response = BlockFifoResponse::default();
2714                    reader.read_entries(&mut response).await.unwrap();
2715                    assert_eq!(response.status, zx::sys::ZX_OK);
2716                    finished_reqids.push(response.reqid);
2717                    writer
2718                        .write_entries(&BlockFifoRequest {
2719                            command: BlockFifoCommand {
2720                                opcode: BlockOpcode::Read.into_primitive(),
2721                                ..Default::default()
2722                            },
2723                            reqid: (i + 1) as u32,
2724                            dev_offset: i,
2725                            vmoid: vmo_id.id,
2726                            length: 1,
2727                            ..Default::default()
2728                        })
2729                        .await
2730                        .unwrap();
2731                }
2732                let mut response = BlockFifoResponse::default();
2733                for _ in 0..MAX_REQUESTS {
2734                    reader.read_entries(&mut response).await.unwrap();
2735                    assert_eq!(response.status, zx::sys::ZX_OK);
2736                    finished_reqids.push(response.reqid);
2737                }
2738                // Verify that we got a response for each request.  Note that we can't assume FIFO
2739                // ordering.
2740                finished_reqids.sort();
2741                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2742                let mut i = 1;
2743                for reqid in finished_reqids {
2744                    assert_eq!(reqid, i);
2745                    i += 1;
2746                }
2747            }
2748        );
2749    }
2750
2751    #[fuchsia::test]
2752    async fn test_passthrough_io_with_fixed_map() {
2753        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2754
2755        let expected_op = Arc::new(Mutex::new(None));
2756        let expected_op_clone = expected_op.clone();
2757        futures::join!(
2758            async {
2759                let block_server = BlockServer::new(
2760                    BLOCK_SIZE,
2761                    Arc::new(IoMockInterface {
2762                        return_errors: false,
2763                        do_checks: true,
2764                        expected_op: expected_op_clone,
2765                    }),
2766                );
2767                block_server.handle_requests(stream).await.unwrap();
2768            },
2769            async move {
2770                let (session_proxy, server) = fidl::endpoints::create_proxy();
2771
2772                let mapping = fblock::BlockOffsetMapping {
2773                    source_block_offset: 0,
2774                    target_block_offset: 10,
2775                    length: 20,
2776                };
2777                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2778
2779                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2780                let vmo_id = session_proxy
2781                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2782                    .await
2783                    .unwrap()
2784                    .unwrap();
2785
2786                let mut fifo =
2787                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2788                let (mut reader, mut writer) = fifo.async_io();
2789
2790                // READ
2791                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2792                writer
2793                    .write_entries(&BlockFifoRequest {
2794                        command: BlockFifoCommand {
2795                            opcode: BlockOpcode::Read.into_primitive(),
2796                            ..Default::default()
2797                        },
2798                        vmoid: vmo_id.id,
2799                        dev_offset: 1,
2800                        length: 2,
2801                        vmo_offset: 3,
2802                        ..Default::default()
2803                    })
2804                    .await
2805                    .unwrap();
2806
2807                let mut response = BlockFifoResponse::default();
2808                reader.read_entries(&mut response).await.unwrap();
2809                assert_eq!(response.status, zx::sys::ZX_OK);
2810
2811                // WRITE
2812                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2813                writer
2814                    .write_entries(&BlockFifoRequest {
2815                        command: BlockFifoCommand {
2816                            opcode: BlockOpcode::Write.into_primitive(),
2817                            ..Default::default()
2818                        },
2819                        vmoid: vmo_id.id,
2820                        dev_offset: 4,
2821                        length: 5,
2822                        vmo_offset: 6,
2823                        ..Default::default()
2824                    })
2825                    .await
2826                    .unwrap();
2827
2828                reader.read_entries(&mut response).await.unwrap();
2829                assert_eq!(response.status, zx::sys::ZX_OK);
2830
2831                // FLUSH
2832                *expected_op.lock() = Some(ExpectedOp::Flush);
2833                writer
2834                    .write_entries(&BlockFifoRequest {
2835                        command: BlockFifoCommand {
2836                            opcode: BlockOpcode::Flush.into_primitive(),
2837                            ..Default::default()
2838                        },
2839                        ..Default::default()
2840                    })
2841                    .await
2842                    .unwrap();
2843
2844                reader.read_entries(&mut response).await.unwrap();
2845                assert_eq!(response.status, zx::sys::ZX_OK);
2846
2847                // TRIM
2848                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2849                writer
2850                    .write_entries(&BlockFifoRequest {
2851                        command: BlockFifoCommand {
2852                            opcode: BlockOpcode::Trim.into_primitive(),
2853                            ..Default::default()
2854                        },
2855                        dev_offset: 7,
2856                        length: 3,
2857                        ..Default::default()
2858                    })
2859                    .await
2860                    .unwrap();
2861
2862                reader.read_entries(&mut response).await.unwrap();
2863                assert_eq!(response.status, zx::sys::ZX_OK);
2864
2865                // READ past window
2866                *expected_op.lock() = None;
2867                writer
2868                    .write_entries(&BlockFifoRequest {
2869                        command: BlockFifoCommand {
2870                            opcode: BlockOpcode::Read.into_primitive(),
2871                            ..Default::default()
2872                        },
2873                        vmoid: vmo_id.id,
2874                        dev_offset: 19,
2875                        length: 2,
2876                        vmo_offset: 3,
2877                        ..Default::default()
2878                    })
2879                    .await
2880                    .unwrap();
2881
2882                reader.read_entries(&mut response).await.unwrap();
2883                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2884
2885                std::mem::drop(proxy);
2886            }
2887        );
2888    }
2889
2890    #[fuchsia::test]
2891    fn operation_map() {
2892        fn expect_map_result(
2893            mut operation: Operation,
2894            mapping: Option<BlockOffsetMapping>,
2895            max_blocks: Option<NonZero<u32>>,
2896            expected_operations: Vec<Operation>,
2897        ) {
2898            let mut ops = vec![];
2899            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2900                ops.push(operation);
2901                operation = remainder;
2902            }
2903            ops.push(operation);
2904            assert_eq!(ops, expected_operations);
2905        }
2906
2907        // No limits
2908        expect_map_result(
2909            Operation::Read {
2910                device_block_offset: 10,
2911                block_count: 200,
2912                _unused: 0,
2913                vmo_offset: 0,
2914                options: ReadOptions::default(),
2915            },
2916            None,
2917            None,
2918            vec![Operation::Read {
2919                device_block_offset: 10,
2920                block_count: 200,
2921                _unused: 0,
2922                vmo_offset: 0,
2923                options: ReadOptions::default(),
2924            }],
2925        );
2926
2927        // Max block count
2928        expect_map_result(
2929            Operation::Read {
2930                device_block_offset: 10,
2931                block_count: 200,
2932                _unused: 0,
2933                vmo_offset: 0,
2934                options: ReadOptions::default(),
2935            },
2936            None,
2937            NonZero::new(120),
2938            vec![
2939                Operation::Read {
2940                    device_block_offset: 10,
2941                    block_count: 120,
2942                    _unused: 0,
2943                    vmo_offset: 0,
2944                    options: ReadOptions::default(),
2945                },
2946                Operation::Read {
2947                    device_block_offset: 130,
2948                    block_count: 80,
2949                    _unused: 0,
2950                    vmo_offset: 120 * 512,
2951                    options: ReadOptions::default(),
2952                },
2953            ],
2954        );
2955        expect_map_result(
2956            Operation::Trim { device_block_offset: 10, block_count: 200 },
2957            None,
2958            NonZero::new(120),
2959            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2960        );
2961
2962        // Remapping + Max block count
2963        expect_map_result(
2964            Operation::Read {
2965                device_block_offset: 10,
2966                block_count: 200,
2967                _unused: 0,
2968                vmo_offset: 0,
2969                options: ReadOptions::default(),
2970            },
2971            Some(BlockOffsetMapping {
2972                source_block_offset: 10,
2973                target_block_offset: 100,
2974                length: 200,
2975            }),
2976            NonZero::new(120),
2977            vec![
2978                Operation::Read {
2979                    device_block_offset: 100,
2980                    block_count: 120,
2981                    _unused: 0,
2982                    vmo_offset: 0,
2983                    options: ReadOptions::default(),
2984                },
2985                Operation::Read {
2986                    device_block_offset: 220,
2987                    block_count: 80,
2988                    _unused: 0,
2989                    vmo_offset: 120 * 512,
2990                    options: ReadOptions::default(),
2991                },
2992            ],
2993        );
2994        expect_map_result(
2995            Operation::Trim { device_block_offset: 10, block_count: 200 },
2996            Some(BlockOffsetMapping {
2997                source_block_offset: 10,
2998                target_block_offset: 100,
2999                length: 200,
3000            }),
3001            NonZero::new(120),
3002            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3003        );
3004    }
3005
3006    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3007    #[fuchsia::test]
3008    async fn test_pre_barrier_flush_failure() {
3009        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3010
3011        struct NoBarrierInterface;
3012        impl super::async_interface::Interface for NoBarrierInterface {
3013            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3014                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3015                    device_flags: fblock::Flag::empty(), // No BARRIER_SUPPORT
3016                    max_transfer_blocks: NonZero::new(100),
3017                    block_range: Some(0..100),
3018                    type_guid: [0; 16],
3019                    instance_guid: [0; 16],
3020                    name: "test".to_string(),
3021                    flags: 0,
3022                }))
3023            }
3024            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3025                Ok(())
3026            }
3027            async fn read(
3028                &self,
3029                _: u64,
3030                _: u32,
3031                _: &Arc<zx::Vmo>,
3032                _: u64,
3033                _: ReadOptions,
3034                _: TraceFlowId,
3035            ) -> Result<(), zx::Status> {
3036                unreachable!()
3037            }
3038            async fn write(
3039                &self,
3040                _: u64,
3041                _: u32,
3042                _: &Arc<zx::Vmo>,
3043                _: u64,
3044                _: WriteOptions,
3045                _: TraceFlowId,
3046            ) -> Result<(), zx::Status> {
3047                panic!("Write should not be called");
3048            }
3049            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3050                Err(zx::Status::IO)
3051            }
3052            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3053                unreachable!()
3054            }
3055        }
3056
3057        futures::join!(
3058            async move {
3059                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3060                block_server.handle_requests(stream).await.unwrap();
3061            },
3062            async move {
3063                let (session_proxy, server) = fidl::endpoints::create_proxy();
3064                proxy.open_session(server).unwrap();
3065                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3066                let vmo_id = session_proxy
3067                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3068                    .await
3069                    .unwrap()
3070                    .unwrap();
3071
3072                let mut fifo =
3073                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3074                let (mut reader, mut writer) = fifo.async_io();
3075
3076                writer
3077                    .write_entries(&BlockFifoRequest {
3078                        command: BlockFifoCommand {
3079                            opcode: BlockOpcode::Write.into_primitive(),
3080                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3081                            ..Default::default()
3082                        },
3083                        vmoid: vmo_id.id,
3084                        length: 1,
3085                        ..Default::default()
3086                    })
3087                    .await
3088                    .unwrap();
3089
3090                let mut response = BlockFifoResponse::default();
3091                reader.read_entries(&mut response).await.unwrap();
3092                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3093            }
3094        );
3095    }
3096
3097    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3098    // post-flush is not executed.
3099    #[fuchsia::test]
3100    async fn test_post_barrier_write_failure() {
3101        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3102
3103        struct NoBarrierInterface;
3104        impl super::async_interface::Interface for NoBarrierInterface {
3105            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3106                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3107                    device_flags: fblock::Flag::empty(), // No FUA_SUPPORT
3108                    max_transfer_blocks: NonZero::new(100),
3109                    block_range: Some(0..100),
3110                    type_guid: [0; 16],
3111                    instance_guid: [0; 16],
3112                    name: "test".to_string(),
3113                    flags: 0,
3114                }))
3115            }
3116            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3117                Ok(())
3118            }
3119            async fn read(
3120                &self,
3121                _: u64,
3122                _: u32,
3123                _: &Arc<zx::Vmo>,
3124                _: u64,
3125                _: ReadOptions,
3126                _: TraceFlowId,
3127            ) -> Result<(), zx::Status> {
3128                unreachable!()
3129            }
3130            async fn write(
3131                &self,
3132                _: u64,
3133                _: u32,
3134                _: &Arc<zx::Vmo>,
3135                _: u64,
3136                _: WriteOptions,
3137                _: TraceFlowId,
3138            ) -> Result<(), zx::Status> {
3139                Err(zx::Status::IO)
3140            }
3141            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3142                panic!("Flush should not be called")
3143            }
3144            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3145                unreachable!()
3146            }
3147        }
3148
3149        futures::join!(
3150            async move {
3151                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3152                block_server.handle_requests(stream).await.unwrap();
3153            },
3154            async move {
3155                let (session_proxy, server) = fidl::endpoints::create_proxy();
3156                proxy.open_session(server).unwrap();
3157                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3158                let vmo_id = session_proxy
3159                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3160                    .await
3161                    .unwrap()
3162                    .unwrap();
3163
3164                let mut fifo =
3165                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3166                let (mut reader, mut writer) = fifo.async_io();
3167
3168                writer
3169                    .write_entries(&BlockFifoRequest {
3170                        command: BlockFifoCommand {
3171                            opcode: BlockOpcode::Write.into_primitive(),
3172                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3173                            ..Default::default()
3174                        },
3175                        vmoid: vmo_id.id,
3176                        length: 1,
3177                        ..Default::default()
3178                    })
3179                    .await
3180                    .unwrap();
3181
3182                let mut response = BlockFifoResponse::default();
3183                reader.read_entries(&mut response).await.unwrap();
3184                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3185            }
3186        );
3187    }
3188}