block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
/// Upper bound on a number of FIFO requests.
/// NOTE(review): the uses of this constant are outside this part of the file — presumably it caps
/// how many requests are pulled from the FIFO in one batch; confirm against the callers.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Optional identifier used to tie a request's trace events together.  When it is `None`, no flow
/// step is emitted for the request.
type TraceFlowId = Option<NonZero<u64>>;
34
/// Describes the device being served, which is either a plain block device or a partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A non-partition block device.
    Block(BlockInfo),
    /// A block device that is also a partition.
    Partition(PartitionInfo),
}
40
41impl DeviceInfo {
42    pub fn block_count(&self) -> Option<u64> {
43        match self {
44            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
45            Self::Partition(PartitionInfo { block_range, .. }) => {
46                block_range.as_ref().map(|range| range.end - range.start)
47            }
48        }
49    }
50
51    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
52        match self {
53            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
54            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
55                max_transfer_blocks.clone()
56            }
57        }
58    }
59
60    fn max_transfer_size(&self, block_size: u32) -> u32 {
61        if let Some(max_blocks) = self.max_transfer_blocks() {
62            max_blocks.get() * block_size
63        } else {
64            MAX_TRANSFER_UNBOUNDED
65        }
66    }
67}
68
/// Information associated with non-partition block devices.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// The device flags reported to clients in the block info response.
    pub device_flags: fblock::Flag,
    /// The total number of blocks on the device.
    pub block_count: u64,
    /// The maximum number of blocks in a single transfer; `None` means there is no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
76
/// Information associated with a block device that is also a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// The device flags reported by the underlying device.
    pub device_flags: fblock::Flag,
    /// The maximum number of blocks in a single transfer; `None` means there is no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// If `block_range` is None, the partition is a volume and may not be contiguous.
    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
    /// slices and use that (along with the slice and block sizes) to determine the block count.
    pub block_range: Option<Range<u64>>,
    /// The partition type GUID (returned for `GetTypeGuid` and `GetMetadata`).
    pub type_guid: [u8; 16],
    /// The partition instance GUID (returned for `GetInstanceGuid` and `GetMetadata`).
    pub instance_guid: [u8; 16],
    /// The partition name (returned for `GetName` and `GetMetadata`).
    pub name: String,
    /// Partition flags, passed through unchanged in the `GetMetadata` response.
    pub flags: u64,
}
92
/// We internally keep track of active requests, so that when the server is torn down, we can
/// deallocate all of the resources for pending requests.
struct ActiveRequest<S> {
    /// The session the request belongs to; handed back together with the response.
    session: S,
    /// Identifies either the request group or the individual request this entry tracks.
    group_or_request: GroupOrRequest,
    /// Trace flow id used when emitting the flow step on completion, if any.
    trace_flow_id: TraceFlowId,
    /// Held for the lifetime of the entry and never read.
    _epoch_guard: EpochGuard<'static>,
    /// The first non-OK status reported for this entry, or OK if nothing has failed so far.
    status: zx::Status,
    /// The number of outstanding sub-requests; a response is only produced once this reaches zero.
    count: u32,
    /// The request id to put in the response.  No response is produced while this is `None`.
    req_id: Option<u32>,
    /// Present when the entry needs decompression to be performed once all reads have completed.
    decompression_info: Option<DecompressionInfo>,
}
105
/// State needed to decompress the data of a request once its reads have completed.
struct DecompressionInfo {
    // This is the range of compressed bytes in receiving buffer.
    compressed_range: Range<usize>,

    // This is the range in the target VMO where we will write uncompressed bytes.
    uncompressed_range: Range<u64>,

    // NOTE(review): not used in the visible part of the file — presumably the number of
    // compressed bytes received so far; confirm against the code that updates it.
    bytes_so_far: u64,

    // The mapping of the target VMO; `uncompressed_range` is relative to its base address.
    mapping: Arc<VmoMapping>,

    // The buffer holding the compressed bytes.  It is taken (leaving `None`) when decompression
    // runs.
    buffer: Option<Buffer<'static>>,
}
117
118impl DecompressionInfo {
119    /// Returns the uncompressed slice.
120    fn uncompressed_slice(&self) -> *mut [u8] {
121        std::ptr::slice_from_raw_parts_mut(
122            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
123            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
124        )
125    }
126}
127
/// The set of requests that are currently in flight, protected by a mutex.  `S` is the session
/// type associated with each request.
pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
129
130impl<S> Default for ActiveRequests<S> {
131    fn default() -> Self {
132        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
133    }
134}
135
136impl<S> ActiveRequests<S> {
137    fn complete_and_take_response(
138        &self,
139        request_id: RequestId,
140        status: zx::Status,
141    ) -> Option<(S, BlockFifoResponse)> {
142        self.0.lock().complete_and_take_response(request_id, status)
143    }
144
145    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
146        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
147    }
148}
149
/// The state behind the `ActiveRequests` mutex.
struct ActiveRequestsInner<S> {
    /// All in-flight entries; a `RequestId` wraps the slab key of its entry.
    requests: Slab<ActiveRequest<S>>,
}
153
// Keeps track of all the requests that are currently being processed.
impl<S> ActiveRequestsInner<S> {
    /// Completes a request.
    ///
    /// Decrements the outstanding count of the entry for `request_id` and records `status` if it
    /// is the first failure seen for the entry.  When the last outstanding sub-request completes
    /// successfully and the entry carries decompression info, the compressed data is decompressed
    /// into the target mapping; a decompression failure turns the entry's status into
    /// `IO_DATA_INTEGRITY`.
    ///
    /// Panics if there is no entry for `request_id` or if its outstanding count is already zero.
    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
        let group = &mut self.requests[request_id.0];

        group.count = group.count.checked_sub(1).unwrap();
        // Only the first error is kept; later errors do not overwrite it.
        if status != zx::Status::OK && group.status == zx::Status::OK {
            group.status = status
        }

        fuchsia_trace::duration!(
            c"storage",
            c"block_server::finish_transaction",
            "request_id" => request_id.0,
            "group_completed" => group.count == 0,
            "status" => status.into_raw());
        if let Some(trace_flow_id) = group.trace_flow_id {
            fuchsia_trace::flow_step!(
                c"storage",
                c"block_server::finish_request",
                trace_flow_id.get().into()
            );
        }

        // Decompression only runs once everything has completed, and only if nothing failed.
        if group.count == 0
            && group.status == zx::Status::OK
            && let Some(info) = &mut group.decompression_info
        {
            // One decompressor per thread, created lazily and reused across requests.
            thread_local! {
                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
            }
            DECOMPRESSOR.with(|decompressor| {
                // SAFETY: We verified `uncompressed_range` fits within our mapping.
                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
                let mut decompressor = decompressor.borrow_mut();
                // The buffer is taken out of `info` here, so it is released as soon as the
                // decompression call returns.
                // NOTE(review): the number of bytes produced is not compared against the length
                // of `target_slice` — confirm whether a short output should be treated as an
                // error.
                if let Err(error) = decompressor.decompress_to_buffer(
                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
                    target_slice,
                ) {
                    log::warn!(error:?; "Decompression error");
                    group.status = zx::Status::IO_DATA_INTEGRITY;
                };
            });
        }
    }

    /// Takes the response if all requests are finished.
    ///
    /// Returns `None`, leaving the entry in place, if sub-requests are still outstanding or if no
    /// request id has been recorded for the entry yet.  Otherwise the entry is removed and its
    /// session is returned along with the response carrying the entry's status.
    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
        let group = &self.requests[request_id.0];
        match group.req_id {
            Some(reqid) if group.count == 0 => {
                let group = self.requests.remove(request_id.0);
                Some((
                    group.session,
                    BlockFifoResponse {
                        status: group.status.into_raw(),
                        reqid,
                        // Entries that are not part of a group report group 0.
                        group: group.group_or_request.group_id().unwrap_or(0),
                        ..Default::default()
                    },
                ))
            }
            _ => None,
        }
    }

    /// Completes the request and returns a response if the request group is finished.
    fn complete_and_take_response(
        &mut self,
        request_id: RequestId,
        status: zx::Status,
    ) -> Option<(S, BlockFifoResponse)> {
        self.complete(request_id, status);
        self.take_response(request_id)
    }
}
232
/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
///
/// Requests are served from a `fuchsia.hardware.block.volume/Volume` request stream (see
/// `handle_requests`); the actual work is delegated to the session manager `SM`.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// The block size in bytes, reported to clients and passed to every new session.
    block_size: u32,
    /// The session manager that supplies device information and runs sessions.
    session_manager: Arc<SM>,
}
239
/// A single entry in `[OffsetMap]`.
///
/// Covers `length` blocks starting at `source_block_offset`, paired with the blocks starting at
/// `target_block_offset`.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    source_block_offset: u64,
    target_block_offset: u64,
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the blocks described by `blocks` (first block, block count) lie entirely
    /// within the source range `[source_block_offset, source_block_offset + length)`.
    ///
    /// All arithmetic is checked, so a range whose end would not fit in a `u64` is reported as
    /// out of range instead of overflowing.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (first_block, block_count) = blocks;
        first_block
            // `None` here means the range starts before the mapping.
            .checked_sub(self.source_block_offset)
            // End of the range, relative to the start of the mapping.
            .and_then(|relative_start| relative_start.checked_add(u64::from(block_count)))
            .is_some_and(|relative_end| relative_end <= self.length)
    }
}
254
255impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
256    type Error = zx::Status;
257
258    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
259        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
260        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
261        Ok(Self {
262            source_block_offset: wire.source_block_offset,
263            target_block_offset: wire.target_block_offset,
264            length: wire.length,
265        })
266    }
267}
268
/// Remaps the offset of block requests based on an internal map, and truncates long requests.
pub struct OffsetMap {
    /// The mapping to apply, or `None` if request offsets are not remapped.
    mapping: Option<BlockOffsetMapping>,
    /// The maximum number of blocks in a single request, or `None` if there is no limit.
    max_transfer_blocks: Option<NonZero<u32>>,
}
274
275impl OffsetMap {
276    /// An OffsetMap that remaps requests.
277    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
278        Self { mapping: Some(mapping), max_transfer_blocks }
279    }
280
281    /// An OffsetMap that just enforces maximum request sizes.
282    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
283        Self { mapping: None, max_transfer_blocks }
284    }
285
286    pub fn is_empty(&self) -> bool {
287        self.mapping.is_none()
288    }
289
290    fn mapping(&self) -> Option<&BlockOffsetMapping> {
291        self.mapping.as_ref()
292    }
293
294    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
295        self.max_transfer_blocks
296    }
297}
298
// Methods take Arc<Self> rather than &self because of
// https://github.com/rust-lang/rust/issues/42940.
/// The interface the block server uses to obtain device information, run sessions and handle
/// volume operations.
pub trait SessionManager: 'static {
    /// Whether zstd decompression of reads is supported.  When true, the server advertises
    /// `ZSTD_DECOMPRESSION_SUPPORT` in the device flags and accepts reads flagged with
    /// `DECOMPRESS_WITH_ZSTD`; otherwise such reads are rejected with `NOT_SUPPORTED`.
    const SUPPORTS_DECOMPRESSION: bool;

    /// The per-session type stored alongside each active request.
    type Session;

    /// Called when a client attaches `vmo` to a session, before the client is sent the VMO id.
    /// An error terminates the handling of the attach request without a reply.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Creates a new session to handle `stream`.
    /// The returned future should run until the session completes, for example when the client end
    /// closes.
    /// `offset_map` will be used to adjust the block offset/length of FIFO requests.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Called to handle the GetVolumeInfo FIDL call.
    ///
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    ///
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    ///
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    ///
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the active requests.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
362
/// Conversion into a shared session manager; this is what `BlockServer::new` accepts.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
368
impl<SM: SessionManager> BlockServer<SM> {
    /// Creates a server with the given block size (in bytes) on top of `session_manager`.
    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
        Self { block_size, session_manager: session_manager.into_session_manager() }
    }

    /// Returns the session manager backing this server.
    pub fn session_manager(&self) -> &SM {
        self.session_manager.as_ref()
    }

    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
    ///
    /// Sessions opened by requests are spawned on a scope owned by this call.  Once the request
    /// stream ends, this waits for the scope to finish before returning.  An error from handling
    /// a request is returned immediately.
    pub async fn handle_requests(
        &self,
        mut requests: fvolume::VolumeRequestStream,
    ) -> Result<(), Error> {
        let scope = fasync::Scope::new();
        loop {
            match requests.try_next().await {
                Ok(Some(request)) => {
                    if let Some(session) = self.handle_request(request).await? {
                        // The session's result is discarded.
                        scope.spawn(session.map(|_| ()));
                    }
                }
                Ok(None) => break,
                // A stream error is logged and the stream is polled again.
                Err(err) => log::warn!(err:?; "Invalid request"),
            }
        }
        scope.await;
        Ok(())
    }

    /// Processes a partition request.  If a new session task is created in response to the request,
    /// it is returned.
    ///
    /// Errors returned from here are failures to send a reply or an epitaph, plus the one case
    /// noted in the `GetInfo` arm.
    async fn handle_request(
        &self,
        request: fvolume::VolumeRequest,
    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
        match request {
            fvolume::VolumeRequest::GetInfo { responder } => {
                let info = self.device_info();
                let max_transfer_size = info.max_transfer_size(self.block_size);
                let (block_count, mut flags) = match info.as_ref() {
                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
                        (*block_count, *device_flags)
                    }
                    DeviceInfo::Partition(partition_info) => {
                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
                            range.end - range.start
                        } else {
                            // No fixed range: derive the block count from the volume's slices.
                            // NOTE(review): a failure here is propagated with `?`, so the
                            // responder is dropped without a reply and the caller stops serving
                            // the stream — confirm that this is intended rather than replying
                            // with the error.
                            let volume_info = self.session_manager.get_volume_info().await?;
                            volume_info.0.slice_size * volume_info.1.partition_slice_count
                                / self.block_size as u64
                        };
                        (block_count, partition_info.device_flags)
                    }
                };
                if SM::SUPPORTS_DECOMPRESSION {
                    flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
                }
                responder.send(Ok(&fblock::BlockInfo {
                    block_count,
                    block_size: self.block_size,
                    max_transfer_size,
                    flags,
                }))?;
            }
            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
                let info = self.device_info();
                // No remapping; only the transfer size limit is enforced.
                return Ok(Some(self.session_manager.clone().open_session(
                    session.into_stream(),
                    OffsetMap::empty(info.max_transfer_blocks()),
                    self.block_size,
                )));
            }
            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
                session,
                mapping,
                control_handle: _,
            } => {
                let info = self.device_info();
                // An invalid mapping closes the session channel with an epitaph rather than
                // failing the whole connection.
                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
                    Ok(m) => m,
                    Err(status) => {
                        session.close_with_epitaph(status)?;
                        return Ok(None);
                    }
                };
                if let Some(max) = info.block_count() {
                    // The sum cannot overflow: the conversion above rejects mappings whose
                    // target range would wrap.
                    if initial_mapping.target_block_offset + initial_mapping.length > max {
                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
                        return Ok(None);
                    }
                }
                return Ok(Some(self.session_manager.clone().open_session(
                    session.into_stream(),
                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
                    self.block_size,
                )));
            }
            // The partition-specific getters below reply with NOT_SUPPORTED when the device is
            // not a partition.
            fvolume::VolumeRequest::GetTypeGuid { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    let mut guid =
                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
                    guid.value.copy_from_slice(&partition_info.type_guid);
                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    let mut guid =
                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
                    guid.value.copy_from_slice(&partition_info.instance_guid);
                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fvolume::VolumeRequest::GetName { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fvolume::VolumeRequest::GetMetadata { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(info) = info.as_ref() {
                    let mut type_guid =
                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
                    type_guid.value.copy_from_slice(&info.type_guid);
                    let mut instance_guid =
                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
                    instance_guid.value.copy_from_slice(&info.instance_guid);
                    responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
                        name: Some(info.name.clone()),
                        type_guid: Some(type_guid),
                        instance_guid: Some(instance_guid),
                        // Both are absent when the partition has no fixed block range.
                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
                        flags: Some(info.flags),
                        ..Default::default()
                    }))?;
                } else {
                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
                }
            }
            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
                match self.session_manager.query_slices(&start_slices).await {
                    Ok(mut results) => {
                        // The reply carries a fixed array of 16 ranges plus the number that are
                        // valid, so the results are padded up to 16 entries.
                        // NOTE(review): this panics if the session manager returns more than 16
                        // results.
                        let results_len = results.len();
                        assert!(results_len <= 16);
                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
                        responder.send(
                            zx::sys::ZX_OK,
                            &results.try_into().unwrap(),
                            results_len as u64,
                        )?;
                    }
                    Err(s) => {
                        responder.send(
                            s.into_raw(),
                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
                            0,
                        )?;
                    }
                }
            }
            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
                match self.session_manager.get_volume_info().await {
                    Ok((manager_info, volume_info)) => {
                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
                    }
                    Err(s) => responder.send(s.into_raw(), None, None)?,
                }
            }
            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
                responder.send(
                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
                        .into_raw(),
                )?;
            }
            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
                responder.send(
                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
                        .into_raw(),
                )?;
            }
            // Destroy is always rejected.
            fvolume::VolumeRequest::Destroy { responder, .. } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
            }
        }
        Ok(None)
    }

    /// Returns the device information supplied by the session manager.
    fn device_info(&self) -> Cow<'_, DeviceInfo> {
        self.session_manager.get_info()
    }
}
572
/// State and helpers for one session.
struct SessionHelper<SM: SessionManager> {
    /// The session manager the session belongs to.
    session_manager: Arc<SM>,
    /// The offset map used for this session's requests.
    offset_map: OffsetMap,
    /// The block size in bytes.
    block_size: u32,
    /// The FIFO end that is duplicated and handed to the client on `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// The VMOs attached to this session, keyed by VMO id.  The optional mapping is `None` when
    /// the VMO is first attached.
    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
}
580
/// A read/write mapping of a whole VMO in the root VMAR, unmapped again on drop.
struct VmoMapping {
    /// The address at which the VMO is mapped.
    base: usize,
    /// The size of the mapping in bytes (the size of the VMO when it was mapped).
    size: usize,
}
585
586impl VmoMapping {
587    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
588        let size = vmo.get_size().unwrap() as usize;
589        Ok(Arc::new(Self {
590            base: fuchsia_runtime::vmar_root_self()
591                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
592                .inspect_err(|error| {
593                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
594                })?,
595            size,
596        }))
597    }
598}
599
600impl Drop for VmoMapping {
601    fn drop(&mut self) {
602        // SAFETY: We mapped this in `VmoMapping::new`.
603        unsafe {
604            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
605        }
606    }
607}
608
609impl<SM: SessionManager> SessionHelper<SM> {
610    fn new(
611        session_manager: Arc<SM>,
612        offset_map: OffsetMap,
613        block_size: u32,
614    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
615        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
616        Ok((
617            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
618            fifo,
619        ))
620    }
621
    /// Handles a single request on the session channel.
    ///
    /// Returns an error when a reply cannot be sent, when the session manager rejects an attached
    /// VMO, and — deliberately — after replying to `Close`.
    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                // The client gets a duplicate of our peer end of the FIFO with these rights.
                let rights = zx::Rights::TRANSFER
                    | zx::Rights::READ
                    | zx::Rights::WRITE
                    | zx::Rights::SIGNAL
                    | zx::Rights::WAIT;
                match self.peer_fifo.duplicate_handle(rights) {
                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
                    Err(s) => responder.send(Err(s.into_raw()))?,
                }
                Ok(())
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                let vmo = Arc::new(vmo);
                // The lock is confined to this block so that it is released before the await
                // below.
                let vmo_id = {
                    let mut vmos = self.vmos.lock();
                    if vmos.len() == u16::MAX as usize {
                        // Every id from 1 to u16::MAX is in use.
                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
                        return Ok(());
                    } else {
                        // Ids start at 1.  Normally the next id is one more than the largest id
                        // in use; only when that would overflow do we search for an unused id.
                        let vmo_id = match vmos.last_entry() {
                            None => 1,
                            Some(o) => {
                                o.key().checked_add(1).unwrap_or_else(|| {
                                    let mut vmo_id = 1;
                                    // Find the first gap...
                                    for (&id, _) in &*vmos {
                                        if id > vmo_id {
                                            break;
                                        }
                                        vmo_id = id + 1;
                                    }
                                    vmo_id
                                })
                            }
                        };
                        vmos.insert(vmo_id, (vmo.clone(), None));
                        vmo_id
                    }
                };
                // The VMO is registered before the session manager is told about it.  If this
                // fails, the error is propagated and no reply is sent.
                self.session_manager.clone().on_attach_vmo(&vmo).await?;
                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
                Ok(())
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(Ok(()))?;
                // Returning an error after the reply signals the caller that the session is over.
                Err(anyhow!("Closed"))
            }
        }
    }
674
675    /// Decodes `request`.
676    fn decode_fifo_request(
677        &self,
678        session: SM::Session,
679        request: &BlockFifoRequest,
680    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
681        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
682
683        let request_bytes = request.length as u64 * self.block_size as u64;
684
685        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
686            .ok_or(zx::Status::INVALID_ARGS)
687            .and_then(|code| {
688                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
689                    if code != BlockOpcode::Read {
690                        return Err(zx::Status::INVALID_ARGS);
691                    }
692                    if !SM::SUPPORTS_DECOMPRESSION {
693                        return Err(zx::Status::NOT_SUPPORTED);
694                    }
695                }
696                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
697                    if request.length == 0 {
698                        return Err(zx::Status::INVALID_ARGS);
699                    }
700                    // Make sure the end offsets won't wrap.
701                    if request.dev_offset.checked_add(request.length as u64).is_none()
702                        || (code != BlockOpcode::Trim
703                            && request_bytes.checked_add(request.vmo_offset).is_none())
704                    {
705                        return Err(zx::Status::OUT_OF_RANGE);
706                    }
707                }
708                Ok(match code {
709                    BlockOpcode::Read => Operation::Read {
710                        device_block_offset: request.dev_offset,
711                        block_count: request.length,
712                        _unused: 0,
713                        vmo_offset: request
714                            .vmo_offset
715                            .checked_mul(self.block_size as u64)
716                            .ok_or(zx::Status::OUT_OF_RANGE)?,
717                        options: ReadOptions {
718                            inline_crypto_options: InlineCryptoOptions {
719                                dun: request.dun,
720                                slot: request.slot,
721                            },
722                        },
723                    },
724                    BlockOpcode::Write => {
725                        let mut options = WriteOptions {
726                            inline_crypto_options: InlineCryptoOptions {
727                                dun: request.dun,
728                                slot: request.slot,
729                            },
730                            ..WriteOptions::default()
731                        };
732                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
733                            options.flags |= WriteFlags::FORCE_ACCESS;
734                        }
735                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
736                            options.flags |= WriteFlags::PRE_BARRIER;
737                        }
738                        Operation::Write {
739                            device_block_offset: request.dev_offset,
740                            block_count: request.length,
741                            _unused: 0,
742                            options,
743                            vmo_offset: request
744                                .vmo_offset
745                                .checked_mul(self.block_size as u64)
746                                .ok_or(zx::Status::OUT_OF_RANGE)?,
747                        }
748                    }
749                    BlockOpcode::Flush => Operation::Flush,
750                    BlockOpcode::Trim => Operation::Trim {
751                        device_block_offset: request.dev_offset,
752                        block_count: request.length,
753                    },
754                    BlockOpcode::CloseVmo => Operation::CloseVmo,
755                })
756            });
757
758        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
759            GroupOrRequest::Group(request.group)
760        } else {
761            GroupOrRequest::Request(request.reqid)
762        };
763
764        let mut active_requests = self.session_manager.active_requests().0.lock();
765        let mut request_id = None;
766
767        // Multiple Block I/O request may be sent as a group.
768        // Notes:
769        // - the group is identified by the group id in the request
770        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
771        //   flag is set.
772        // - when processing a request of a group fails, subsequent requests of that
773        //   group will not be processed.
774        // - decompression is a special case, see block-fifo.h for semantics.
775        //
776        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
777        if group_or_request.is_group() {
778            // Search for an existing entry that matches this group.  NOTE: This is a potentially
779            // expensive way to find a group (it's iterating over all slots in the active-requests
780            // slab).  This can be optimised easily should we need to.
781            for (key, group) in &mut active_requests.requests {
782                if group.group_or_request == group_or_request {
783                    if group.req_id.is_some() {
784                        // We have already received a request tagged as last.
785                        if group.status == zx::Status::OK {
786                            group.status = zx::Status::INVALID_ARGS;
787                        }
788                        // Ignore this request.
789                        return Err(None);
790                    }
791                    // See if this is a continuation of a decompressed read.
792                    if group.status == zx::Status::OK
793                        && let Some(info) = &mut group.decompression_info
794                    {
795                        if let Ok(Operation::Read {
796                            device_block_offset,
797                            mut block_count,
798                            options,
799                            vmo_offset: 0,
800                            ..
801                        }) = operation
802                        {
803                            let remaining_bytes = info
804                                .compressed_range
805                                .end
806                                .next_multiple_of(self.block_size as usize)
807                                as u64
808                                - info.bytes_so_far;
809                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
810                                || request.total_compressed_bytes != 0
811                                || request.uncompressed_bytes != 0
812                                || request.compressed_prefix_bytes != 0
813                                || (flags.contains(BlockIoFlag::GROUP_LAST)
814                                    && info.bytes_so_far + request_bytes
815                                        < info.compressed_range.end as u64)
816                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
817                                    && request_bytes >= remaining_bytes)
818                            {
819                                group.status = zx::Status::INVALID_ARGS;
820                            } else {
821                                // We are tolerant of `block_count` being more than we actually
822                                // need.  This can happen if the client is working with a larger
823                                // block size than the device block size.  For example, if Blobfs
824                                // has a 8192 byte block size, but the device might has a 512 byte
825                                // block size, it can ask for a multiple of 16 blocks, when fewer
826                                // than that might actually be required to hold the compressed data.
827                                // It is easier for us to tolerate this here than to get Blobfs to
828                                // change to pass only the blocks that are required.
829                                if request_bytes > remaining_bytes {
830                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
831                                }
832
833                                operation = Ok(Operation::ContinueDecompressedRead {
834                                    offset: info.bytes_so_far,
835                                    device_block_offset,
836                                    block_count,
837                                    options,
838                                });
839
840                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
841                            }
842                        } else {
843                            group.status = zx::Status::INVALID_ARGS;
844                        }
845                    }
846                    if flags.contains(BlockIoFlag::GROUP_LAST) {
847                        group.req_id = Some(request.reqid);
848                        // If the group has had an error, there is no point trying to issue this
849                        // request.
850                        if group.status != zx::Status::OK {
851                            operation = Err(group.status);
852                        }
853                    } else if group.status != zx::Status::OK {
854                        // The group has already encountered an error, so there is no point trying
855                        // to issue this request.
856                        return Err(None);
857                    }
858                    request_id = Some(RequestId(key));
859                    group.count += 1;
860                    break;
861                }
862            }
863        }
864
865        let is_single_request =
866            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
867
868        let mut decompression_info = None;
869        let vmo = match operation {
870            Ok(Operation::Read {
871                device_block_offset,
872                mut block_count,
873                options,
874                vmo_offset,
875                ..
876            }) => match self.vmos.lock().get_mut(&request.vmoid) {
877                Some((vmo, mapping)) => {
878                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
879                        let compressed_range = request.compressed_prefix_bytes as usize
880                            ..request.compressed_prefix_bytes as usize
881                                + request.total_compressed_bytes as usize;
882                        let required_buffer_size =
883                            compressed_range.end.next_multiple_of(self.block_size as usize);
884
885                        // Validate the initial decompression request.
886                        if compressed_range.start >= compressed_range.end
887                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
888                            || (is_single_request && request_bytes < compressed_range.end as u64)
889                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
890                        {
891                            Err(zx::Status::INVALID_ARGS)
892                        } else {
893                            // We are tolerant of `block_count` being more than we actually need.
894                            // This can happen if the client is working in a larger block size than
895                            // the device block size.  For example, Blobfs has a 8192 byte block
896                            // size, but the device might have a 512 byte block size.  It is easier
897                            // for us to tolerate this here than to get Blobfs to change to pass
898                            // only the blocks that are required.
899                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
900                                block_count =
901                                    (required_buffer_size / self.block_size as usize) as u32;
902                                required_buffer_size as u64
903                            } else {
904                                request_bytes
905                            };
906
907                            // To decompress, we need to have the target VMO mapped (cached).
908                            match mapping {
909                                Some(mapping) => Ok(mapping.clone()),
910                                None => {
911                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
912                                }
913                            }
914                            .and_then(|mapping| {
915                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
916                                // range.
917                                if vmo_offset
918                                    .checked_add(request.uncompressed_bytes as u64)
919                                    .is_some_and(|end| end <= mapping.size as u64)
920                                {
921                                    Ok(mapping)
922                                } else {
923                                    Err(zx::Status::OUT_OF_RANGE)
924                                }
925                            })
926                            .map(|mapping| {
927                                // Convert the operation into a `StartDecompressedRead`
928                                // operation. For non-fragmented requests, this will be the only
929                                // operation, but if it's a fragmented read,
930                                // `ContinueDecompressedRead` operations will follow.
931                                operation = Ok(Operation::StartDecompressedRead {
932                                    required_buffer_size,
933                                    device_block_offset,
934                                    block_count,
935                                    options,
936                                });
937                                // Record sufficient information so that we can decompress when all
938                                // the requests complete.
939                                decompression_info = Some(DecompressionInfo {
940                                    compressed_range,
941                                    bytes_so_far,
942                                    mapping,
943                                    uncompressed_range: vmo_offset
944                                        ..vmo_offset + request.uncompressed_bytes as u64,
945                                    buffer: None,
946                                });
947                                None
948                            })
949                        }
950                    } else {
951                        Ok(Some(vmo.clone()))
952                    }
953                }
954                None => Err(zx::Status::IO),
955            },
956            Ok(Operation::Write { .. }) => self
957                .vmos
958                .lock()
959                .get(&request.vmoid)
960                .cloned()
961                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
962            Ok(Operation::CloseVmo) => {
963                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
964                    let vmo_clone = vmo.clone();
965                    // Make sure the VMO is dropped after all current Epoch guards have been
966                    // dropped.
967                    Epoch::global().defer(move || drop(vmo_clone));
968                    Ok(Some(vmo))
969                })
970            }
971            _ => Ok(None),
972        }
973        .unwrap_or_else(|e| {
974            operation = Err(e);
975            None
976        });
977
978        let trace_flow_id = NonZero::new(request.trace_flow_id);
979        let request_id = request_id.unwrap_or_else(|| {
980            RequestId(active_requests.requests.insert(ActiveRequest {
981                session,
982                group_or_request,
983                trace_flow_id,
984                _epoch_guard: Epoch::global().guard(),
985                status: zx::Status::OK,
986                count: 1,
987                req_id: is_single_request.then_some(request.reqid),
988                decompression_info,
989            }))
990        });
991
992        Ok(DecodedRequest {
993            request_id,
994            trace_flow_id,
995            operation: operation.map_err(|status| {
996                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
997            })?,
998            vmo,
999        })
1000    }
1001
1002    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1003        std::mem::take(&mut *self.vmos.lock())
1004    }
1005
1006    /// Maps the request and returns the mapped request with an optional remainder.
1007    fn map_request(
1008        &self,
1009        mut request: DecodedRequest,
1010        active_request: &mut ActiveRequest<SM::Session>,
1011    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1012        if active_request.status != zx::Status::OK {
1013            return Err(zx::Status::BAD_STATE);
1014        }
1015        let mapping = self.offset_map.mapping();
1016        match (mapping, request.operation.blocks()) {
1017            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1018                return Err(zx::Status::OUT_OF_RANGE);
1019            }
1020            _ => {}
1021        }
1022        let remainder = request.operation.map(
1023            self.offset_map.mapping(),
1024            self.offset_map.max_transfer_blocks(),
1025            self.block_size,
1026        );
1027        if remainder.is_some() {
1028            active_request.count += 1;
1029        }
1030        static CACHE: AtomicU64 = AtomicU64::new(0);
1031        if let Some(context) =
1032            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
1033        {
1034            use fuchsia_trace::ArgValue;
1035            let trace_args = [
1036                ArgValue::of("request_id", request.request_id.0),
1037                ArgValue::of("opcode", request.operation.trace_label()),
1038            ];
1039            let _scope = fuchsia_trace::duration(
1040                c"storage",
1041                c"block_server::start_transaction",
1042                &trace_args,
1043            );
1044            if let Some(trace_flow_id) = active_request.trace_flow_id {
1045                fuchsia_trace::flow_step(
1046                    &context,
1047                    c"block_server::start_transaction",
1048                    trace_flow_id.get().into(),
1049                    &[],
1050                );
1051            }
1052        }
1053        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1054        Ok((request, remainder))
1055    }
1056
1057    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1058        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1059    }
1060}
1061
/// Identifies an in-flight request.  The wrapped value is the key of the request's entry in the
/// active-requests slab.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
1065
/// A FIFO request after decoding: the operation to perform plus the bookkeeping needed to
/// complete it.
#[derive(Clone, Debug)]
struct DecodedRequest {
    // Key of the active-request entry this request is accounted against.
    request_id: RequestId,
    // Trace flow id taken from the FIFO request; `None` when the request carried zero.
    trace_flow_id: TraceFlowId,
    // The decoded operation.
    operation: Operation,
    // The VMO looked up for the request's vmoid, for operations that need one.
    vmo: Option<Arc<zx::Vmo>>,
}
1073
// Aliases for the `block_protocol` option types so that users of this crate can name them
// without depending on `block_protocol` directly.
/// cbindgen:no-export
pub type WriteFlags = block_protocol::WriteFlags;
// Options carried by `Operation::Write`.
pub type WriteOptions = block_protocol::WriteOptions;
// Options carried by `Operation::Read` and the decompressed-read variants.
pub type ReadOptions = block_protocol::ReadOptions;
1078
/// An operation decoded from a FIFO request.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
    // code can read `device_block_offset` from the read variant and then assume it's valid for the
    // write variant).
    /// Reads `block_count` blocks starting at `device_block_offset` into the VMO at byte offset
    /// `vmo_offset`.
    Read {
        device_block_offset: u64,
        block_count: u32,
        // NOTE(review): presumably padding that keeps the following fields aligned with the C++
        // view of this type -- confirm.
        _unused: u32,
        vmo_offset: u64,
        options: ReadOptions,
    },
    /// Writes `block_count` blocks starting at `device_block_offset` from the VMO at byte offset
    /// `vmo_offset`.
    Write {
        device_block_offset: u64,
        block_count: u32,
        // NOTE(review): presumably padding, as for `Read` -- confirm.
        _unused: u32,
        vmo_offset: u64,
        options: WriteOptions,
    },
    Flush,
    /// Trims `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// This will never be seen by the C interface.
    CloseVmo,
    /// First (or only) read of a request that asked for decompression.  `required_buffer_size`
    /// is the end of the compressed range rounded up to a whole number of blocks.
    StartDecompressedRead {
        required_buffer_size: usize,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
    /// Follow-on read of a fragmented decompressed read.  `offset` is the number of bytes already
    /// covered by the earlier requests of the same group.
    ContinueDecompressedRead {
        offset: u64,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
}
1120
1121impl Operation {
1122    fn trace_label(&self) -> &'static str {
1123        match self {
1124            Operation::Read { .. } => "read",
1125            Operation::Write { .. } => "write",
1126            Operation::Flush { .. } => "flush",
1127            Operation::Trim { .. } => "trim",
1128            Operation::CloseVmo { .. } => "close_vmo",
1129            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1130            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1131        }
1132    }
1133
1134    /// Returns (offset, length).
1135    fn blocks(&self) -> Option<(u64, u32)> {
1136        match self {
1137            Operation::Read { device_block_offset, block_count, .. }
1138            | Operation::Write { device_block_offset, block_count, .. }
1139            | Operation::Trim { device_block_offset, block_count, .. } => {
1140                Some((*device_block_offset, *block_count))
1141            }
1142            _ => None,
1143        }
1144    }
1145
1146    /// Returns mutable references to (offset, length).
1147    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1148        match self {
1149            Operation::Read { device_block_offset, block_count, .. }
1150            | Operation::Write { device_block_offset, block_count, .. }
1151            | Operation::Trim { device_block_offset, block_count, .. } => {
1152                Some((device_block_offset, block_count))
1153            }
1154            _ => None,
1155        }
1156    }
1157
1158    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1159    /// start of the operation.
1160    fn map(
1161        &mut self,
1162        mapping: Option<&BlockOffsetMapping>,
1163        max_blocks: Option<NonZero<u32>>,
1164        block_size: u32,
1165    ) -> Option<Self> {
1166        let mut max = match self {
1167            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1168            _ => None,
1169        };
1170        let (offset, length) = self.blocks_mut()?;
1171        let orig_offset = *offset;
1172        if let Some(mapping) = mapping {
1173            let delta = *offset - mapping.source_block_offset;
1174            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1175            *offset = mapping.target_block_offset + delta;
1176            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1177            max = match max {
1178                None => Some(mapping_max),
1179                Some(m) => Some(std::cmp::min(m, mapping_max)),
1180            };
1181        };
1182        if let Some(max) = max {
1183            if *length as u64 > max {
1184                let rem = (*length as u64 - max) as u32;
1185                *length = max as u32;
1186                return Some(match self {
1187                    Operation::Read {
1188                        device_block_offset: _,
1189                        block_count: _,
1190                        vmo_offset,
1191                        _unused,
1192                        options,
1193                    } => Operation::Read {
1194                        device_block_offset: orig_offset + max,
1195                        block_count: rem,
1196                        vmo_offset: *vmo_offset + max * block_size as u64,
1197                        _unused: *_unused,
1198                        options: *options,
1199                    },
1200                    Operation::Write {
1201                        device_block_offset: _,
1202                        block_count: _,
1203                        _unused,
1204                        vmo_offset,
1205                        options,
1206                    } => {
1207                        // Only send the barrier flag once per write request.
1208                        let new_options = WriteOptions {
1209                            flags: options.flags & !WriteFlags::PRE_BARRIER,
1210                            ..*options
1211                        };
1212
1213                        Operation::Write {
1214                            device_block_offset: orig_offset + max,
1215                            block_count: rem,
1216                            _unused: *_unused,
1217                            vmo_offset: *vmo_offset + max * block_size as u64,
1218                            options: new_options,
1219                        }
1220                    }
1221                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1222                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1223                    }
1224                    _ => unreachable!(),
1225                });
1226            }
1227        }
1228        None
1229    }
1230}
1231
/// Identifies what an active request is tracked by: the id of the group it belongs to, or its
/// own request id when it is not part of a group.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    Group(u16),
    Request(u32),
}

impl GroupOrRequest {
    /// True when this identifies a group rather than an individual request.
    fn is_group(&self) -> bool {
        self.group_id().is_some()
    }

    /// The group id, or `None` when this identifies an individual request.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self { Some(id) } else { None }
    }
}
1250
1251#[cfg(test)]
1252mod tests {
1253    use super::{
1254        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1255        TraceFlowId,
1256    };
1257    use assert_matches::assert_matches;
1258    use block_protocol::{
1259        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1260        WriteOptions,
1261    };
1262    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1263    use fuchsia_sync::Mutex;
1264    use futures::FutureExt as _;
1265    use futures::channel::oneshot;
1266    use futures::future::BoxFuture;
1267    use std::borrow::Cow;
1268    use std::future::poll_fn;
1269    use std::num::NonZero;
1270    use std::pin::pin;
1271    use std::sync::Arc;
1272    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1273    use std::task::{Context, Poll};
1274    use zx::{AsHandleRef as _, HandleBased as _};
1275    use {
1276        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1277        fuchsia_async as fasync,
1278    };
1279
1280    #[derive(Default)]
1281    struct MockInterface {
1282        read_hook: Option<
1283            Box<
1284                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1285                    + Send
1286                    + Sync,
1287            >,
1288        >,
1289        write_hook:
1290            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1291        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1292    }
1293
1294    impl super::async_interface::Interface for MockInterface {
1295        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1296            Ok(())
1297        }
1298
1299        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1300            Cow::Owned(test_device_info())
1301        }
1302
1303        async fn read(
1304            &self,
1305            device_block_offset: u64,
1306            block_count: u32,
1307            vmo: &Arc<zx::Vmo>,
1308            vmo_offset: u64,
1309            _opts: ReadOptions,
1310            _trace_flow_id: TraceFlowId,
1311        ) -> Result<(), zx::Status> {
1312            if let Some(read_hook) = &self.read_hook {
1313                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1314            } else {
1315                unimplemented!();
1316            }
1317        }
1318
1319        async fn write(
1320            &self,
1321            device_block_offset: u64,
1322            _block_count: u32,
1323            _vmo: &Arc<zx::Vmo>,
1324            _vmo_offset: u64,
1325            _opts: WriteOptions,
1326            _trace_flow_id: TraceFlowId,
1327        ) -> Result<(), zx::Status> {
1328            if let Some(write_hook) = &self.write_hook {
1329                write_hook(device_block_offset).await
1330            } else {
1331                unimplemented!();
1332            }
1333        }
1334
1335        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1336            unreachable!();
1337        }
1338
1339        fn barrier(&self) -> Result<(), zx::Status> {
1340            if let Some(barrier_hook) = &self.barrier_hook {
1341                barrier_hook()
1342            } else {
1343                unimplemented!();
1344            }
1345        }
1346
1347        async fn trim(
1348            &self,
1349            _device_block_offset: u64,
1350            _block_count: u32,
1351            _trace_flow_id: TraceFlowId,
1352        ) -> Result<(), zx::Status> {
1353            unreachable!();
1354        }
1355
1356        async fn get_volume_info(
1357            &self,
1358        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1359            // Hang forever for the test_requests_dont_block_sessions test.
1360            let () = std::future::pending().await;
1361            unreachable!();
1362        }
1363    }
1364
    // Block size, in bytes, that the tests pass to `BlockServer::new`.
    const BLOCK_SIZE: u32 = 512;
    // Transfer limit, in blocks, reported by `test_device_info`.  Tests send requests longer than
    // this to force them to be split.
    const MAX_TRANSFER_BLOCKS: u32 = 10;
1367
1368    fn test_device_info() -> DeviceInfo {
1369        DeviceInfo::Partition(PartitionInfo {
1370            device_flags: fblock::Flag::READONLY,
1371            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1372            block_range: Some(0..100),
1373            type_guid: [1; 16],
1374            instance_guid: [2; 16],
1375            name: "foo".to_string(),
1376            flags: 0xabcd,
1377        })
1378    }
1379
1380    #[fuchsia::test]
1381    async fn test_split_request_only_issues_single_barrier() {
1382        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1383        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1384        let barrier_called = Arc::new(AtomicU32::new(0));
1385        let barrier_called_clone = barrier_called.clone();
1386
1387        futures::join!(
1388            async move {
1389                let block_server = BlockServer::new(
1390                    BLOCK_SIZE,
1391                    Arc::new(MockInterface {
1392                        barrier_hook: Some(Box::new(move || {
1393                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1394                            Ok(())
1395                        })),
1396                        write_hook: Some(Box::new(move |_device_block_offset| {
1397                            Box::pin(async move { Ok(()) })
1398                        })),
1399                        ..MockInterface::default()
1400                    }),
1401                );
1402                block_server.handle_requests(stream).await.unwrap();
1403            },
1404            async move {
1405                let (session_proxy, server) = fidl::endpoints::create_proxy();
1406
1407                proxy.open_session(server).unwrap();
1408
1409                let vmo_id = session_proxy
1410                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1411                    .await
1412                    .unwrap()
1413                    .unwrap();
1414                assert_ne!(vmo_id.id, 0);
1415
1416                let mut fifo =
1417                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1418                let (mut reader, mut writer) = fifo.async_io();
1419                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1420                // sub requests.
1421                writer
1422                    .write_entries(&BlockFifoRequest {
1423                        command: BlockFifoCommand {
1424                            opcode: BlockOpcode::Write.into_primitive(),
1425                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1426                            ..Default::default()
1427                        },
1428                        vmoid: vmo_id.id,
1429                        dev_offset: 0,
1430                        length: MAX_TRANSFER_BLOCKS + 1,
1431                        vmo_offset: 6,
1432                        ..Default::default()
1433                    })
1434                    .await
1435                    .unwrap();
1436
1437                let mut response = BlockFifoResponse::default();
1438                reader.read_entries(&mut response).await.unwrap();
1439                assert_eq!(response.status, zx::sys::ZX_OK);
1440
1441                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1442
1443                std::mem::drop(proxy);
1444            }
1445        );
1446    }
1447
1448    #[fuchsia::test]
1449    async fn test_barriers_not_supported() {
1450        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1451        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1452
1453        futures::join!(
1454            async move {
1455                let block_server = BlockServer::new(
1456                    BLOCK_SIZE,
1457                    Arc::new(MockInterface {
1458                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1459                        write_hook: Some(Box::new(move |_device_block_offset| {
1460                            Box::pin(async move { Ok(()) })
1461                        })),
1462                        ..MockInterface::default()
1463                    }),
1464                );
1465                block_server.handle_requests(stream).await.unwrap();
1466            },
1467            async move {
1468                let (session_proxy, server) = fidl::endpoints::create_proxy();
1469
1470                proxy.open_session(server).unwrap();
1471
1472                let vmo_id = session_proxy
1473                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1474                    .await
1475                    .unwrap()
1476                    .unwrap();
1477                assert_ne!(vmo_id.id, 0);
1478
1479                let mut fifo =
1480                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1481                let (mut reader, mut writer) = fifo.async_io();
1482
1483                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1484                // sub requests.
1485                writer
1486                    .write_entries(&BlockFifoRequest {
1487                        command: BlockFifoCommand {
1488                            opcode: BlockOpcode::Write.into_primitive(),
1489                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1490                            ..Default::default()
1491                        },
1492                        vmoid: vmo_id.id,
1493                        dev_offset: 0,
1494                        length: MAX_TRANSFER_BLOCKS + 1,
1495                        vmo_offset: 6,
1496                        ..Default::default()
1497                    })
1498                    .await
1499                    .unwrap();
1500
1501                let mut response = BlockFifoResponse::default();
1502                reader.read_entries(&mut response).await.unwrap();
1503                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1504
1505                std::mem::drop(proxy);
1506            }
1507        );
1508    }
1509
1510    #[fuchsia::test]
1511    async fn test_barriers_ordering() {
1512        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1513        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1514        let barrier_called = Arc::new(AtomicBool::new(false));
1515
1516        futures::join!(
1517            async move {
1518                let barrier_called_clone = barrier_called.clone();
1519                let block_server = BlockServer::new(
1520                    BLOCK_SIZE,
1521                    Arc::new(MockInterface {
1522                        barrier_hook: Some(Box::new(move || {
1523                            barrier_called.store(true, Ordering::Relaxed);
1524                            Ok(())
1525                        })),
1526                        write_hook: Some(Box::new(move |device_block_offset| {
1527                            let barrier_called = barrier_called_clone.clone();
1528                            Box::pin(async move {
1529                                // The sleep allows the server to reorder the fifo requests.
1530                                if device_block_offset % 2 == 0 {
1531                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1532                                        zx::MonotonicDuration::from_millis(200),
1533                                    ))
1534                                    .await;
1535                                }
1536                                assert!(barrier_called.load(Ordering::Relaxed));
1537                                Ok(())
1538                            })
1539                        })),
1540                        ..MockInterface::default()
1541                    }),
1542                );
1543                block_server.handle_requests(stream).await.unwrap();
1544            },
1545            async move {
1546                let (session_proxy, server) = fidl::endpoints::create_proxy();
1547
1548                proxy.open_session(server).unwrap();
1549
1550                let vmo_id = session_proxy
1551                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1552                    .await
1553                    .unwrap()
1554                    .unwrap();
1555                assert_ne!(vmo_id.id, 0);
1556
1557                let mut fifo =
1558                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1559                let (mut reader, mut writer) = fifo.async_io();
1560
1561                writer
1562                    .write_entries(&BlockFifoRequest {
1563                        command: BlockFifoCommand {
1564                            opcode: BlockOpcode::Write.into_primitive(),
1565                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1566                            ..Default::default()
1567                        },
1568                        vmoid: vmo_id.id,
1569                        dev_offset: 0,
1570                        length: 5,
1571                        vmo_offset: 6,
1572                        ..Default::default()
1573                    })
1574                    .await
1575                    .unwrap();
1576
1577                for i in 0..10 {
1578                    writer
1579                        .write_entries(&BlockFifoRequest {
1580                            command: BlockFifoCommand {
1581                                opcode: BlockOpcode::Write.into_primitive(),
1582                                ..Default::default()
1583                            },
1584                            vmoid: vmo_id.id,
1585                            dev_offset: i + 1,
1586                            length: 5,
1587                            vmo_offset: 6,
1588                            ..Default::default()
1589                        })
1590                        .await
1591                        .unwrap();
1592                }
1593                for _ in 0..11 {
1594                    let mut response = BlockFifoResponse::default();
1595                    reader.read_entries(&mut response).await.unwrap();
1596                    assert_eq!(response.status, zx::sys::ZX_OK);
1597                }
1598
1599                std::mem::drop(proxy);
1600            }
1601        );
1602    }
1603
1604    #[fuchsia::test]
1605    async fn test_info() {
1606        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1607
1608        futures::join!(
1609            async {
1610                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1611                block_server.handle_requests(stream).await.unwrap();
1612            },
1613            async {
1614                let expected_info = test_device_info();
1615                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1616                    info
1617                } else {
1618                    unreachable!()
1619                };
1620
1621                let block_info = proxy.get_info().await.unwrap().unwrap();
1622                assert_eq!(
1623                    block_info.block_count,
1624                    partition_info.block_range.as_ref().unwrap().end
1625                        - partition_info.block_range.as_ref().unwrap().start
1626                );
1627                assert_eq!(
1628                    block_info.flags,
1629                    fblock::Flag::READONLY | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1630                );
1631
1632                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1633
1634                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1635                assert_eq!(status, zx::sys::ZX_OK);
1636                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1637
1638                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1639                assert_eq!(status, zx::sys::ZX_OK);
1640                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1641
1642                let (status, name) = proxy.get_name().await.unwrap();
1643                assert_eq!(status, zx::sys::ZX_OK);
1644                assert_eq!(name.as_ref(), Some(&partition_info.name));
1645
1646                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1647                assert_eq!(metadata.name, name);
1648                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1649                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1650                assert_eq!(
1651                    metadata.start_block_offset,
1652                    Some(partition_info.block_range.as_ref().unwrap().start)
1653                );
1654                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1655                assert_eq!(metadata.flags, Some(partition_info.flags));
1656
1657                std::mem::drop(proxy);
1658            }
1659        );
1660    }
1661
    /// Attaches duplicates of one VMO until the server refuses with `ZX_ERR_NO_RESOURCES`, then
    /// closes the first attachment and checks that a new attachment is given the same ID.
    #[fuchsia::test]
    async fn test_attach_vmo() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        let koid = vmo.get_koid().unwrap();

        futures::join!(
            async {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // The hook asserts that the VMO it is handed has the koid of the test's
                        // `vmo`; every handle the client attaches is a duplicate of that VMO.
                        read_hook: Some(Box::new(move |_, _, vmo, _| {
                            assert_eq!(vmo.get_koid().unwrap(), koid);
                            Box::pin(async { Ok(()) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                // First attachment; its ID is the one closed and re-checked at the end.
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // Keep attaching VMOs until we eventually hit the maximum.
                // `count` starts at 1 to account for the attachment made above.
                let mut count = 1;
                loop {
                    match session_proxy
                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                        .await
                        .unwrap()
                    {
                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
                        Err(e) => {
                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
                            break;
                        }
                    }

                    // Only test every 10 to keep test time down.
                    // NOTE(review): the `vmo_id` bound in the `Ok` arm above is scoped to that
                    // arm, so this read uses the ID of the first attachment rather than the one
                    // just made; confirm that this is intended.
                    if count % 10 == 0 {
                        writer
                            .write_entries(&BlockFifoRequest {
                                command: BlockFifoCommand {
                                    opcode: BlockOpcode::Read.into_primitive(),
                                    ..Default::default()
                                },
                                vmoid: vmo_id.id,
                                length: 1,
                                ..Default::default()
                            })
                            .await
                            .unwrap();

                        let mut response = BlockFifoResponse::default();
                        reader.read_entries(&mut response).await.unwrap();
                        assert_eq!(response.status, zx::sys::ZX_OK);
                    }

                    count += 1;
                }

                // `count` was incremented once per successful attach in the loop, so it is the
                // total number of VMOs attached before the server ran out of IDs.
                assert_eq!(count, u16::MAX as u64);

                // Detach the original VMO, and make sure we can then attach another one.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::CloseVmo.into_primitive(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                let mut response = BlockFifoResponse::default();
                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                let new_vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                // It should reuse the same ID.
                assert_eq!(new_vmo_id.id, vmo_id.id);

                std::mem::drop(proxy);
            }
        );
    }
1768
1769    #[fuchsia::test]
1770    async fn test_close() {
1771        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1772
1773        let mut server = std::pin::pin!(
1774            async {
1775                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1776                block_server.handle_requests(stream).await.unwrap();
1777            }
1778            .fuse()
1779        );
1780
1781        let mut client = std::pin::pin!(
1782            async {
1783                let (session_proxy, server) = fidl::endpoints::create_proxy();
1784
1785                proxy.open_session(server).unwrap();
1786
1787                // Dropping the proxy should not cause the session to terminate because the session is
1788                // still live.
1789                std::mem::drop(proxy);
1790
1791                session_proxy.close().await.unwrap().unwrap();
1792
1793                // Keep the session alive.  Calling `close` should cause the server to terminate.
1794                let _: () = std::future::pending().await;
1795            }
1796            .fuse()
1797        );
1798
1799        futures::select!(
1800            _ = server => {}
1801            _ = client => unreachable!(),
1802        );
1803    }
1804
    /// Test interface whose read/write/flush/trim methods either fail unconditionally or,
    /// optionally, verify each call against the operation queued in `expected_op`.
    #[derive(Default)]
    struct IoMockInterface {
        /// When true (and `return_errors` is false), each read/write/flush/trim call takes the
        /// pending `expected_op` and asserts that it matches the call.
        do_checks: bool,
        /// The operation the next checked call must match; a checked call takes the value,
        /// leaving `None` behind.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        /// When true, read/write/flush/trim return an error (a different status for each
        /// operation) without performing any checks.
        return_errors: bool,
    }
1811
    /// The operation `IoMockInterface` expects to receive next when its checks are enabled.
    #[derive(Debug)]
    enum ExpectedOp {
        /// `(device_block_offset, block_count, vmo_offset / BLOCK_SIZE)` of the expected read.
        Read(u64, u32, u64),
        /// `(device_block_offset, block_count, vmo_offset / BLOCK_SIZE)` of the expected write.
        Write(u64, u32, u64),
        /// `(device_block_offset, block_count)` of the expected trim.
        Trim(u64, u32),
        /// A flush; it carries no arguments to compare.
        Flush,
    }
1819
1820    impl super::async_interface::Interface for IoMockInterface {
1821        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1822            Ok(())
1823        }
1824
1825        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1826            Cow::Owned(test_device_info())
1827        }
1828
1829        async fn read(
1830            &self,
1831            device_block_offset: u64,
1832            block_count: u32,
1833            _vmo: &Arc<zx::Vmo>,
1834            vmo_offset: u64,
1835            _opts: ReadOptions,
1836            _trace_flow_id: TraceFlowId,
1837        ) -> Result<(), zx::Status> {
1838            if self.return_errors {
1839                Err(zx::Status::INTERNAL)
1840            } else {
1841                if self.do_checks {
1842                    assert_matches!(
1843                        self.expected_op.lock().take(),
1844                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1845                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1846                        "Read {device_block_offset} {block_count} {vmo_offset}"
1847                    );
1848                }
1849                Ok(())
1850            }
1851        }
1852
1853        async fn write(
1854            &self,
1855            device_block_offset: u64,
1856            block_count: u32,
1857            _vmo: &Arc<zx::Vmo>,
1858            vmo_offset: u64,
1859            _write_opts: WriteOptions,
1860            _trace_flow_id: TraceFlowId,
1861        ) -> Result<(), zx::Status> {
1862            if self.return_errors {
1863                Err(zx::Status::NOT_SUPPORTED)
1864            } else {
1865                if self.do_checks {
1866                    assert_matches!(
1867                        self.expected_op.lock().take(),
1868                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1869                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1870                        "Write {device_block_offset} {block_count} {vmo_offset}"
1871                    );
1872                }
1873                Ok(())
1874            }
1875        }
1876
1877        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1878            if self.return_errors {
1879                Err(zx::Status::NO_RESOURCES)
1880            } else {
1881                if self.do_checks {
1882                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1883                }
1884                Ok(())
1885            }
1886        }
1887
1888        fn barrier(&self) -> Result<(), zx::Status> {
1889            unreachable!()
1890        }
1891
1892        async fn trim(
1893            &self,
1894            device_block_offset: u64,
1895            block_count: u32,
1896            _trace_flow_id: TraceFlowId,
1897        ) -> Result<(), zx::Status> {
1898            if self.return_errors {
1899                Err(zx::Status::NO_MEMORY)
1900            } else {
1901                if self.do_checks {
1902                    assert_matches!(
1903                        self.expected_op.lock().take(),
1904                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1905                            block_count == b,
1906                        "Trim {device_block_offset} {block_count}"
1907                    );
1908                }
1909                Ok(())
1910            }
1911        }
1912    }
1913
1914    #[fuchsia::test]
1915    async fn test_io() {
1916        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1917
1918        let expected_op = Arc::new(Mutex::new(None));
1919        let expected_op_clone = expected_op.clone();
1920
1921        let server = async {
1922            let block_server = BlockServer::new(
1923                BLOCK_SIZE,
1924                Arc::new(IoMockInterface {
1925                    return_errors: false,
1926                    do_checks: true,
1927                    expected_op: expected_op_clone,
1928                }),
1929            );
1930            block_server.handle_requests(stream).await.unwrap();
1931        };
1932
1933        let client = async move {
1934            let (session_proxy, server) = fidl::endpoints::create_proxy();
1935
1936            proxy.open_session(server).unwrap();
1937
1938            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1939            let vmo_id = session_proxy
1940                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1941                .await
1942                .unwrap()
1943                .unwrap();
1944
1945            let mut fifo =
1946                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1947            let (mut reader, mut writer) = fifo.async_io();
1948
1949            // READ
1950            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1951            writer
1952                .write_entries(&BlockFifoRequest {
1953                    command: BlockFifoCommand {
1954                        opcode: BlockOpcode::Read.into_primitive(),
1955                        ..Default::default()
1956                    },
1957                    vmoid: vmo_id.id,
1958                    dev_offset: 1,
1959                    length: 2,
1960                    vmo_offset: 3,
1961                    ..Default::default()
1962                })
1963                .await
1964                .unwrap();
1965
1966            let mut response = BlockFifoResponse::default();
1967            reader.read_entries(&mut response).await.unwrap();
1968            assert_eq!(response.status, zx::sys::ZX_OK);
1969
1970            // WRITE
1971            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1972            writer
1973                .write_entries(&BlockFifoRequest {
1974                    command: BlockFifoCommand {
1975                        opcode: BlockOpcode::Write.into_primitive(),
1976                        ..Default::default()
1977                    },
1978                    vmoid: vmo_id.id,
1979                    dev_offset: 4,
1980                    length: 5,
1981                    vmo_offset: 6,
1982                    ..Default::default()
1983                })
1984                .await
1985                .unwrap();
1986
1987            let mut response = BlockFifoResponse::default();
1988            reader.read_entries(&mut response).await.unwrap();
1989            assert_eq!(response.status, zx::sys::ZX_OK);
1990
1991            // FLUSH
1992            *expected_op.lock() = Some(ExpectedOp::Flush);
1993            writer
1994                .write_entries(&BlockFifoRequest {
1995                    command: BlockFifoCommand {
1996                        opcode: BlockOpcode::Flush.into_primitive(),
1997                        ..Default::default()
1998                    },
1999                    ..Default::default()
2000                })
2001                .await
2002                .unwrap();
2003
2004            reader.read_entries(&mut response).await.unwrap();
2005            assert_eq!(response.status, zx::sys::ZX_OK);
2006
2007            // TRIM
2008            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
2009            writer
2010                .write_entries(&BlockFifoRequest {
2011                    command: BlockFifoCommand {
2012                        opcode: BlockOpcode::Trim.into_primitive(),
2013                        ..Default::default()
2014                    },
2015                    dev_offset: 7,
2016                    length: 8,
2017                    ..Default::default()
2018                })
2019                .await
2020                .unwrap();
2021
2022            reader.read_entries(&mut response).await.unwrap();
2023            assert_eq!(response.status, zx::sys::ZX_OK);
2024
2025            std::mem::drop(proxy);
2026        };
2027
2028        futures::join!(server, client);
2029    }
2030
2031    #[fuchsia::test]
2032    async fn test_io_errors() {
2033        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2034
2035        futures::join!(
2036            async {
2037                let block_server = BlockServer::new(
2038                    BLOCK_SIZE,
2039                    Arc::new(IoMockInterface {
2040                        return_errors: true,
2041                        do_checks: false,
2042                        expected_op: Arc::new(Mutex::new(None)),
2043                    }),
2044                );
2045                block_server.handle_requests(stream).await.unwrap();
2046            },
2047            async move {
2048                let (session_proxy, server) = fidl::endpoints::create_proxy();
2049
2050                proxy.open_session(server).unwrap();
2051
2052                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2053                let vmo_id = session_proxy
2054                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2055                    .await
2056                    .unwrap()
2057                    .unwrap();
2058
2059                let mut fifo =
2060                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2061                let (mut reader, mut writer) = fifo.async_io();
2062
2063                // READ
2064                writer
2065                    .write_entries(&BlockFifoRequest {
2066                        command: BlockFifoCommand {
2067                            opcode: BlockOpcode::Read.into_primitive(),
2068                            ..Default::default()
2069                        },
2070                        vmoid: vmo_id.id,
2071                        length: 1,
2072                        reqid: 1,
2073                        ..Default::default()
2074                    })
2075                    .await
2076                    .unwrap();
2077
2078                let mut response = BlockFifoResponse::default();
2079                reader.read_entries(&mut response).await.unwrap();
2080                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2081
2082                // WRITE
2083                writer
2084                    .write_entries(&BlockFifoRequest {
2085                        command: BlockFifoCommand {
2086                            opcode: BlockOpcode::Write.into_primitive(),
2087                            ..Default::default()
2088                        },
2089                        vmoid: vmo_id.id,
2090                        length: 1,
2091                        reqid: 2,
2092                        ..Default::default()
2093                    })
2094                    .await
2095                    .unwrap();
2096
2097                reader.read_entries(&mut response).await.unwrap();
2098                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2099
2100                // FLUSH
2101                writer
2102                    .write_entries(&BlockFifoRequest {
2103                        command: BlockFifoCommand {
2104                            opcode: BlockOpcode::Flush.into_primitive(),
2105                            ..Default::default()
2106                        },
2107                        reqid: 3,
2108                        ..Default::default()
2109                    })
2110                    .await
2111                    .unwrap();
2112
2113                reader.read_entries(&mut response).await.unwrap();
2114                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2115
2116                // TRIM
2117                writer
2118                    .write_entries(&BlockFifoRequest {
2119                        command: BlockFifoCommand {
2120                            opcode: BlockOpcode::Trim.into_primitive(),
2121                            ..Default::default()
2122                        },
2123                        reqid: 4,
2124                        length: 1,
2125                        ..Default::default()
2126                    })
2127                    .await
2128                    .unwrap();
2129
2130                reader.read_entries(&mut response).await.unwrap();
2131                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2132
2133                std::mem::drop(proxy);
2134            }
2135        );
2136    }
2137
2138    #[fuchsia::test]
2139    async fn test_invalid_args() {
2140        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2141
2142        futures::join!(
2143            async {
2144                let block_server = BlockServer::new(
2145                    BLOCK_SIZE,
2146                    Arc::new(IoMockInterface {
2147                        return_errors: false,
2148                        do_checks: false,
2149                        expected_op: Arc::new(Mutex::new(None)),
2150                    }),
2151                );
2152                block_server.handle_requests(stream).await.unwrap();
2153            },
2154            async move {
2155                let (session_proxy, server) = fidl::endpoints::create_proxy();
2156
2157                proxy.open_session(server).unwrap();
2158
2159                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2160                let vmo_id = session_proxy
2161                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2162                    .await
2163                    .unwrap()
2164                    .unwrap();
2165
2166                let mut fifo =
2167                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2168
2169                async fn test(
2170                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2171                    request: BlockFifoRequest,
2172                ) -> Result<(), zx::Status> {
2173                    let (mut reader, mut writer) = fifo.async_io();
2174                    writer.write_entries(&request).await.unwrap();
2175                    let mut response = BlockFifoResponse::default();
2176                    reader.read_entries(&mut response).await.unwrap();
2177                    zx::Status::ok(response.status)
2178                }
2179
2180                // READ
2181
2182                let good_read_request = || BlockFifoRequest {
2183                    command: BlockFifoCommand {
2184                        opcode: BlockOpcode::Read.into_primitive(),
2185                        ..Default::default()
2186                    },
2187                    length: 1,
2188                    vmoid: vmo_id.id,
2189                    ..Default::default()
2190                };
2191
2192                assert_eq!(
2193                    test(
2194                        &mut fifo,
2195                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2196                    )
2197                    .await,
2198                    Err(zx::Status::IO)
2199                );
2200
2201                assert_eq!(
2202                    test(
2203                        &mut fifo,
2204                        BlockFifoRequest {
2205                            vmo_offset: 0xffff_ffff_ffff_ffff,
2206                            ..good_read_request()
2207                        }
2208                    )
2209                    .await,
2210                    Err(zx::Status::OUT_OF_RANGE)
2211                );
2212
2213                assert_eq!(
2214                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2215                    Err(zx::Status::INVALID_ARGS)
2216                );
2217
2218                // WRITE
2219
2220                let good_write_request = || BlockFifoRequest {
2221                    command: BlockFifoCommand {
2222                        opcode: BlockOpcode::Write.into_primitive(),
2223                        ..Default::default()
2224                    },
2225                    length: 1,
2226                    vmoid: vmo_id.id,
2227                    ..Default::default()
2228                };
2229
2230                assert_eq!(
2231                    test(
2232                        &mut fifo,
2233                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2234                    )
2235                    .await,
2236                    Err(zx::Status::IO)
2237                );
2238
2239                assert_eq!(
2240                    test(
2241                        &mut fifo,
2242                        BlockFifoRequest {
2243                            vmo_offset: 0xffff_ffff_ffff_ffff,
2244                            ..good_write_request()
2245                        }
2246                    )
2247                    .await,
2248                    Err(zx::Status::OUT_OF_RANGE)
2249                );
2250
2251                assert_eq!(
2252                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2253                    Err(zx::Status::INVALID_ARGS)
2254                );
2255
2256                // CLOSE VMO
2257
2258                assert_eq!(
2259                    test(
2260                        &mut fifo,
2261                        BlockFifoRequest {
2262                            command: BlockFifoCommand {
2263                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2264                                ..Default::default()
2265                            },
2266                            vmoid: vmo_id.id + 1,
2267                            ..Default::default()
2268                        }
2269                    )
2270                    .await,
2271                    Err(zx::Status::IO)
2272                );
2273
2274                std::mem::drop(proxy);
2275            }
2276        );
2277    }
2278
2279    #[fuchsia::test]
2280    async fn test_concurrent_requests() {
2281        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2282
2283        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2284        let waiting_readers_clone = waiting_readers.clone();
2285
2286        futures::join!(
2287            async move {
2288                let block_server = BlockServer::new(
2289                    BLOCK_SIZE,
2290                    Arc::new(MockInterface {
2291                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2292                            let (tx, rx) = oneshot::channel();
2293                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2294                            Box::pin(async move {
2295                                let _ = rx.await;
2296                                Ok(())
2297                            })
2298                        })),
2299                        ..MockInterface::default()
2300                    }),
2301                );
2302                block_server.handle_requests(stream).await.unwrap();
2303            },
2304            async move {
2305                let (session_proxy, server) = fidl::endpoints::create_proxy();
2306
2307                proxy.open_session(server).unwrap();
2308
2309                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2310                let vmo_id = session_proxy
2311                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2312                    .await
2313                    .unwrap()
2314                    .unwrap();
2315
2316                let mut fifo =
2317                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2318                let (mut reader, mut writer) = fifo.async_io();
2319
2320                writer
2321                    .write_entries(&BlockFifoRequest {
2322                        command: BlockFifoCommand {
2323                            opcode: BlockOpcode::Read.into_primitive(),
2324                            ..Default::default()
2325                        },
2326                        reqid: 1,
2327                        dev_offset: 1, // Intentionally use the same as `reqid`.
2328                        vmoid: vmo_id.id,
2329                        length: 1,
2330                        ..Default::default()
2331                    })
2332                    .await
2333                    .unwrap();
2334
2335                writer
2336                    .write_entries(&BlockFifoRequest {
2337                        command: BlockFifoCommand {
2338                            opcode: BlockOpcode::Read.into_primitive(),
2339                            ..Default::default()
2340                        },
2341                        reqid: 2,
2342                        dev_offset: 2,
2343                        vmoid: vmo_id.id,
2344                        length: 1,
2345                        ..Default::default()
2346                    })
2347                    .await
2348                    .unwrap();
2349
2350                // Wait till both those entries are pending.
2351                poll_fn(|cx: &mut Context<'_>| {
2352                    if waiting_readers.lock().len() == 2 {
2353                        Poll::Ready(())
2354                    } else {
2355                        // Yield to the executor.
2356                        cx.waker().wake_by_ref();
2357                        Poll::Pending
2358                    }
2359                })
2360                .await;
2361
2362                let mut response = BlockFifoResponse::default();
2363                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2364
2365                let (id, tx) = waiting_readers.lock().pop().unwrap();
2366                tx.send(()).unwrap();
2367
2368                reader.read_entries(&mut response).await.unwrap();
2369                assert_eq!(response.status, zx::sys::ZX_OK);
2370                assert_eq!(response.reqid, id);
2371
2372                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2373
2374                let (id, tx) = waiting_readers.lock().pop().unwrap();
2375                tx.send(()).unwrap();
2376
2377                reader.read_entries(&mut response).await.unwrap();
2378                assert_eq!(response.status, zx::sys::ZX_OK);
2379                assert_eq!(response.reqid, id);
2380            }
2381        );
2382    }
2383
2384    #[fuchsia::test]
2385    async fn test_groups() {
2386        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2387
2388        futures::join!(
2389            async move {
2390                let block_server = BlockServer::new(
2391                    BLOCK_SIZE,
2392                    Arc::new(MockInterface {
2393                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2394                        ..MockInterface::default()
2395                    }),
2396                );
2397                block_server.handle_requests(stream).await.unwrap();
2398            },
2399            async move {
2400                let (session_proxy, server) = fidl::endpoints::create_proxy();
2401
2402                proxy.open_session(server).unwrap();
2403
2404                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2405                let vmo_id = session_proxy
2406                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2407                    .await
2408                    .unwrap()
2409                    .unwrap();
2410
2411                let mut fifo =
2412                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2413                let (mut reader, mut writer) = fifo.async_io();
2414
2415                writer
2416                    .write_entries(&BlockFifoRequest {
2417                        command: BlockFifoCommand {
2418                            opcode: BlockOpcode::Read.into_primitive(),
2419                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2420                            ..Default::default()
2421                        },
2422                        group: 1,
2423                        vmoid: vmo_id.id,
2424                        length: 1,
2425                        ..Default::default()
2426                    })
2427                    .await
2428                    .unwrap();
2429
2430                writer
2431                    .write_entries(&BlockFifoRequest {
2432                        command: BlockFifoCommand {
2433                            opcode: BlockOpcode::Read.into_primitive(),
2434                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2435                            ..Default::default()
2436                        },
2437                        reqid: 2,
2438                        group: 1,
2439                        vmoid: vmo_id.id,
2440                        length: 1,
2441                        ..Default::default()
2442                    })
2443                    .await
2444                    .unwrap();
2445
2446                let mut response = BlockFifoResponse::default();
2447                reader.read_entries(&mut response).await.unwrap();
2448                assert_eq!(response.status, zx::sys::ZX_OK);
2449                assert_eq!(response.reqid, 2);
2450                assert_eq!(response.group, 1);
2451            }
2452        );
2453    }
2454
2455    #[fuchsia::test]
2456    async fn test_group_error() {
2457        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2458
2459        let counter = Arc::new(AtomicU64::new(0));
2460        let counter_clone = counter.clone();
2461
2462        futures::join!(
2463            async move {
2464                let block_server = BlockServer::new(
2465                    BLOCK_SIZE,
2466                    Arc::new(MockInterface {
2467                        read_hook: Some(Box::new(move |_, _, _, _| {
2468                            counter_clone.fetch_add(1, Ordering::Relaxed);
2469                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2470                        })),
2471                        ..MockInterface::default()
2472                    }),
2473                );
2474                block_server.handle_requests(stream).await.unwrap();
2475            },
2476            async move {
2477                let (session_proxy, server) = fidl::endpoints::create_proxy();
2478
2479                proxy.open_session(server).unwrap();
2480
2481                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2482                let vmo_id = session_proxy
2483                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2484                    .await
2485                    .unwrap()
2486                    .unwrap();
2487
2488                let mut fifo =
2489                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2490                let (mut reader, mut writer) = fifo.async_io();
2491
2492                writer
2493                    .write_entries(&BlockFifoRequest {
2494                        command: BlockFifoCommand {
2495                            opcode: BlockOpcode::Read.into_primitive(),
2496                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2497                            ..Default::default()
2498                        },
2499                        group: 1,
2500                        vmoid: vmo_id.id,
2501                        length: 1,
2502                        ..Default::default()
2503                    })
2504                    .await
2505                    .unwrap();
2506
2507                // Wait until processed.
2508                poll_fn(|cx: &mut Context<'_>| {
2509                    if counter.load(Ordering::Relaxed) == 1 {
2510                        Poll::Ready(())
2511                    } else {
2512                        // Yield to the executor.
2513                        cx.waker().wake_by_ref();
2514                        Poll::Pending
2515                    }
2516                })
2517                .await;
2518
2519                let mut response = BlockFifoResponse::default();
2520                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2521
2522                writer
2523                    .write_entries(&BlockFifoRequest {
2524                        command: BlockFifoCommand {
2525                            opcode: BlockOpcode::Read.into_primitive(),
2526                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2527                            ..Default::default()
2528                        },
2529                        group: 1,
2530                        vmoid: vmo_id.id,
2531                        length: 1,
2532                        ..Default::default()
2533                    })
2534                    .await
2535                    .unwrap();
2536
2537                writer
2538                    .write_entries(&BlockFifoRequest {
2539                        command: BlockFifoCommand {
2540                            opcode: BlockOpcode::Read.into_primitive(),
2541                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2542                            ..Default::default()
2543                        },
2544                        reqid: 2,
2545                        group: 1,
2546                        vmoid: vmo_id.id,
2547                        length: 1,
2548                        ..Default::default()
2549                    })
2550                    .await
2551                    .unwrap();
2552
2553                reader.read_entries(&mut response).await.unwrap();
2554                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2555                assert_eq!(response.reqid, 2);
2556                assert_eq!(response.group, 1);
2557
2558                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2559
2560                // Only the first request should have been processed.
2561                assert_eq!(counter.load(Ordering::Relaxed), 1);
2562            }
2563        );
2564    }
2565
2566    #[fuchsia::test]
2567    async fn test_group_with_two_lasts() {
2568        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2569
2570        let (tx, rx) = oneshot::channel();
2571
2572        futures::join!(
2573            async move {
2574                let rx = Mutex::new(Some(rx));
2575                let block_server = BlockServer::new(
2576                    BLOCK_SIZE,
2577                    Arc::new(MockInterface {
2578                        read_hook: Some(Box::new(move |_, _, _, _| {
2579                            let rx = rx.lock().take().unwrap();
2580                            Box::pin(async {
2581                                let _ = rx.await;
2582                                Ok(())
2583                            })
2584                        })),
2585                        ..MockInterface::default()
2586                    }),
2587                );
2588                block_server.handle_requests(stream).await.unwrap();
2589            },
2590            async move {
2591                let (session_proxy, server) = fidl::endpoints::create_proxy();
2592
2593                proxy.open_session(server).unwrap();
2594
2595                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2596                let vmo_id = session_proxy
2597                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2598                    .await
2599                    .unwrap()
2600                    .unwrap();
2601
2602                let mut fifo =
2603                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2604                let (mut reader, mut writer) = fifo.async_io();
2605
2606                writer
2607                    .write_entries(&BlockFifoRequest {
2608                        command: BlockFifoCommand {
2609                            opcode: BlockOpcode::Read.into_primitive(),
2610                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2611                            ..Default::default()
2612                        },
2613                        reqid: 1,
2614                        group: 1,
2615                        vmoid: vmo_id.id,
2616                        length: 1,
2617                        ..Default::default()
2618                    })
2619                    .await
2620                    .unwrap();
2621
2622                writer
2623                    .write_entries(&BlockFifoRequest {
2624                        command: BlockFifoCommand {
2625                            opcode: BlockOpcode::Read.into_primitive(),
2626                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2627                            ..Default::default()
2628                        },
2629                        reqid: 2,
2630                        group: 1,
2631                        vmoid: vmo_id.id,
2632                        length: 1,
2633                        ..Default::default()
2634                    })
2635                    .await
2636                    .unwrap();
2637
2638                // Send an independent request to flush through the fifo.
2639                writer
2640                    .write_entries(&BlockFifoRequest {
2641                        command: BlockFifoCommand {
2642                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2643                            ..Default::default()
2644                        },
2645                        reqid: 3,
2646                        vmoid: vmo_id.id,
2647                        ..Default::default()
2648                    })
2649                    .await
2650                    .unwrap();
2651
2652                // It should succeed.
2653                let mut response = BlockFifoResponse::default();
2654                reader.read_entries(&mut response).await.unwrap();
2655                assert_eq!(response.status, zx::sys::ZX_OK);
2656                assert_eq!(response.reqid, 3);
2657
2658                // Now release the original request.
2659                tx.send(()).unwrap();
2660
2661                // The response should be for the first message tagged as last, and it should be
2662                // an error because we sent two messages with the LAST marker.
2663                let mut response = BlockFifoResponse::default();
2664                reader.read_entries(&mut response).await.unwrap();
2665                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2666                assert_eq!(response.reqid, 1);
2667                assert_eq!(response.group, 1);
2668            }
2669        );
2670    }
2671
2672    #[fuchsia::test(allow_stalls = false)]
2673    async fn test_requests_dont_block_sessions() {
2674        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2675
2676        let (tx, rx) = oneshot::channel();
2677
2678        fasync::Task::local(async move {
2679            let rx = Mutex::new(Some(rx));
2680            let block_server = BlockServer::new(
2681                BLOCK_SIZE,
2682                Arc::new(MockInterface {
2683                    read_hook: Some(Box::new(move |_, _, _, _| {
2684                        let rx = rx.lock().take().unwrap();
2685                        Box::pin(async {
2686                            let _ = rx.await;
2687                            Ok(())
2688                        })
2689                    })),
2690                    ..MockInterface::default()
2691                }),
2692            );
2693            block_server.handle_requests(stream).await.unwrap();
2694        })
2695        .detach();
2696
2697        let mut fut = pin!(async {
2698            let (session_proxy, server) = fidl::endpoints::create_proxy();
2699
2700            proxy.open_session(server).unwrap();
2701
2702            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2703            let vmo_id = session_proxy
2704                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2705                .await
2706                .unwrap()
2707                .unwrap();
2708
2709            let mut fifo =
2710                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2711            let (mut reader, mut writer) = fifo.async_io();
2712
2713            writer
2714                .write_entries(&BlockFifoRequest {
2715                    command: BlockFifoCommand {
2716                        opcode: BlockOpcode::Read.into_primitive(),
2717                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2718                        ..Default::default()
2719                    },
2720                    reqid: 1,
2721                    group: 1,
2722                    vmoid: vmo_id.id,
2723                    length: 1,
2724                    ..Default::default()
2725                })
2726                .await
2727                .unwrap();
2728
2729            let mut response = BlockFifoResponse::default();
2730            reader.read_entries(&mut response).await.unwrap();
2731            assert_eq!(response.status, zx::sys::ZX_OK);
2732        });
2733
2734        // The response won't come back until we send on `tx`.
2735        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2736
2737        let mut fut2 = pin!(proxy.get_volume_info());
2738
2739        // get_volume_info is set up to stall forever.
2740        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2741
2742        // If we now free up the first future, it should resolve; the stalled call to
2743        // get_volume_info should not block the fifo response.
2744        let _ = tx.send(());
2745
2746        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2747    }
2748
2749    #[fuchsia::test]
2750    async fn test_request_flow_control() {
2751        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2752
2753        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2754        // server will block until that happens.
2755        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2756        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2757        let event_clone = event.clone();
2758        futures::join!(
2759            async move {
2760                let block_server = BlockServer::new(
2761                    BLOCK_SIZE,
2762                    Arc::new(MockInterface {
2763                        read_hook: Some(Box::new(move |_, _, _, _| {
2764                            let event_clone = event_clone.clone();
2765                            Box::pin(async move {
2766                                if !event_clone.1.load(Ordering::SeqCst) {
2767                                    event_clone.0.listen().await;
2768                                }
2769                                Ok(())
2770                            })
2771                        })),
2772                        ..MockInterface::default()
2773                    }),
2774                );
2775                block_server.handle_requests(stream).await.unwrap();
2776            },
2777            async move {
2778                let (session_proxy, server) = fidl::endpoints::create_proxy();
2779
2780                proxy.open_session(server).unwrap();
2781
2782                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2783                let vmo_id = session_proxy
2784                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2785                    .await
2786                    .unwrap()
2787                    .unwrap();
2788
2789                let mut fifo =
2790                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2791                let (mut reader, mut writer) = fifo.async_io();
2792
2793                for i in 0..MAX_REQUESTS {
2794                    writer
2795                        .write_entries(&BlockFifoRequest {
2796                            command: BlockFifoCommand {
2797                                opcode: BlockOpcode::Read.into_primitive(),
2798                                ..Default::default()
2799                            },
2800                            reqid: (i + 1) as u32,
2801                            dev_offset: i,
2802                            vmoid: vmo_id.id,
2803                            length: 1,
2804                            ..Default::default()
2805                        })
2806                        .await
2807                        .unwrap();
2808                }
2809                assert!(
2810                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2811                        command: BlockFifoCommand {
2812                            opcode: BlockOpcode::Read.into_primitive(),
2813                            ..Default::default()
2814                        },
2815                        reqid: u32::MAX,
2816                        dev_offset: MAX_REQUESTS,
2817                        vmoid: vmo_id.id,
2818                        length: 1,
2819                        ..Default::default()
2820                    })))
2821                    .is_pending()
2822                );
2823                // OK, let the server start to process.
2824                event.1.store(true, Ordering::SeqCst);
2825                event.0.notify(usize::MAX);
2826                // For each entry we read, make sure we can write a new one in.
2827                let mut finished_reqids = vec![];
2828                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2829                    let mut response = BlockFifoResponse::default();
2830                    reader.read_entries(&mut response).await.unwrap();
2831                    assert_eq!(response.status, zx::sys::ZX_OK);
2832                    finished_reqids.push(response.reqid);
2833                    writer
2834                        .write_entries(&BlockFifoRequest {
2835                            command: BlockFifoCommand {
2836                                opcode: BlockOpcode::Read.into_primitive(),
2837                                ..Default::default()
2838                            },
2839                            reqid: (i + 1) as u32,
2840                            dev_offset: i,
2841                            vmoid: vmo_id.id,
2842                            length: 1,
2843                            ..Default::default()
2844                        })
2845                        .await
2846                        .unwrap();
2847                }
2848                let mut response = BlockFifoResponse::default();
2849                for _ in 0..MAX_REQUESTS {
2850                    reader.read_entries(&mut response).await.unwrap();
2851                    assert_eq!(response.status, zx::sys::ZX_OK);
2852                    finished_reqids.push(response.reqid);
2853                }
2854                // Verify that we got a response for each request.  Note that we can't assume FIFO
2855                // ordering.
2856                finished_reqids.sort();
2857                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2858                let mut i = 1;
2859                for reqid in finished_reqids {
2860                    assert_eq!(reqid, i);
2861                    i += 1;
2862                }
2863            }
2864        );
2865    }
2866
2867    #[fuchsia::test]
2868    async fn test_passthrough_io_with_fixed_map() {
2869        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2870
2871        let expected_op = Arc::new(Mutex::new(None));
2872        let expected_op_clone = expected_op.clone();
2873        futures::join!(
2874            async {
2875                let block_server = BlockServer::new(
2876                    BLOCK_SIZE,
2877                    Arc::new(IoMockInterface {
2878                        return_errors: false,
2879                        do_checks: true,
2880                        expected_op: expected_op_clone,
2881                    }),
2882                );
2883                block_server.handle_requests(stream).await.unwrap();
2884            },
2885            async move {
2886                let (session_proxy, server) = fidl::endpoints::create_proxy();
2887
2888                let mapping = fblock::BlockOffsetMapping {
2889                    source_block_offset: 0,
2890                    target_block_offset: 10,
2891                    length: 20,
2892                };
2893                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2894
2895                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2896                let vmo_id = session_proxy
2897                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2898                    .await
2899                    .unwrap()
2900                    .unwrap();
2901
2902                let mut fifo =
2903                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2904                let (mut reader, mut writer) = fifo.async_io();
2905
2906                // READ
2907                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2908                writer
2909                    .write_entries(&BlockFifoRequest {
2910                        command: BlockFifoCommand {
2911                            opcode: BlockOpcode::Read.into_primitive(),
2912                            ..Default::default()
2913                        },
2914                        vmoid: vmo_id.id,
2915                        dev_offset: 1,
2916                        length: 2,
2917                        vmo_offset: 3,
2918                        ..Default::default()
2919                    })
2920                    .await
2921                    .unwrap();
2922
2923                let mut response = BlockFifoResponse::default();
2924                reader.read_entries(&mut response).await.unwrap();
2925                assert_eq!(response.status, zx::sys::ZX_OK);
2926
2927                // WRITE
2928                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2929                writer
2930                    .write_entries(&BlockFifoRequest {
2931                        command: BlockFifoCommand {
2932                            opcode: BlockOpcode::Write.into_primitive(),
2933                            ..Default::default()
2934                        },
2935                        vmoid: vmo_id.id,
2936                        dev_offset: 4,
2937                        length: 5,
2938                        vmo_offset: 6,
2939                        ..Default::default()
2940                    })
2941                    .await
2942                    .unwrap();
2943
2944                reader.read_entries(&mut response).await.unwrap();
2945                assert_eq!(response.status, zx::sys::ZX_OK);
2946
2947                // FLUSH
2948                *expected_op.lock() = Some(ExpectedOp::Flush);
2949                writer
2950                    .write_entries(&BlockFifoRequest {
2951                        command: BlockFifoCommand {
2952                            opcode: BlockOpcode::Flush.into_primitive(),
2953                            ..Default::default()
2954                        },
2955                        ..Default::default()
2956                    })
2957                    .await
2958                    .unwrap();
2959
2960                reader.read_entries(&mut response).await.unwrap();
2961                assert_eq!(response.status, zx::sys::ZX_OK);
2962
2963                // TRIM
2964                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2965                writer
2966                    .write_entries(&BlockFifoRequest {
2967                        command: BlockFifoCommand {
2968                            opcode: BlockOpcode::Trim.into_primitive(),
2969                            ..Default::default()
2970                        },
2971                        dev_offset: 7,
2972                        length: 3,
2973                        ..Default::default()
2974                    })
2975                    .await
2976                    .unwrap();
2977
2978                reader.read_entries(&mut response).await.unwrap();
2979                assert_eq!(response.status, zx::sys::ZX_OK);
2980
2981                // READ past window
2982                *expected_op.lock() = None;
2983                writer
2984                    .write_entries(&BlockFifoRequest {
2985                        command: BlockFifoCommand {
2986                            opcode: BlockOpcode::Read.into_primitive(),
2987                            ..Default::default()
2988                        },
2989                        vmoid: vmo_id.id,
2990                        dev_offset: 19,
2991                        length: 2,
2992                        vmo_offset: 3,
2993                        ..Default::default()
2994                    })
2995                    .await
2996                    .unwrap();
2997
2998                reader.read_entries(&mut response).await.unwrap();
2999                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3000
3001                std::mem::drop(proxy);
3002            }
3003        );
3004    }
3005
3006    #[fuchsia::test]
3007    fn operation_map() {
3008        fn expect_map_result(
3009            mut operation: Operation,
3010            mapping: Option<BlockOffsetMapping>,
3011            max_blocks: Option<NonZero<u32>>,
3012            expected_operations: Vec<Operation>,
3013        ) {
3014            let mut ops = vec![];
3015            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
3016                ops.push(operation);
3017                operation = remainder;
3018            }
3019            ops.push(operation);
3020            assert_eq!(ops, expected_operations);
3021        }
3022
3023        // No limits
3024        expect_map_result(
3025            Operation::Read {
3026                device_block_offset: 10,
3027                block_count: 200,
3028                _unused: 0,
3029                vmo_offset: 0,
3030                options: ReadOptions::default(),
3031            },
3032            None,
3033            None,
3034            vec![Operation::Read {
3035                device_block_offset: 10,
3036                block_count: 200,
3037                _unused: 0,
3038                vmo_offset: 0,
3039                options: ReadOptions::default(),
3040            }],
3041        );
3042
3043        // Max block count
3044        expect_map_result(
3045            Operation::Read {
3046                device_block_offset: 10,
3047                block_count: 200,
3048                _unused: 0,
3049                vmo_offset: 0,
3050                options: ReadOptions::default(),
3051            },
3052            None,
3053            NonZero::new(120),
3054            vec![
3055                Operation::Read {
3056                    device_block_offset: 10,
3057                    block_count: 120,
3058                    _unused: 0,
3059                    vmo_offset: 0,
3060                    options: ReadOptions::default(),
3061                },
3062                Operation::Read {
3063                    device_block_offset: 130,
3064                    block_count: 80,
3065                    _unused: 0,
3066                    vmo_offset: 120 * 512,
3067                    options: ReadOptions::default(),
3068                },
3069            ],
3070        );
3071        expect_map_result(
3072            Operation::Write {
3073                device_block_offset: 10,
3074                block_count: 200,
3075                _unused: 0,
3076                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3077                vmo_offset: 0,
3078            },
3079            None,
3080            NonZero::new(120),
3081            vec![
3082                Operation::Write {
3083                    device_block_offset: 10,
3084                    block_count: 120,
3085                    _unused: 0,
3086                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3087                    vmo_offset: 0,
3088                },
3089                Operation::Write {
3090                    device_block_offset: 130,
3091                    block_count: 80,
3092                    _unused: 0,
3093                    options: WriteOptions::default(),
3094                    vmo_offset: 120 * 512,
3095                },
3096            ],
3097        );
3098        expect_map_result(
3099            Operation::Trim { device_block_offset: 10, block_count: 200 },
3100            None,
3101            NonZero::new(120),
3102            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3103        );
3104
3105        // Remapping + Max block count
3106        expect_map_result(
3107            Operation::Read {
3108                device_block_offset: 10,
3109                block_count: 200,
3110                _unused: 0,
3111                vmo_offset: 0,
3112                options: ReadOptions::default(),
3113            },
3114            Some(BlockOffsetMapping {
3115                source_block_offset: 10,
3116                target_block_offset: 100,
3117                length: 200,
3118            }),
3119            NonZero::new(120),
3120            vec![
3121                Operation::Read {
3122                    device_block_offset: 100,
3123                    block_count: 120,
3124                    _unused: 0,
3125                    vmo_offset: 0,
3126                    options: ReadOptions::default(),
3127                },
3128                Operation::Read {
3129                    device_block_offset: 220,
3130                    block_count: 80,
3131                    _unused: 0,
3132                    vmo_offset: 120 * 512,
3133                    options: ReadOptions::default(),
3134                },
3135            ],
3136        );
3137        expect_map_result(
3138            Operation::Write {
3139                device_block_offset: 10,
3140                block_count: 200,
3141                _unused: 0,
3142                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3143                vmo_offset: 0,
3144            },
3145            Some(BlockOffsetMapping {
3146                source_block_offset: 10,
3147                target_block_offset: 100,
3148                length: 200,
3149            }),
3150            NonZero::new(120),
3151            vec![
3152                Operation::Write {
3153                    device_block_offset: 100,
3154                    block_count: 120,
3155                    _unused: 0,
3156                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3157                    vmo_offset: 0,
3158                },
3159                Operation::Write {
3160                    device_block_offset: 220,
3161                    block_count: 80,
3162                    _unused: 0,
3163                    options: WriteOptions::default(),
3164                    vmo_offset: 120 * 512,
3165                },
3166            ],
3167        );
3168        expect_map_result(
3169            Operation::Trim { device_block_offset: 10, block_count: 200 },
3170            Some(BlockOffsetMapping {
3171                source_block_offset: 10,
3172                target_block_offset: 100,
3173                length: 200,
3174            }),
3175            NonZero::new(120),
3176            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3177        );
3178    }
3179}