block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23
24#[cfg(test)]
25mod decompression_tests;
26
/// Crate-wide FIFO request limit.
// NOTE(review): presumably the maximum number of FIFO requests handled in one batch — confirm
// against the use sites in the sibling modules.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Optional non-zero identifier used to tie a request to its trace flow events.
type TraceFlowId = Option<NonZero<u64>>;
30
31#[derive(Clone)]
32pub enum DeviceInfo {
33    Block(BlockInfo),
34    Partition(PartitionInfo),
35}
36
37impl DeviceInfo {
38    pub fn device_flags(&self) -> fblock::DeviceFlag {
39        match self {
40            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
41            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
42        }
43    }
44
45    pub fn block_count(&self) -> Option<u64> {
46        match self {
47            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
48            Self::Partition(PartitionInfo { block_range, .. }) => {
49                block_range.as_ref().map(|range| range.end - range.start)
50            }
51        }
52    }
53
54    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
55        match self {
56            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
57            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
58                max_transfer_blocks.clone()
59            }
60        }
61    }
62
63    fn max_transfer_size(&self, block_size: u32) -> u32 {
64        if let Some(max_blocks) = self.max_transfer_blocks() {
65            max_blocks.get() * block_size
66        } else {
67            MAX_TRANSFER_UNBOUNDED
68        }
69    }
70}
71
72/// Information associated with non-partition block devices.
73#[derive(Clone, Default)]
74pub struct BlockInfo {
75    pub device_flags: fblock::DeviceFlag,
76    pub block_count: u64,
77    pub max_transfer_blocks: Option<NonZero<u32>>,
78}
79
80/// Information associated with a block device that is also a partition.
81#[derive(Clone, Default)]
82pub struct PartitionInfo {
83    /// The device flags reported by the underlying device.
84    pub device_flags: fblock::DeviceFlag,
85    pub max_transfer_blocks: Option<NonZero<u32>>,
86    /// If `block_range` is None, the partition is a volume and may not be contiguous.
87    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
88    /// slices and use that (along with the slice and block sizes) to determine the block count.
89    pub block_range: Option<Range<u64>>,
90    pub type_guid: [u8; 16],
91    pub instance_guid: [u8; 16],
92    pub name: String,
93    pub flags: u64,
94}
95
96/// We internally keep track of active requests, so that when the server is torn down, we can
97/// deallocate all of the resources for pending requests.
98struct ActiveRequest<S> {
99    session: S,
100    group_or_request: GroupOrRequest,
101    trace_flow_id: TraceFlowId,
102    _epoch_guard: EpochGuard<'static>,
103    status: zx::Status,
104    count: u32,
105    req_id: Option<u32>,
106    decompression_info: Option<DecompressionInfo>,
107}
108
109struct DecompressionInfo {
110    // This is the range of compressed bytes in receiving buffer.
111    compressed_range: Range<usize>,
112
113    // This is the range in the target VMO where we will write uncompressed bytes.
114    uncompressed_range: Range<u64>,
115
116    bytes_so_far: u64,
117    mapping: Arc<VmoMapping>,
118    buffer: Option<Buffer<'static>>,
119}
120
121impl DecompressionInfo {
122    /// Returns the uncompressed slice.
123    fn uncompressed_slice(&self) -> *mut [u8] {
124        std::ptr::slice_from_raw_parts_mut(
125            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
126            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
127        )
128    }
129}
130
131pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
132
133impl<S> Default for ActiveRequests<S> {
134    fn default() -> Self {
135        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
136    }
137}
138
139impl<S> ActiveRequests<S> {
140    fn complete_and_take_response(
141        &self,
142        request_id: RequestId,
143        status: zx::Status,
144    ) -> Option<(S, BlockFifoResponse)> {
145        self.0.lock().complete_and_take_response(request_id, status)
146    }
147
148    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
149        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
150    }
151}
152
153struct ActiveRequestsInner<S> {
154    requests: Slab<ActiveRequest<S>>,
155}
156
157// Keeps track of all the requests that are currently being processed
158impl<S> ActiveRequestsInner<S> {
159    /// Completes a request.
160    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
161        let group = &mut self.requests[request_id.0];
162
163        group.count = group.count.checked_sub(1).unwrap();
164        if status != zx::Status::OK && group.status == zx::Status::OK {
165            group.status = status
166        }
167
168        fuchsia_trace::duration!(
169            "storage",
170            "block_server::finish_transaction",
171            "request_id" => request_id.0,
172            "group_completed" => group.count == 0,
173            "status" => status.into_raw());
174        if let Some(trace_flow_id) = group.trace_flow_id {
175            fuchsia_trace::flow_step!(
176                "storage",
177                "block_server::finish_request",
178                trace_flow_id.get().into()
179            );
180        }
181
182        if group.count == 0
183            && group.status == zx::Status::OK
184            && let Some(info) = &mut group.decompression_info
185        {
186            thread_local! {
187                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
188                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
189            }
190            DECOMPRESSOR.with(|decompressor| {
191                // SAFETY: We verified `uncompressed_range` fits within our mapping.
192                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
193                let mut decompressor = decompressor.borrow_mut();
194                if let Err(error) = decompressor.decompress_to_buffer(
195                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
196                    target_slice,
197                ) {
198                    log::warn!(error:?; "Decompression error");
199                    group.status = zx::Status::IO_DATA_INTEGRITY;
200                };
201            });
202        }
203    }
204
205    /// Takes the response if all requests are finished.
206    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
207        let group = &self.requests[request_id.0];
208        match group.req_id {
209            Some(reqid) if group.count == 0 => {
210                let group = self.requests.remove(request_id.0);
211                Some((
212                    group.session,
213                    BlockFifoResponse {
214                        status: group.status.into_raw(),
215                        reqid,
216                        group: group.group_or_request.group_id().unwrap_or(0),
217                        ..Default::default()
218                    },
219                ))
220            }
221            _ => None,
222        }
223    }
224
225    /// Competes the request and returns a response if the request group is finished.
226    fn complete_and_take_response(
227        &mut self,
228        request_id: RequestId,
229        status: zx::Status,
230    ) -> Option<(S, BlockFifoResponse)> {
231        self.complete(request_id, status);
232        self.take_response(request_id)
233    }
234}
235
/// BlockServer serves the fuchsia.storage.block.Block protocol on top of a session manager.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// Size of a block in bytes.
    block_size: u32,
    /// Supplies device information and handles sessions.
    session_manager: Arc<SM>,
}
242
/// A single entry in [`OffsetMap`].
///
/// Describes a run of `length` blocks starting at `source_block_offset` that corresponds to the
/// run of the same length starting at `target_block_offset`.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    source_block_offset: u64,
    target_block_offset: u64,
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the run of blocks `(first_block, block_count)` lies entirely within the
    /// source range of this mapping.
    ///
    /// All arithmetic is checked, so arbitrary inputs (including ones where
    /// `first_block + block_count` would exceed `u64::MAX`) yield `false` instead of overflowing.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (first_block, block_count) = blocks;
        first_block
            // Offset of the first block relative to the start of the source range.
            .checked_sub(self.source_block_offset)
            // Number of blocks left in the range from that offset onwards.
            .and_then(|offset| self.length.checked_sub(offset))
            .is_some_and(|remaining| u64::from(block_count) <= remaining)
    }
}
257
258impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
259    type Error = zx::Status;
260
261    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
262        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
263        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
264        Ok(Self {
265            source_block_offset: wire.source_block_offset,
266            target_block_offset: wire.target_block_offset,
267            length: wire.length,
268        })
269    }
270}
271
272/// Remaps the offset of block requests based on an internal map, and truncates long requests.
273pub struct OffsetMap {
274    mapping: Option<BlockOffsetMapping>,
275    max_transfer_blocks: Option<NonZero<u32>>,
276}
277
278impl OffsetMap {
279    /// An OffsetMap that remaps requests.
280    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
281        Self { mapping: Some(mapping), max_transfer_blocks }
282    }
283
284    /// An OffsetMap that just enforces maximum request sizes.
285    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
286        Self { mapping: None, max_transfer_blocks }
287    }
288
289    pub fn is_empty(&self) -> bool {
290        self.mapping.is_none()
291    }
292
293    fn mapping(&self) -> Option<&BlockOffsetMapping> {
294        self.mapping.as_ref()
295    }
296
297    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
298        self.max_transfer_blocks
299    }
300}
301
302// Methods take Arc<Self> rather than &self because of
303// https://github.com/rust-lang/rust/issues/42940.
304pub trait SessionManager: 'static {
305    const SUPPORTS_DECOMPRESSION: bool;
306
307    type Session;
308
309    fn on_attach_vmo(
310        self: Arc<Self>,
311        vmo: &Arc<zx::Vmo>,
312    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
313
314    /// Creates a new session to handle `stream`.
315    /// The returned future should run until the session completes, for example when the client end
316    /// closes.
317    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
318    fn open_session(
319        self: Arc<Self>,
320        stream: fblock::SessionRequestStream,
321        offset_map: OffsetMap,
322        block_size: u32,
323    ) -> impl Future<Output = Result<(), Error>> + Send;
324
325    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
326    fn get_info(&self) -> Cow<'_, DeviceInfo>;
327
328    /// Called to handle the GetVolumeInfo FIDL call.
329    fn get_volume_info(
330        &self,
331    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
332    {
333        async { Err(zx::Status::NOT_SUPPORTED) }
334    }
335
336    /// Called to handle the QuerySlices FIDL call.
337    fn query_slices(
338        &self,
339        _start_slices: &[u64],
340    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
341        async { Err(zx::Status::NOT_SUPPORTED) }
342    }
343
344    /// Called to handle the Shrink FIDL call.
345    fn extend(
346        &self,
347        _start_slice: u64,
348        _slice_count: u64,
349    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
350        async { Err(zx::Status::NOT_SUPPORTED) }
351    }
352
353    /// Called to handle the Shrink FIDL call.
354    fn shrink(
355        &self,
356        _start_slice: u64,
357        _slice_count: u64,
358    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
359        async { Err(zx::Status::NOT_SUPPORTED) }
360    }
361
362    /// Returns the active requests.
363    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
364}
365
366pub trait IntoSessionManager {
367    type SM: SessionManager;
368
369    fn into_session_manager(self) -> Arc<Self::SM>;
370}
371
372impl<SM: SessionManager> BlockServer<SM> {
373    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
374        Self { block_size, session_manager: session_manager.into_session_manager() }
375    }
376
377    pub fn session_manager(&self) -> &SM {
378        self.session_manager.as_ref()
379    }
380
381    /// Called to process requests for fuchsia.storage.block.Block.
382    pub async fn handle_requests(
383        &self,
384        mut requests: fblock::BlockRequestStream,
385    ) -> Result<(), Error> {
386        let scope = fasync::Scope::new();
387        loop {
388            match requests.try_next().await {
389                Ok(Some(request)) => {
390                    if let Some(session) = self.handle_request(request).await? {
391                        scope.spawn(session.map(|_| ()));
392                    }
393                }
394                Ok(None) => break,
395                Err(err) => log::warn!(err:?; "Invalid request"),
396            }
397        }
398        scope.await;
399        Ok(())
400    }
401
402    /// Processes a Block request.  If a new session task is created in response to the request,
403    /// it is returned.
404    async fn handle_request(
405        &self,
406        request: fblock::BlockRequest,
407    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
408        match request {
409            fblock::BlockRequest::GetInfo { responder } => {
410                let info = self.device_info();
411                let max_transfer_size = info.max_transfer_size(self.block_size);
412                let (block_count, mut flags) = match info.as_ref() {
413                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
414                        (*block_count, *device_flags)
415                    }
416                    DeviceInfo::Partition(partition_info) => {
417                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
418                            range.end - range.start
419                        } else {
420                            let volume_info = self.session_manager.get_volume_info().await?;
421                            volume_info.0.slice_size * volume_info.1.partition_slice_count
422                                / self.block_size as u64
423                        };
424                        (block_count, partition_info.device_flags)
425                    }
426                };
427                if SM::SUPPORTS_DECOMPRESSION {
428                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
429                }
430                responder.send(Ok(&fblock::BlockInfo {
431                    block_count,
432                    block_size: self.block_size,
433                    max_transfer_size,
434                    flags,
435                }))?;
436            }
437            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
438                let info = self.device_info();
439                return Ok(Some(self.session_manager.clone().open_session(
440                    session.into_stream(),
441                    OffsetMap::empty(info.max_transfer_blocks()),
442                    self.block_size,
443                )));
444            }
445            fblock::BlockRequest::OpenSessionWithOffsetMap {
446                session,
447                mapping,
448                control_handle: _,
449            } => {
450                let info = self.device_info();
451                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
452                    Ok(m) => m,
453                    Err(status) => {
454                        session.close_with_epitaph(status)?;
455                        return Ok(None);
456                    }
457                };
458                if let Some(max) = info.block_count() {
459                    if initial_mapping.target_block_offset + initial_mapping.length > max {
460                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
461                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
462                        return Ok(None);
463                    }
464                }
465                return Ok(Some(self.session_manager.clone().open_session(
466                    session.into_stream(),
467                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
468                    self.block_size,
469                )));
470            }
471            fblock::BlockRequest::GetTypeGuid { responder } => {
472                let info = self.device_info();
473                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
474                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
475                    guid.value.copy_from_slice(&partition_info.type_guid);
476                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
477                } else {
478                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
479                }
480            }
481            fblock::BlockRequest::GetInstanceGuid { responder } => {
482                let info = self.device_info();
483                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
484                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
485                    guid.value.copy_from_slice(&partition_info.instance_guid);
486                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
487                } else {
488                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
489                }
490            }
491            fblock::BlockRequest::GetName { responder } => {
492                let info = self.device_info();
493                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
494                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
495                } else {
496                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497                }
498            }
499            fblock::BlockRequest::GetMetadata { responder } => {
500                let info = self.device_info();
501                if let DeviceInfo::Partition(info) = info.as_ref() {
502                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503                    type_guid.value.copy_from_slice(&info.type_guid);
504                    let mut instance_guid =
505                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
506                    instance_guid.value.copy_from_slice(&info.instance_guid);
507                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
508                        name: Some(info.name.clone()),
509                        type_guid: Some(type_guid),
510                        instance_guid: Some(instance_guid),
511                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
512                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
513                        flags: Some(info.flags),
514                        ..Default::default()
515                    }))?;
516                } else {
517                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
518                }
519            }
520            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
521                match self.session_manager.query_slices(&start_slices).await {
522                    Ok(mut results) => {
523                        let results_len = results.len();
524                        assert!(results_len <= 16);
525                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
526                        responder.send(
527                            zx::sys::ZX_OK,
528                            &results.try_into().unwrap(),
529                            results_len as u64,
530                        )?;
531                    }
532                    Err(s) => {
533                        responder.send(
534                            s.into_raw(),
535                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
536                            0,
537                        )?;
538                    }
539                }
540            }
541            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
542                match self.session_manager.get_volume_info().await {
543                    Ok((manager_info, volume_info)) => {
544                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
545                    }
546                    Err(s) => responder.send(s.into_raw(), None, None)?,
547                }
548            }
549            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
550                responder.send(
551                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
552                        .into_raw(),
553                )?;
554            }
555            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
556                responder.send(
557                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
558                        .into_raw(),
559                )?;
560            }
561            fblock::BlockRequest::Destroy { responder, .. } => {
562                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
563            }
564        }
565        Ok(None)
566    }
567
568    fn device_info(&self) -> Cow<'_, DeviceInfo> {
569        self.session_manager.get_info()
570    }
571}
572
573struct SessionHelper<SM: SessionManager> {
574    session_manager: Arc<SM>,
575    offset_map: OffsetMap,
576    block_size: u32,
577    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
578    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
579}
580
/// A VMO mapped into the root VMAR; the mapping is removed when this is dropped.
struct VmoMapping {
    /// Address at which the mapping starts.
    base: usize,
    /// Length of the mapping in bytes.
    size: usize,
}
585
586impl VmoMapping {
587    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
588        let size = vmo.get_size().unwrap() as usize;
589        Ok(Arc::new(Self {
590            base: fuchsia_runtime::vmar_root_self()
591                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
592                .inspect_err(|error| {
593                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
594                })?,
595            size,
596        }))
597    }
598}
599
600impl Drop for VmoMapping {
601    fn drop(&mut self) {
602        // SAFETY: We mapped this in `VmoMapping::new`.
603        unsafe {
604            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
605        }
606    }
607}
608
609impl<SM: SessionManager> SessionHelper<SM> {
610    fn new(
611        session_manager: Arc<SM>,
612        offset_map: OffsetMap,
613        block_size: u32,
614    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
615        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
616        Ok((
617            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
618            fifo,
619        ))
620    }
621
622    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
623        match request {
624            fblock::SessionRequest::GetFifo { responder } => {
625                let rights = zx::Rights::TRANSFER
626                    | zx::Rights::READ
627                    | zx::Rights::WRITE
628                    | zx::Rights::SIGNAL
629                    | zx::Rights::WAIT;
630                match self.peer_fifo.duplicate_handle(rights) {
631                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
632                    Err(s) => responder.send(Err(s.into_raw()))?,
633                }
634                Ok(())
635            }
636            fblock::SessionRequest::AttachVmo { vmo, responder } => {
637                let vmo = Arc::new(vmo);
638                let vmo_id = {
639                    let mut vmos = self.vmos.lock();
640                    if vmos.len() == u16::MAX as usize {
641                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
642                        return Ok(());
643                    } else {
644                        let vmo_id = match vmos.last_entry() {
645                            None => 1,
646                            Some(o) => {
647                                o.key().checked_add(1).unwrap_or_else(|| {
648                                    let mut vmo_id = 1;
649                                    // Find the first gap...
650                                    for (&id, _) in &*vmos {
651                                        if id > vmo_id {
652                                            break;
653                                        }
654                                        vmo_id = id + 1;
655                                    }
656                                    vmo_id
657                                })
658                            }
659                        };
660                        vmos.insert(vmo_id, (vmo.clone(), None));
661                        vmo_id
662                    }
663                };
664                self.session_manager.clone().on_attach_vmo(&vmo).await?;
665                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
666                Ok(())
667            }
668            fblock::SessionRequest::Close { responder } => {
669                responder.send(Ok(()))?;
670                Err(anyhow!("Closed"))
671            }
672        }
673    }
674
675    /// Decodes `request`.
676    fn decode_fifo_request(
677        &self,
678        session: SM::Session,
679        request: &BlockFifoRequest,
680    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
681        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
682
683        let request_bytes = request.length as u64 * self.block_size as u64;
684
685        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
686            .ok_or(zx::Status::INVALID_ARGS)
687            .and_then(|code| {
688                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
689                    if code != BlockOpcode::Read {
690                        return Err(zx::Status::INVALID_ARGS);
691                    }
692                    if !SM::SUPPORTS_DECOMPRESSION {
693                        return Err(zx::Status::NOT_SUPPORTED);
694                    }
695                }
696                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
697                    if request.length == 0 {
698                        return Err(zx::Status::INVALID_ARGS);
699                    }
700                    // Make sure the end offsets won't wrap.
701                    if request.dev_offset.checked_add(request.length as u64).is_none()
702                        || (code != BlockOpcode::Trim
703                            && request_bytes.checked_add(request.vmo_offset).is_none())
704                    {
705                        return Err(zx::Status::OUT_OF_RANGE);
706                    }
707                }
708                Ok(match code {
709                    BlockOpcode::Read => Operation::Read {
710                        device_block_offset: request.dev_offset,
711                        block_count: request.length,
712                        _unused: 0,
713                        vmo_offset: request
714                            .vmo_offset
715                            .checked_mul(self.block_size as u64)
716                            .ok_or(zx::Status::OUT_OF_RANGE)?,
717                        options: ReadOptions {
718                            inline_crypto: InlineCryptoOptions {
719                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
720                                dun: request.dun,
721                                slot: request.slot,
722                            },
723                        },
724                    },
725                    BlockOpcode::Write => {
726                        let mut options = WriteOptions {
727                            inline_crypto: InlineCryptoOptions {
728                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
729                                dun: request.dun,
730                                slot: request.slot,
731                            },
732                            ..WriteOptions::default()
733                        };
734                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
735                            options.flags |= WriteFlags::FORCE_ACCESS;
736                        }
737                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
738                            options.flags |= WriteFlags::PRE_BARRIER;
739                        }
740                        Operation::Write {
741                            device_block_offset: request.dev_offset,
742                            block_count: request.length,
743                            _unused: 0,
744                            options,
745                            vmo_offset: request
746                                .vmo_offset
747                                .checked_mul(self.block_size as u64)
748                                .ok_or(zx::Status::OUT_OF_RANGE)?,
749                        }
750                    }
751                    BlockOpcode::Flush => Operation::Flush,
752                    BlockOpcode::Trim => Operation::Trim {
753                        device_block_offset: request.dev_offset,
754                        block_count: request.length,
755                    },
756                    BlockOpcode::CloseVmo => Operation::CloseVmo,
757                })
758            });
759
760        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
761            GroupOrRequest::Group(request.group)
762        } else {
763            GroupOrRequest::Request(request.reqid)
764        };
765
766        let mut active_requests = self.session_manager.active_requests().0.lock();
767        let mut request_id = None;
768
769        // Multiple Block I/O request may be sent as a group.
770        // Notes:
771        // - the group is identified by the group id in the request
772        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
773        //   flag is set.
774        // - when processing a request of a group fails, subsequent requests of that
775        //   group will not be processed.
776        // - decompression is a special case, see block-fifo.h for semantics.
777        //
778        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
779        if group_or_request.is_group() {
780            // Search for an existing entry that matches this group.  NOTE: This is a potentially
781            // expensive way to find a group (it's iterating over all slots in the active-requests
782            // slab).  This can be optimised easily should we need to.
783            for (key, group) in &mut active_requests.requests {
784                if group.group_or_request == group_or_request {
785                    if group.req_id.is_some() {
786                        // We have already received a request tagged as last.
787                        if group.status == zx::Status::OK {
788                            group.status = zx::Status::INVALID_ARGS;
789                        }
790                        // Ignore this request.
791                        return Err(None);
792                    }
793                    // See if this is a continuation of a decompressed read.
794                    if group.status == zx::Status::OK
795                        && let Some(info) = &mut group.decompression_info
796                    {
797                        if let Ok(Operation::Read {
798                            device_block_offset,
799                            mut block_count,
800                            options,
801                            vmo_offset: 0,
802                            ..
803                        }) = operation
804                        {
805                            let remaining_bytes = info
806                                .compressed_range
807                                .end
808                                .next_multiple_of(self.block_size as usize)
809                                as u64
810                                - info.bytes_so_far;
811                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
812                                || request.total_compressed_bytes != 0
813                                || request.uncompressed_bytes != 0
814                                || request.compressed_prefix_bytes != 0
815                                || (flags.contains(BlockIoFlag::GROUP_LAST)
816                                    && info.bytes_so_far + request_bytes
817                                        < info.compressed_range.end as u64)
818                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
819                                    && request_bytes >= remaining_bytes)
820                            {
821                                group.status = zx::Status::INVALID_ARGS;
822                            } else {
                                // We are tolerant of `block_count` being more than we actually
                                // need.  This can happen if the client is working with a larger
                                // block size than the device block size.  For example, if Blobfs
                                // has an 8192 byte block size but the device has a 512 byte
                                // block size, it can ask for a multiple of 16 blocks, when fewer
                                // than that might actually be required to hold the compressed data.
                                // It is easier for us to tolerate this here than to get Blobfs to
                                // change to pass only the blocks that are required.
831                                if request_bytes > remaining_bytes {
832                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
833                                }
834
835                                operation = Ok(Operation::ContinueDecompressedRead {
836                                    offset: info.bytes_so_far,
837                                    device_block_offset,
838                                    block_count,
839                                    options,
840                                });
841
842                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
843                            }
844                        } else {
845                            group.status = zx::Status::INVALID_ARGS;
846                        }
847                    }
848                    if flags.contains(BlockIoFlag::GROUP_LAST) {
849                        group.req_id = Some(request.reqid);
850                        // If the group has had an error, there is no point trying to issue this
851                        // request.
852                        if group.status != zx::Status::OK {
853                            operation = Err(group.status);
854                        }
855                    } else if group.status != zx::Status::OK {
856                        // The group has already encountered an error, so there is no point trying
857                        // to issue this request.
858                        return Err(None);
859                    }
860                    request_id = Some(RequestId(key));
861                    group.count += 1;
862                    break;
863                }
864            }
865        }
866
867        let is_single_request =
868            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
869
870        let mut decompression_info = None;
871        let vmo = match operation {
872            Ok(Operation::Read {
873                device_block_offset,
874                mut block_count,
875                options,
876                vmo_offset,
877                ..
878            }) => match self.vmos.lock().get_mut(&request.vmoid) {
879                Some((vmo, mapping)) => {
880                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
881                        let compressed_range = request.compressed_prefix_bytes as usize
882                            ..request.compressed_prefix_bytes as usize
883                                + request.total_compressed_bytes as usize;
884                        let required_buffer_size =
885                            compressed_range.end.next_multiple_of(self.block_size as usize);
886
887                        // Validate the initial decompression request.
888                        if compressed_range.start >= compressed_range.end
889                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
890                            || (is_single_request && request_bytes < compressed_range.end as u64)
891                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
892                        {
893                            Err(zx::Status::INVALID_ARGS)
894                        } else {
895                            // We are tolerant of `block_count` being more than we actually need.
896                            // This can happen if the client is working in a larger block size than
897                            // the device block size.  For example, Blobfs has a 8192 byte block
898                            // size, but the device might have a 512 byte block size.  It is easier
899                            // for us to tolerate this here than to get Blobfs to change to pass
900                            // only the blocks that are required.
901                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
902                                block_count =
903                                    (required_buffer_size / self.block_size as usize) as u32;
904                                required_buffer_size as u64
905                            } else {
906                                request_bytes
907                            };
908
909                            // To decompress, we need to have the target VMO mapped (cached).
910                            match mapping {
911                                Some(mapping) => Ok(mapping.clone()),
912                                None => {
913                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
914                                }
915                            }
916                            .and_then(|mapping| {
917                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
918                                // range.
919                                if vmo_offset
920                                    .checked_add(request.uncompressed_bytes as u64)
921                                    .is_some_and(|end| end <= mapping.size as u64)
922                                {
923                                    Ok(mapping)
924                                } else {
925                                    Err(zx::Status::OUT_OF_RANGE)
926                                }
927                            })
928                            .map(|mapping| {
929                                // Convert the operation into a `StartDecompressedRead`
930                                // operation. For non-fragmented requests, this will be the only
931                                // operation, but if it's a fragmented read,
932                                // `ContinueDecompressedRead` operations will follow.
933                                operation = Ok(Operation::StartDecompressedRead {
934                                    required_buffer_size,
935                                    device_block_offset,
936                                    block_count,
937                                    options,
938                                });
939                                // Record sufficient information so that we can decompress when all
940                                // the requests complete.
941                                decompression_info = Some(DecompressionInfo {
942                                    compressed_range,
943                                    bytes_so_far,
944                                    mapping,
945                                    uncompressed_range: vmo_offset
946                                        ..vmo_offset + request.uncompressed_bytes as u64,
947                                    buffer: None,
948                                });
949                                None
950                            })
951                        }
952                    } else {
953                        Ok(Some(vmo.clone()))
954                    }
955                }
956                None => Err(zx::Status::IO),
957            },
958            Ok(Operation::Write { .. }) => self
959                .vmos
960                .lock()
961                .get(&request.vmoid)
962                .cloned()
963                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
964            Ok(Operation::CloseVmo) => {
965                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
966                    let vmo_clone = vmo.clone();
967                    // Make sure the VMO is dropped after all current Epoch guards have been
968                    // dropped.
969                    Epoch::global().defer(move || drop(vmo_clone));
970                    Ok(Some(vmo))
971                })
972            }
973            _ => Ok(None),
974        }
975        .unwrap_or_else(|e| {
976            operation = Err(e);
977            None
978        });
979
980        let trace_flow_id = NonZero::new(request.trace_flow_id);
981        let request_id = request_id.unwrap_or_else(|| {
982            RequestId(active_requests.requests.insert(ActiveRequest {
983                session,
984                group_or_request,
985                trace_flow_id,
986                _epoch_guard: Epoch::global().guard(),
987                status: zx::Status::OK,
988                count: 1,
989                req_id: is_single_request.then_some(request.reqid),
990                decompression_info,
991            }))
992        });
993
994        Ok(DecodedRequest {
995            request_id,
996            trace_flow_id,
997            operation: operation.map_err(|status| {
998                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
999            })?,
1000            vmo,
1001        })
1002    }
1003
1004    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1005        std::mem::take(&mut *self.vmos.lock())
1006    }
1007
1008    /// Maps the request and returns the mapped request with an optional remainder.
1009    fn map_request(
1010        &self,
1011        mut request: DecodedRequest,
1012        active_request: &mut ActiveRequest<SM::Session>,
1013    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1014        if active_request.status != zx::Status::OK {
1015            return Err(zx::Status::BAD_STATE);
1016        }
1017        let mapping = self.offset_map.mapping();
1018        match (mapping, request.operation.blocks()) {
1019            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1020                return Err(zx::Status::OUT_OF_RANGE);
1021            }
1022            _ => {}
1023        }
1024        let remainder = request.operation.map(
1025            self.offset_map.mapping(),
1026            self.offset_map.max_transfer_blocks(),
1027            self.block_size,
1028        );
1029        if remainder.is_some() {
1030            active_request.count += 1;
1031        }
1032        static CACHE: AtomicU64 = AtomicU64::new(0);
1033        if let Some(context) =
1034            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1035        {
1036            use fuchsia_trace::ArgValue;
1037            let trace_args = [
1038                ArgValue::of("request_id", request.request_id.0),
1039                ArgValue::of("opcode", request.operation.trace_label()),
1040            ];
1041            let _scope =
1042                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1043            if let Some(trace_flow_id) = active_request.trace_flow_id {
1044                fuchsia_trace::flow_step(
1045                    &context,
1046                    "block_server::start_transaction",
1047                    trace_flow_id.get().into(),
1048                    &[],
1049                );
1050            }
1051        }
1052        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1053        Ok((request, remainder))
1054    }
1055
1056    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1057        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1058    }
1059}
1060
/// Identifies an active request.  The wrapped value is the request's key in the active-requests
/// slab.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
1064
/// A FIFO request after decoding: the operation to perform plus the bookkeeping needed to
/// complete it.
#[derive(Clone, Debug)]
struct DecodedRequest {
    // Key of the active request this decoded request is accounted against.
    request_id: RequestId,
    // Trace flow id taken from the FIFO request; `None` when the request carried zero.
    trace_flow_id: TraceFlowId,
    // The operation to perform.
    operation: Operation,
    // Set by the decoder for plain reads, writes and `CloseVmo`; `None` for other operations,
    // including decompressed reads.
    vmo: Option<Arc<zx::Vmo>>,
}
1072
// Aliases that expose the `block_protocol` flag and option types under this crate's namespace.
/// cbindgen:no-export
pub type WriteFlags = block_protocol::WriteFlags;
pub type WriteOptions = block_protocol::WriteOptions;
pub type ReadOptions = block_protocol::ReadOptions;
pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1078
/// A decoded block operation.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
    // code can read `device_block_offset` from the read variant and then assume it's valid for the
    // write variant).
    /// Reads `block_count` blocks starting at `device_block_offset`.  `vmo_offset` is a byte
    /// offset.
    Read {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        vmo_offset: u64,
        options: ReadOptions,
    },
    /// Writes `block_count` blocks starting at `device_block_offset`.  `vmo_offset` is a byte
    /// offset.
    Write {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        vmo_offset: u64,
        options: WriteOptions,
    },
    /// Flush request; carries no parameters.
    Flush,
    /// Trims `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// This will never be seen by the C interface.
    CloseVmo,
    /// First (and, for non-fragmented requests, only) read of a decompressed read.
    /// `required_buffer_size` is the end of the compressed range rounded up to a multiple of the
    /// block size.
    StartDecompressedRead {
        required_buffer_size: usize,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
    /// Follow-up read of a fragmented decompressed read.  `offset` is the number of bytes already
    /// covered by the earlier requests of the group.
    ContinueDecompressedRead {
        offset: u64,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
}
1120
1121impl Operation {
1122    fn trace_label(&self) -> &'static str {
1123        match self {
1124            Operation::Read { .. } => "read",
1125            Operation::Write { .. } => "write",
1126            Operation::Flush { .. } => "flush",
1127            Operation::Trim { .. } => "trim",
1128            Operation::CloseVmo { .. } => "close_vmo",
1129            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1130            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1131        }
1132    }
1133
1134    /// Returns (offset, length).
1135    fn blocks(&self) -> Option<(u64, u32)> {
1136        match self {
1137            Operation::Read { device_block_offset, block_count, .. }
1138            | Operation::Write { device_block_offset, block_count, .. }
1139            | Operation::Trim { device_block_offset, block_count, .. } => {
1140                Some((*device_block_offset, *block_count))
1141            }
1142            _ => None,
1143        }
1144    }
1145
1146    /// Returns mutable references to (offset, length).
1147    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1148        match self {
1149            Operation::Read { device_block_offset, block_count, .. }
1150            | Operation::Write { device_block_offset, block_count, .. }
1151            | Operation::Trim { device_block_offset, block_count, .. } => {
1152                Some((device_block_offset, block_count))
1153            }
1154            _ => None,
1155        }
1156    }
1157
1158    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1159    /// start of the operation.
1160    fn map(
1161        &mut self,
1162        mapping: Option<&BlockOffsetMapping>,
1163        max_blocks: Option<NonZero<u32>>,
1164        block_size: u32,
1165    ) -> Option<Self> {
1166        let mut max = match self {
1167            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1168            _ => None,
1169        };
1170        let (offset, length) = self.blocks_mut()?;
1171        let orig_offset = *offset;
1172        if let Some(mapping) = mapping {
1173            let delta = *offset - mapping.source_block_offset;
1174            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1175            *offset = mapping.target_block_offset + delta;
1176            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1177            max = match max {
1178                None => Some(mapping_max),
1179                Some(m) => Some(std::cmp::min(m, mapping_max)),
1180            };
1181        };
1182        if let Some(max) = max {
1183            if *length as u64 > max {
1184                let rem = (*length as u64 - max) as u32;
1185                *length = max as u32;
1186                return Some(match self {
1187                    Operation::Read {
1188                        device_block_offset: _,
1189                        block_count: _,
1190                        vmo_offset,
1191                        _unused,
1192                        options,
1193                    } => {
1194                        let mut options = *options;
1195                        options.inline_crypto.dun += max as u32;
1196                        Operation::Read {
1197                            device_block_offset: orig_offset + max,
1198                            block_count: rem,
1199                            vmo_offset: *vmo_offset + max * block_size as u64,
1200                            _unused: *_unused,
1201                            options: options,
1202                        }
1203                    }
1204                    Operation::Write {
1205                        device_block_offset: _,
1206                        block_count: _,
1207                        _unused,
1208                        vmo_offset,
1209                        options,
1210                    } => {
1211                        let mut options = *options;
1212                        options.inline_crypto.dun += max as u32;
1213                        Operation::Write {
1214                            device_block_offset: orig_offset + max,
1215                            block_count: rem,
1216                            _unused: *_unused,
1217                            vmo_offset: *vmo_offset + max * block_size as u64,
1218                            options: options,
1219                        }
1220                    }
1221                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1222                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1223                    }
1224                    _ => unreachable!(),
1225                });
1226            }
1227        }
1228        None
1229    }
1230
1231    /// Returns true if the specified write flags are set.
1232    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1233        if let Operation::Write { options, .. } = self {
1234            options.flags.contains(value)
1235        } else {
1236            false
1237        }
1238    }
1239
1240    /// Removes `value` from the request's write flags and returns true if the flag was set.
1241    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1242        if let Operation::Write { options, .. } = self {
1243            let result = options.flags.contains(value);
1244            options.flags.remove(value);
1245            result
1246        } else {
1247            false
1248        }
1249    }
1250}
1251
/// Identifies what a FIFO request belongs to: a request group or a stand-alone request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    /// The request is part of the group with this id.
    Group(u16),
    /// The request stands alone and has this request id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        match self {
            Self::Group(_) => true,
            Self::Request(_) => false,
        }
    }

    /// Returns the group id, or `None` for a stand-alone request.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self { Some(id) } else { None }
    }
}
1270
1271#[cfg(test)]
1272mod tests {
1273    use super::{
1274        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1275        TraceFlowId,
1276    };
1277    use assert_matches::assert_matches;
1278    use block_protocol::{
1279        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1280        WriteFlags, WriteOptions,
1281    };
1282    use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1283    use fuchsia_sync::Mutex;
1284    use futures::FutureExt as _;
1285    use futures::channel::oneshot;
1286    use futures::future::BoxFuture;
1287    use std::borrow::Cow;
1288    use std::future::poll_fn;
1289    use std::num::NonZero;
1290    use std::pin::pin;
1291    use std::sync::Arc;
1292    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1293    use std::task::{Context, Poll};
1294    use zx::HandleBased as _;
1295    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1296
    // Test double for the async `Interface`; its behaviour is supplied through optional hooks.
    #[derive(Default)]
    struct MockInterface {
        // Called by `read` with (device_block_offset, block_count, vmo, vmo_offset).  `read` is
        // unimplemented when this is `None`.
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
        // Called by `write` with the device block offset.  `write` is unimplemented when this is
        // `None`.
        write_hook:
            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
        // Called by `write`, ahead of `write_hook`, when the write carries
        // `WriteFlags::PRE_BARRIER`.
        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
    }
1310
1311    impl super::async_interface::Interface for MockInterface {
1312        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1313            Ok(())
1314        }
1315
1316        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1317            Cow::Owned(test_device_info())
1318        }
1319
1320        async fn read(
1321            &self,
1322            device_block_offset: u64,
1323            block_count: u32,
1324            vmo: &Arc<zx::Vmo>,
1325            vmo_offset: u64,
1326            _opts: ReadOptions,
1327            _trace_flow_id: TraceFlowId,
1328        ) -> Result<(), zx::Status> {
1329            if let Some(read_hook) = &self.read_hook {
1330                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1331            } else {
1332                unimplemented!();
1333            }
1334        }
1335
1336        async fn write(
1337            &self,
1338            device_block_offset: u64,
1339            _block_count: u32,
1340            _vmo: &Arc<zx::Vmo>,
1341            _vmo_offset: u64,
1342            opts: WriteOptions,
1343            _trace_flow_id: TraceFlowId,
1344        ) -> Result<(), zx::Status> {
1345            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1346                && let Some(barrier_hook) = &self.barrier_hook
1347            {
1348                barrier_hook()?;
1349            }
1350            if let Some(write_hook) = &self.write_hook {
1351                write_hook(device_block_offset).await
1352            } else {
1353                unimplemented!();
1354            }
1355        }
1356
1357        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1358            Ok(())
1359        }
1360
1361        async fn trim(
1362            &self,
1363            _device_block_offset: u64,
1364            _block_count: u32,
1365            _trace_flow_id: TraceFlowId,
1366        ) -> Result<(), zx::Status> {
1367            unreachable!();
1368        }
1369
1370        async fn get_volume_info(
1371            &self,
1372        ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1373            // Hang forever for the test_requests_dont_block_sessions test.
1374            let () = std::future::pending().await;
1375            unreachable!();
1376        }
1377    }
1378
    // Block size handed to `BlockServer::new` by the tests.
    const BLOCK_SIZE: u32 = 512;
    // Transfer limit, in blocks, advertised by `test_device_info`.
    const MAX_TRANSFER_BLOCKS: u32 = 10;
1381
1382    fn test_device_info() -> DeviceInfo {
1383        DeviceInfo::Partition(PartitionInfo {
1384            device_flags: fblock::DeviceFlag::READONLY
1385                | fblock::DeviceFlag::BARRIER_SUPPORT
1386                | fblock::DeviceFlag::FUA_SUPPORT,
1387            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1388            block_range: Some(0..100),
1389            type_guid: [1; 16],
1390            instance_guid: [2; 16],
1391            name: "foo".to_string(),
1392            flags: 0xabcd,
1393        })
1394    }
1395
    // Sends one write flagged PRE_BARRIER followed by ten plain writes.  Every write hook asserts
    // that the barrier hook has already run, so the test fails if the server issues any of the
    // later writes ahead of the barrier write.
    #[fuchsia::test]
    async fn test_barriers_ordering() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        // Set by the barrier hook, checked by every write hook.
        let barrier_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            // Server side.
            async move {
                let barrier_called_clone = barrier_called.clone();
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        barrier_hook: Some(Box::new(move || {
                            barrier_called.store(true, Ordering::Relaxed);
                            Ok(())
                        })),
                        write_hook: Some(Box::new(move |device_block_offset| {
                            let barrier_called = barrier_called_clone.clone();
                            Box::pin(async move {
                                // The sleep allows the server to reorder the fifo requests.
                                if device_block_offset % 2 == 0 {
                                    fasync::Timer::new(fasync::MonotonicInstant::after(
                                        zx::MonotonicDuration::from_millis(200),
                                    ))
                                    .await;
                                }
                                assert!(barrier_called.load(Ordering::Relaxed));
                                Ok(())
                            })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            // Client side.
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // The barrier write goes first.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: BlockIoFlag::PRE_BARRIER.bits(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        dev_offset: 0,
                        length: 5,
                        vmo_offset: 6,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // Ten plain writes at device offsets 1..=10 follow.
                for i in 0..10 {
                    writer
                        .write_entries(&BlockFifoRequest {
                            command: BlockFifoCommand {
                                opcode: BlockOpcode::Write.into_primitive(),
                                ..Default::default()
                            },
                            vmoid: vmo_id.id,
                            dev_offset: i + 1,
                            length: 5,
                            vmo_offset: 6,
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                }
                // One response per request: the barrier write plus the ten plain writes.
                for _ in 0..11 {
                    let mut response = BlockFifoResponse::default();
                    reader.read_entries(&mut response).await.unwrap();
                    assert_eq!(response.status, zx::sys::ZX_OK);
                }

                std::mem::drop(proxy);
            }
        );
    }
1489
1490    #[fuchsia::test]
1491    async fn test_info() {
1492        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1493
1494        futures::join!(
1495            async {
1496                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1497                block_server.handle_requests(stream).await.unwrap();
1498            },
1499            async {
1500                let expected_info = test_device_info();
1501                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1502                    info
1503                } else {
1504                    unreachable!()
1505                };
1506
1507                let block_info = proxy.get_info().await.unwrap().unwrap();
1508                assert_eq!(
1509                    block_info.block_count,
1510                    partition_info.block_range.as_ref().unwrap().end
1511                        - partition_info.block_range.as_ref().unwrap().start
1512                );
1513                assert_eq!(
1514                    block_info.flags,
1515                    fblock::DeviceFlag::READONLY
1516                        | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1517                        | fblock::DeviceFlag::BARRIER_SUPPORT
1518                        | fblock::DeviceFlag::FUA_SUPPORT
1519                );
1520
1521                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1522
1523                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1524                assert_eq!(status, zx::sys::ZX_OK);
1525                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1526
1527                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1528                assert_eq!(status, zx::sys::ZX_OK);
1529                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1530
1531                let (status, name) = proxy.get_name().await.unwrap();
1532                assert_eq!(status, zx::sys::ZX_OK);
1533                assert_eq!(name.as_ref(), Some(&partition_info.name));
1534
1535                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1536                assert_eq!(metadata.name, name);
1537                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1538                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1539                assert_eq!(
1540                    metadata.start_block_offset,
1541                    Some(partition_info.block_range.as_ref().unwrap().start)
1542                );
1543                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1544                assert_eq!(metadata.flags, Some(partition_info.flags));
1545
1546                std::mem::drop(proxy);
1547            }
1548        );
1549    }
1550
1551    #[fuchsia::test]
1552    async fn test_attach_vmo() {
1553        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1554
1555        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1556        let koid = vmo.koid().unwrap();
1557
1558        futures::join!(
1559            async {
1560                let block_server = BlockServer::new(
1561                    BLOCK_SIZE,
1562                    Arc::new(MockInterface {
1563                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1564                            assert_eq!(vmo.koid().unwrap(), koid);
1565                            Box::pin(async { Ok(()) })
1566                        })),
1567                        ..MockInterface::default()
1568                    }),
1569                );
1570                block_server.handle_requests(stream).await.unwrap();
1571            },
1572            async move {
1573                let (session_proxy, server) = fidl::endpoints::create_proxy();
1574
1575                proxy.open_session(server).unwrap();
1576
1577                let vmo_id = session_proxy
1578                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1579                    .await
1580                    .unwrap()
1581                    .unwrap();
1582                assert_ne!(vmo_id.id, 0);
1583
1584                let mut fifo =
1585                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1586                let (mut reader, mut writer) = fifo.async_io();
1587
1588                // Keep attaching VMOs until we eventually hit the maximum.
1589                let mut count = 1;
1590                loop {
1591                    match session_proxy
1592                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1593                        .await
1594                        .unwrap()
1595                    {
1596                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1597                        Err(e) => {
1598                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1599                            break;
1600                        }
1601                    }
1602
1603                    // Only test every 10 to keep test time down.
1604                    if count % 10 == 0 {
1605                        writer
1606                            .write_entries(&BlockFifoRequest {
1607                                command: BlockFifoCommand {
1608                                    opcode: BlockOpcode::Read.into_primitive(),
1609                                    ..Default::default()
1610                                },
1611                                vmoid: vmo_id.id,
1612                                length: 1,
1613                                ..Default::default()
1614                            })
1615                            .await
1616                            .unwrap();
1617
1618                        let mut response = BlockFifoResponse::default();
1619                        reader.read_entries(&mut response).await.unwrap();
1620                        assert_eq!(response.status, zx::sys::ZX_OK);
1621                    }
1622
1623                    count += 1;
1624                }
1625
1626                assert_eq!(count, u16::MAX as u64);
1627
1628                // Detach the original VMO, and make sure we can then attach another one.
1629                writer
1630                    .write_entries(&BlockFifoRequest {
1631                        command: BlockFifoCommand {
1632                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1633                            ..Default::default()
1634                        },
1635                        vmoid: vmo_id.id,
1636                        ..Default::default()
1637                    })
1638                    .await
1639                    .unwrap();
1640
1641                let mut response = BlockFifoResponse::default();
1642                reader.read_entries(&mut response).await.unwrap();
1643                assert_eq!(response.status, zx::sys::ZX_OK);
1644
1645                let new_vmo_id = session_proxy
1646                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1647                    .await
1648                    .unwrap()
1649                    .unwrap();
1650                // It should reuse the same ID.
1651                assert_eq!(new_vmo_id.id, vmo_id.id);
1652
1653                std::mem::drop(proxy);
1654            }
1655        );
1656    }
1657
1658    #[fuchsia::test]
1659    async fn test_close() {
1660        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1661
1662        let mut server = std::pin::pin!(
1663            async {
1664                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1665                block_server.handle_requests(stream).await.unwrap();
1666            }
1667            .fuse()
1668        );
1669
1670        let mut client = std::pin::pin!(
1671            async {
1672                let (session_proxy, server) = fidl::endpoints::create_proxy();
1673
1674                proxy.open_session(server).unwrap();
1675
1676                // Dropping the proxy should not cause the session to terminate because the session is
1677                // still live.
1678                std::mem::drop(proxy);
1679
1680                session_proxy.close().await.unwrap().unwrap();
1681
1682                // Keep the session alive.  Calling `close` should cause the server to terminate.
1683                let _: () = std::future::pending().await;
1684            }
1685            .fuse()
1686        );
1687
1688        futures::select!(
1689            _ = server => {}
1690            _ = client => unreachable!(),
1691        );
1692    }
1693
1694    #[derive(Default)]
1695    struct IoMockInterface {
1696        do_checks: bool,
1697        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1698        return_errors: bool,
1699    }
1700
1701    #[derive(Debug)]
1702    enum ExpectedOp {
1703        Read(u64, u32, u64),
1704        Write(u64, u32, u64),
1705        Trim(u64, u32),
1706        Flush,
1707    }
1708
1709    impl super::async_interface::Interface for IoMockInterface {
1710        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1711            Ok(())
1712        }
1713
1714        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1715            Cow::Owned(test_device_info())
1716        }
1717
1718        async fn read(
1719            &self,
1720            device_block_offset: u64,
1721            block_count: u32,
1722            _vmo: &Arc<zx::Vmo>,
1723            vmo_offset: u64,
1724            _opts: ReadOptions,
1725            _trace_flow_id: TraceFlowId,
1726        ) -> Result<(), zx::Status> {
1727            if self.return_errors {
1728                Err(zx::Status::INTERNAL)
1729            } else {
1730                if self.do_checks {
1731                    assert_matches!(
1732                        self.expected_op.lock().take(),
1733                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1734                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1735                        "Read {device_block_offset} {block_count} {vmo_offset}"
1736                    );
1737                }
1738                Ok(())
1739            }
1740        }
1741
1742        async fn write(
1743            &self,
1744            device_block_offset: u64,
1745            block_count: u32,
1746            _vmo: &Arc<zx::Vmo>,
1747            vmo_offset: u64,
1748            _write_opts: WriteOptions,
1749            _trace_flow_id: TraceFlowId,
1750        ) -> Result<(), zx::Status> {
1751            if self.return_errors {
1752                Err(zx::Status::NOT_SUPPORTED)
1753            } else {
1754                if self.do_checks {
1755                    assert_matches!(
1756                        self.expected_op.lock().take(),
1757                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1758                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1759                        "Write {device_block_offset} {block_count} {vmo_offset}"
1760                    );
1761                }
1762                Ok(())
1763            }
1764        }
1765
1766        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1767            if self.return_errors {
1768                Err(zx::Status::NO_RESOURCES)
1769            } else {
1770                if self.do_checks {
1771                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1772                }
1773                Ok(())
1774            }
1775        }
1776
1777        async fn trim(
1778            &self,
1779            device_block_offset: u64,
1780            block_count: u32,
1781            _trace_flow_id: TraceFlowId,
1782        ) -> Result<(), zx::Status> {
1783            if self.return_errors {
1784                Err(zx::Status::NO_MEMORY)
1785            } else {
1786                if self.do_checks {
1787                    assert_matches!(
1788                        self.expected_op.lock().take(),
1789                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1790                            block_count == b,
1791                        "Trim {device_block_offset} {block_count}"
1792                    );
1793                }
1794                Ok(())
1795            }
1796        }
1797    }
1798
1799    #[fuchsia::test]
1800    async fn test_io() {
1801        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1802
1803        let expected_op = Arc::new(Mutex::new(None));
1804        let expected_op_clone = expected_op.clone();
1805
1806        let server = async {
1807            let block_server = BlockServer::new(
1808                BLOCK_SIZE,
1809                Arc::new(IoMockInterface {
1810                    return_errors: false,
1811                    do_checks: true,
1812                    expected_op: expected_op_clone,
1813                }),
1814            );
1815            block_server.handle_requests(stream).await.unwrap();
1816        };
1817
1818        let client = async move {
1819            let (session_proxy, server) = fidl::endpoints::create_proxy();
1820
1821            proxy.open_session(server).unwrap();
1822
1823            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1824            let vmo_id = session_proxy
1825                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1826                .await
1827                .unwrap()
1828                .unwrap();
1829
1830            let mut fifo =
1831                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1832            let (mut reader, mut writer) = fifo.async_io();
1833
1834            // READ
1835            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1836            writer
1837                .write_entries(&BlockFifoRequest {
1838                    command: BlockFifoCommand {
1839                        opcode: BlockOpcode::Read.into_primitive(),
1840                        ..Default::default()
1841                    },
1842                    vmoid: vmo_id.id,
1843                    dev_offset: 1,
1844                    length: 2,
1845                    vmo_offset: 3,
1846                    ..Default::default()
1847                })
1848                .await
1849                .unwrap();
1850
1851            let mut response = BlockFifoResponse::default();
1852            reader.read_entries(&mut response).await.unwrap();
1853            assert_eq!(response.status, zx::sys::ZX_OK);
1854
1855            // WRITE
1856            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1857            writer
1858                .write_entries(&BlockFifoRequest {
1859                    command: BlockFifoCommand {
1860                        opcode: BlockOpcode::Write.into_primitive(),
1861                        ..Default::default()
1862                    },
1863                    vmoid: vmo_id.id,
1864                    dev_offset: 4,
1865                    length: 5,
1866                    vmo_offset: 6,
1867                    ..Default::default()
1868                })
1869                .await
1870                .unwrap();
1871
1872            let mut response = BlockFifoResponse::default();
1873            reader.read_entries(&mut response).await.unwrap();
1874            assert_eq!(response.status, zx::sys::ZX_OK);
1875
1876            // FLUSH
1877            *expected_op.lock() = Some(ExpectedOp::Flush);
1878            writer
1879                .write_entries(&BlockFifoRequest {
1880                    command: BlockFifoCommand {
1881                        opcode: BlockOpcode::Flush.into_primitive(),
1882                        ..Default::default()
1883                    },
1884                    ..Default::default()
1885                })
1886                .await
1887                .unwrap();
1888
1889            reader.read_entries(&mut response).await.unwrap();
1890            assert_eq!(response.status, zx::sys::ZX_OK);
1891
1892            // TRIM
1893            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1894            writer
1895                .write_entries(&BlockFifoRequest {
1896                    command: BlockFifoCommand {
1897                        opcode: BlockOpcode::Trim.into_primitive(),
1898                        ..Default::default()
1899                    },
1900                    dev_offset: 7,
1901                    length: 8,
1902                    ..Default::default()
1903                })
1904                .await
1905                .unwrap();
1906
1907            reader.read_entries(&mut response).await.unwrap();
1908            assert_eq!(response.status, zx::sys::ZX_OK);
1909
1910            std::mem::drop(proxy);
1911        };
1912
1913        futures::join!(server, client);
1914    }
1915
1916    #[fuchsia::test]
1917    async fn test_io_errors() {
1918        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1919
1920        futures::join!(
1921            async {
1922                let block_server = BlockServer::new(
1923                    BLOCK_SIZE,
1924                    Arc::new(IoMockInterface {
1925                        return_errors: true,
1926                        do_checks: false,
1927                        expected_op: Arc::new(Mutex::new(None)),
1928                    }),
1929                );
1930                block_server.handle_requests(stream).await.unwrap();
1931            },
1932            async move {
1933                let (session_proxy, server) = fidl::endpoints::create_proxy();
1934
1935                proxy.open_session(server).unwrap();
1936
1937                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1938                let vmo_id = session_proxy
1939                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1940                    .await
1941                    .unwrap()
1942                    .unwrap();
1943
1944                let mut fifo =
1945                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1946                let (mut reader, mut writer) = fifo.async_io();
1947
1948                // READ
1949                writer
1950                    .write_entries(&BlockFifoRequest {
1951                        command: BlockFifoCommand {
1952                            opcode: BlockOpcode::Read.into_primitive(),
1953                            ..Default::default()
1954                        },
1955                        vmoid: vmo_id.id,
1956                        length: 1,
1957                        reqid: 1,
1958                        ..Default::default()
1959                    })
1960                    .await
1961                    .unwrap();
1962
1963                let mut response = BlockFifoResponse::default();
1964                reader.read_entries(&mut response).await.unwrap();
1965                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1966
1967                // WRITE
1968                writer
1969                    .write_entries(&BlockFifoRequest {
1970                        command: BlockFifoCommand {
1971                            opcode: BlockOpcode::Write.into_primitive(),
1972                            ..Default::default()
1973                        },
1974                        vmoid: vmo_id.id,
1975                        length: 1,
1976                        reqid: 2,
1977                        ..Default::default()
1978                    })
1979                    .await
1980                    .unwrap();
1981
1982                reader.read_entries(&mut response).await.unwrap();
1983                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1984
1985                // FLUSH
1986                writer
1987                    .write_entries(&BlockFifoRequest {
1988                        command: BlockFifoCommand {
1989                            opcode: BlockOpcode::Flush.into_primitive(),
1990                            ..Default::default()
1991                        },
1992                        reqid: 3,
1993                        ..Default::default()
1994                    })
1995                    .await
1996                    .unwrap();
1997
1998                reader.read_entries(&mut response).await.unwrap();
1999                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2000
2001                // TRIM
2002                writer
2003                    .write_entries(&BlockFifoRequest {
2004                        command: BlockFifoCommand {
2005                            opcode: BlockOpcode::Trim.into_primitive(),
2006                            ..Default::default()
2007                        },
2008                        reqid: 4,
2009                        length: 1,
2010                        ..Default::default()
2011                    })
2012                    .await
2013                    .unwrap();
2014
2015                reader.read_entries(&mut response).await.unwrap();
2016                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2017
2018                std::mem::drop(proxy);
2019            }
2020        );
2021    }
2022
2023    #[fuchsia::test]
2024    async fn test_invalid_args() {
2025        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2026
2027        futures::join!(
2028            async {
2029                let block_server = BlockServer::new(
2030                    BLOCK_SIZE,
2031                    Arc::new(IoMockInterface {
2032                        return_errors: false,
2033                        do_checks: false,
2034                        expected_op: Arc::new(Mutex::new(None)),
2035                    }),
2036                );
2037                block_server.handle_requests(stream).await.unwrap();
2038            },
2039            async move {
2040                let (session_proxy, server) = fidl::endpoints::create_proxy();
2041
2042                proxy.open_session(server).unwrap();
2043
2044                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2045                let vmo_id = session_proxy
2046                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2047                    .await
2048                    .unwrap()
2049                    .unwrap();
2050
2051                let mut fifo =
2052                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2053
2054                async fn test(
2055                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2056                    request: BlockFifoRequest,
2057                ) -> Result<(), zx::Status> {
2058                    let (mut reader, mut writer) = fifo.async_io();
2059                    writer.write_entries(&request).await.unwrap();
2060                    let mut response = BlockFifoResponse::default();
2061                    reader.read_entries(&mut response).await.unwrap();
2062                    zx::Status::ok(response.status)
2063                }
2064
2065                // READ
2066
2067                let good_read_request = || BlockFifoRequest {
2068                    command: BlockFifoCommand {
2069                        opcode: BlockOpcode::Read.into_primitive(),
2070                        ..Default::default()
2071                    },
2072                    length: 1,
2073                    vmoid: vmo_id.id,
2074                    ..Default::default()
2075                };
2076
2077                assert_eq!(
2078                    test(
2079                        &mut fifo,
2080                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2081                    )
2082                    .await,
2083                    Err(zx::Status::IO)
2084                );
2085
2086                assert_eq!(
2087                    test(
2088                        &mut fifo,
2089                        BlockFifoRequest {
2090                            vmo_offset: 0xffff_ffff_ffff_ffff,
2091                            ..good_read_request()
2092                        }
2093                    )
2094                    .await,
2095                    Err(zx::Status::OUT_OF_RANGE)
2096                );
2097
2098                assert_eq!(
2099                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2100                    Err(zx::Status::INVALID_ARGS)
2101                );
2102
2103                // WRITE
2104
2105                let good_write_request = || BlockFifoRequest {
2106                    command: BlockFifoCommand {
2107                        opcode: BlockOpcode::Write.into_primitive(),
2108                        ..Default::default()
2109                    },
2110                    length: 1,
2111                    vmoid: vmo_id.id,
2112                    ..Default::default()
2113                };
2114
2115                assert_eq!(
2116                    test(
2117                        &mut fifo,
2118                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2119                    )
2120                    .await,
2121                    Err(zx::Status::IO)
2122                );
2123
2124                assert_eq!(
2125                    test(
2126                        &mut fifo,
2127                        BlockFifoRequest {
2128                            vmo_offset: 0xffff_ffff_ffff_ffff,
2129                            ..good_write_request()
2130                        }
2131                    )
2132                    .await,
2133                    Err(zx::Status::OUT_OF_RANGE)
2134                );
2135
2136                assert_eq!(
2137                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2138                    Err(zx::Status::INVALID_ARGS)
2139                );
2140
2141                // CLOSE VMO
2142
2143                assert_eq!(
2144                    test(
2145                        &mut fifo,
2146                        BlockFifoRequest {
2147                            command: BlockFifoCommand {
2148                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2149                                ..Default::default()
2150                            },
2151                            vmoid: vmo_id.id + 1,
2152                            ..Default::default()
2153                        }
2154                    )
2155                    .await,
2156                    Err(zx::Status::IO)
2157                );
2158
2159                std::mem::drop(proxy);
2160            }
2161        );
2162    }
2163
2164    #[fuchsia::test]
2165    async fn test_concurrent_requests() {
2166        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2167
2168        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2169        let waiting_readers_clone = waiting_readers.clone();
2170
2171        futures::join!(
2172            async move {
2173                let block_server = BlockServer::new(
2174                    BLOCK_SIZE,
2175                    Arc::new(MockInterface {
2176                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2177                            let (tx, rx) = oneshot::channel();
2178                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2179                            Box::pin(async move {
2180                                let _ = rx.await;
2181                                Ok(())
2182                            })
2183                        })),
2184                        ..MockInterface::default()
2185                    }),
2186                );
2187                block_server.handle_requests(stream).await.unwrap();
2188            },
2189            async move {
2190                let (session_proxy, server) = fidl::endpoints::create_proxy();
2191
2192                proxy.open_session(server).unwrap();
2193
2194                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2195                let vmo_id = session_proxy
2196                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2197                    .await
2198                    .unwrap()
2199                    .unwrap();
2200
2201                let mut fifo =
2202                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2203                let (mut reader, mut writer) = fifo.async_io();
2204
2205                writer
2206                    .write_entries(&BlockFifoRequest {
2207                        command: BlockFifoCommand {
2208                            opcode: BlockOpcode::Read.into_primitive(),
2209                            ..Default::default()
2210                        },
2211                        reqid: 1,
2212                        dev_offset: 1, // Intentionally use the same as `reqid`.
2213                        vmoid: vmo_id.id,
2214                        length: 1,
2215                        ..Default::default()
2216                    })
2217                    .await
2218                    .unwrap();
2219
2220                writer
2221                    .write_entries(&BlockFifoRequest {
2222                        command: BlockFifoCommand {
2223                            opcode: BlockOpcode::Read.into_primitive(),
2224                            ..Default::default()
2225                        },
2226                        reqid: 2,
2227                        dev_offset: 2,
2228                        vmoid: vmo_id.id,
2229                        length: 1,
2230                        ..Default::default()
2231                    })
2232                    .await
2233                    .unwrap();
2234
2235                // Wait till both those entries are pending.
2236                poll_fn(|cx: &mut Context<'_>| {
2237                    if waiting_readers.lock().len() == 2 {
2238                        Poll::Ready(())
2239                    } else {
2240                        // Yield to the executor.
2241                        cx.waker().wake_by_ref();
2242                        Poll::Pending
2243                    }
2244                })
2245                .await;
2246
2247                let mut response = BlockFifoResponse::default();
2248                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2249
2250                let (id, tx) = waiting_readers.lock().pop().unwrap();
2251                tx.send(()).unwrap();
2252
2253                reader.read_entries(&mut response).await.unwrap();
2254                assert_eq!(response.status, zx::sys::ZX_OK);
2255                assert_eq!(response.reqid, id);
2256
2257                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2258
2259                let (id, tx) = waiting_readers.lock().pop().unwrap();
2260                tx.send(()).unwrap();
2261
2262                reader.read_entries(&mut response).await.unwrap();
2263                assert_eq!(response.status, zx::sys::ZX_OK);
2264                assert_eq!(response.reqid, id);
2265            }
2266        );
2267    }
2268
2269    #[fuchsia::test]
2270    async fn test_groups() {
2271        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2272
2273        futures::join!(
2274            async move {
2275                let block_server = BlockServer::new(
2276                    BLOCK_SIZE,
2277                    Arc::new(MockInterface {
2278                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2279                        ..MockInterface::default()
2280                    }),
2281                );
2282                block_server.handle_requests(stream).await.unwrap();
2283            },
2284            async move {
2285                let (session_proxy, server) = fidl::endpoints::create_proxy();
2286
2287                proxy.open_session(server).unwrap();
2288
2289                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2290                let vmo_id = session_proxy
2291                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2292                    .await
2293                    .unwrap()
2294                    .unwrap();
2295
2296                let mut fifo =
2297                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2298                let (mut reader, mut writer) = fifo.async_io();
2299
2300                writer
2301                    .write_entries(&BlockFifoRequest {
2302                        command: BlockFifoCommand {
2303                            opcode: BlockOpcode::Read.into_primitive(),
2304                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2305                            ..Default::default()
2306                        },
2307                        group: 1,
2308                        vmoid: vmo_id.id,
2309                        length: 1,
2310                        ..Default::default()
2311                    })
2312                    .await
2313                    .unwrap();
2314
2315                writer
2316                    .write_entries(&BlockFifoRequest {
2317                        command: BlockFifoCommand {
2318                            opcode: BlockOpcode::Read.into_primitive(),
2319                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2320                            ..Default::default()
2321                        },
2322                        reqid: 2,
2323                        group: 1,
2324                        vmoid: vmo_id.id,
2325                        length: 1,
2326                        ..Default::default()
2327                    })
2328                    .await
2329                    .unwrap();
2330
2331                let mut response = BlockFifoResponse::default();
2332                reader.read_entries(&mut response).await.unwrap();
2333                assert_eq!(response.status, zx::sys::ZX_OK);
2334                assert_eq!(response.reqid, 2);
2335                assert_eq!(response.group, 1);
2336            }
2337        );
2338    }
2339
2340    #[fuchsia::test]
2341    async fn test_group_error() {
2342        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2343
2344        let counter = Arc::new(AtomicU64::new(0));
2345        let counter_clone = counter.clone();
2346
2347        futures::join!(
2348            async move {
2349                let block_server = BlockServer::new(
2350                    BLOCK_SIZE,
2351                    Arc::new(MockInterface {
2352                        read_hook: Some(Box::new(move |_, _, _, _| {
2353                            counter_clone.fetch_add(1, Ordering::Relaxed);
2354                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2355                        })),
2356                        ..MockInterface::default()
2357                    }),
2358                );
2359                block_server.handle_requests(stream).await.unwrap();
2360            },
2361            async move {
2362                let (session_proxy, server) = fidl::endpoints::create_proxy();
2363
2364                proxy.open_session(server).unwrap();
2365
2366                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2367                let vmo_id = session_proxy
2368                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2369                    .await
2370                    .unwrap()
2371                    .unwrap();
2372
2373                let mut fifo =
2374                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2375                let (mut reader, mut writer) = fifo.async_io();
2376
2377                writer
2378                    .write_entries(&BlockFifoRequest {
2379                        command: BlockFifoCommand {
2380                            opcode: BlockOpcode::Read.into_primitive(),
2381                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2382                            ..Default::default()
2383                        },
2384                        group: 1,
2385                        vmoid: vmo_id.id,
2386                        length: 1,
2387                        ..Default::default()
2388                    })
2389                    .await
2390                    .unwrap();
2391
2392                // Wait until processed.
2393                poll_fn(|cx: &mut Context<'_>| {
2394                    if counter.load(Ordering::Relaxed) == 1 {
2395                        Poll::Ready(())
2396                    } else {
2397                        // Yield to the executor.
2398                        cx.waker().wake_by_ref();
2399                        Poll::Pending
2400                    }
2401                })
2402                .await;
2403
2404                let mut response = BlockFifoResponse::default();
2405                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2406
2407                writer
2408                    .write_entries(&BlockFifoRequest {
2409                        command: BlockFifoCommand {
2410                            opcode: BlockOpcode::Read.into_primitive(),
2411                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2412                            ..Default::default()
2413                        },
2414                        group: 1,
2415                        vmoid: vmo_id.id,
2416                        length: 1,
2417                        ..Default::default()
2418                    })
2419                    .await
2420                    .unwrap();
2421
2422                writer
2423                    .write_entries(&BlockFifoRequest {
2424                        command: BlockFifoCommand {
2425                            opcode: BlockOpcode::Read.into_primitive(),
2426                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2427                            ..Default::default()
2428                        },
2429                        reqid: 2,
2430                        group: 1,
2431                        vmoid: vmo_id.id,
2432                        length: 1,
2433                        ..Default::default()
2434                    })
2435                    .await
2436                    .unwrap();
2437
2438                reader.read_entries(&mut response).await.unwrap();
2439                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2440                assert_eq!(response.reqid, 2);
2441                assert_eq!(response.group, 1);
2442
2443                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2444
2445                // Only the first request should have been processed.
2446                assert_eq!(counter.load(Ordering::Relaxed), 1);
2447            }
2448        );
2449    }
2450
2451    #[fuchsia::test]
2452    async fn test_group_with_two_lasts() {
2453        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2454
2455        let (tx, rx) = oneshot::channel();
2456
2457        futures::join!(
2458            async move {
2459                let rx = Mutex::new(Some(rx));
2460                let block_server = BlockServer::new(
2461                    BLOCK_SIZE,
2462                    Arc::new(MockInterface {
2463                        read_hook: Some(Box::new(move |_, _, _, _| {
2464                            let rx = rx.lock().take().unwrap();
2465                            Box::pin(async {
2466                                let _ = rx.await;
2467                                Ok(())
2468                            })
2469                        })),
2470                        ..MockInterface::default()
2471                    }),
2472                );
2473                block_server.handle_requests(stream).await.unwrap();
2474            },
2475            async move {
2476                let (session_proxy, server) = fidl::endpoints::create_proxy();
2477
2478                proxy.open_session(server).unwrap();
2479
2480                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2481                let vmo_id = session_proxy
2482                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2483                    .await
2484                    .unwrap()
2485                    .unwrap();
2486
2487                let mut fifo =
2488                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2489                let (mut reader, mut writer) = fifo.async_io();
2490
2491                writer
2492                    .write_entries(&BlockFifoRequest {
2493                        command: BlockFifoCommand {
2494                            opcode: BlockOpcode::Read.into_primitive(),
2495                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2496                            ..Default::default()
2497                        },
2498                        reqid: 1,
2499                        group: 1,
2500                        vmoid: vmo_id.id,
2501                        length: 1,
2502                        ..Default::default()
2503                    })
2504                    .await
2505                    .unwrap();
2506
2507                writer
2508                    .write_entries(&BlockFifoRequest {
2509                        command: BlockFifoCommand {
2510                            opcode: BlockOpcode::Read.into_primitive(),
2511                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2512                            ..Default::default()
2513                        },
2514                        reqid: 2,
2515                        group: 1,
2516                        vmoid: vmo_id.id,
2517                        length: 1,
2518                        ..Default::default()
2519                    })
2520                    .await
2521                    .unwrap();
2522
2523                // Send an independent request to flush through the fifo.
2524                writer
2525                    .write_entries(&BlockFifoRequest {
2526                        command: BlockFifoCommand {
2527                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2528                            ..Default::default()
2529                        },
2530                        reqid: 3,
2531                        vmoid: vmo_id.id,
2532                        ..Default::default()
2533                    })
2534                    .await
2535                    .unwrap();
2536
2537                // It should succeed.
2538                let mut response = BlockFifoResponse::default();
2539                reader.read_entries(&mut response).await.unwrap();
2540                assert_eq!(response.status, zx::sys::ZX_OK);
2541                assert_eq!(response.reqid, 3);
2542
2543                // Now release the original request.
2544                tx.send(()).unwrap();
2545
2546                // The response should be for the first message tagged as last, and it should be
2547                // an error because we sent two messages with the LAST marker.
2548                let mut response = BlockFifoResponse::default();
2549                reader.read_entries(&mut response).await.unwrap();
2550                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2551                assert_eq!(response.reqid, 1);
2552                assert_eq!(response.group, 1);
2553            }
2554        );
2555    }
2556
2557    #[fuchsia::test(allow_stalls = false)]
2558    async fn test_requests_dont_block_sessions() {
2559        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2560
2561        let (tx, rx) = oneshot::channel();
2562
2563        fasync::Task::local(async move {
2564            let rx = Mutex::new(Some(rx));
2565            let block_server = BlockServer::new(
2566                BLOCK_SIZE,
2567                Arc::new(MockInterface {
2568                    read_hook: Some(Box::new(move |_, _, _, _| {
2569                        let rx = rx.lock().take().unwrap();
2570                        Box::pin(async {
2571                            let _ = rx.await;
2572                            Ok(())
2573                        })
2574                    })),
2575                    ..MockInterface::default()
2576                }),
2577            );
2578            block_server.handle_requests(stream).await.unwrap();
2579        })
2580        .detach();
2581
2582        let mut fut = pin!(async {
2583            let (session_proxy, server) = fidl::endpoints::create_proxy();
2584
2585            proxy.open_session(server).unwrap();
2586
2587            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2588            let vmo_id = session_proxy
2589                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2590                .await
2591                .unwrap()
2592                .unwrap();
2593
2594            let mut fifo =
2595                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2596            let (mut reader, mut writer) = fifo.async_io();
2597
2598            writer
2599                .write_entries(&BlockFifoRequest {
2600                    command: BlockFifoCommand {
2601                        opcode: BlockOpcode::Read.into_primitive(),
2602                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2603                        ..Default::default()
2604                    },
2605                    reqid: 1,
2606                    group: 1,
2607                    vmoid: vmo_id.id,
2608                    length: 1,
2609                    ..Default::default()
2610                })
2611                .await
2612                .unwrap();
2613
2614            let mut response = BlockFifoResponse::default();
2615            reader.read_entries(&mut response).await.unwrap();
2616            assert_eq!(response.status, zx::sys::ZX_OK);
2617        });
2618
2619        // The response won't come back until we send on `tx`.
2620        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2621
2622        let mut fut2 = pin!(proxy.get_volume_info());
2623
2624        // get_volume_info is set up to stall forever.
2625        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2626
2627        // If we now free up the first future, it should resolve; the stalled call to
2628        // get_volume_info should not block the fifo response.
2629        let _ = tx.send(());
2630
2631        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2632    }
2633
2634    #[fuchsia::test]
2635    async fn test_request_flow_control() {
2636        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2637
2638        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2639        // server will block until that happens.
2640        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2641        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2642        let event_clone = event.clone();
2643        futures::join!(
2644            async move {
2645                let block_server = BlockServer::new(
2646                    BLOCK_SIZE,
2647                    Arc::new(MockInterface {
2648                        read_hook: Some(Box::new(move |_, _, _, _| {
2649                            let event_clone = event_clone.clone();
2650                            Box::pin(async move {
2651                                if !event_clone.1.load(Ordering::SeqCst) {
2652                                    event_clone.0.listen().await;
2653                                }
2654                                Ok(())
2655                            })
2656                        })),
2657                        ..MockInterface::default()
2658                    }),
2659                );
2660                block_server.handle_requests(stream).await.unwrap();
2661            },
2662            async move {
2663                let (session_proxy, server) = fidl::endpoints::create_proxy();
2664
2665                proxy.open_session(server).unwrap();
2666
2667                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2668                let vmo_id = session_proxy
2669                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2670                    .await
2671                    .unwrap()
2672                    .unwrap();
2673
2674                let mut fifo =
2675                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2676                let (mut reader, mut writer) = fifo.async_io();
2677
2678                for i in 0..MAX_REQUESTS {
2679                    writer
2680                        .write_entries(&BlockFifoRequest {
2681                            command: BlockFifoCommand {
2682                                opcode: BlockOpcode::Read.into_primitive(),
2683                                ..Default::default()
2684                            },
2685                            reqid: (i + 1) as u32,
2686                            dev_offset: i,
2687                            vmoid: vmo_id.id,
2688                            length: 1,
2689                            ..Default::default()
2690                        })
2691                        .await
2692                        .unwrap();
2693                }
2694                assert!(
2695                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2696                        command: BlockFifoCommand {
2697                            opcode: BlockOpcode::Read.into_primitive(),
2698                            ..Default::default()
2699                        },
2700                        reqid: u32::MAX,
2701                        dev_offset: MAX_REQUESTS,
2702                        vmoid: vmo_id.id,
2703                        length: 1,
2704                        ..Default::default()
2705                    })))
2706                    .is_pending()
2707                );
2708                // OK, let the server start to process.
2709                event.1.store(true, Ordering::SeqCst);
2710                event.0.notify(usize::MAX);
2711                // For each entry we read, make sure we can write a new one in.
2712                let mut finished_reqids = vec![];
2713                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2714                    let mut response = BlockFifoResponse::default();
2715                    reader.read_entries(&mut response).await.unwrap();
2716                    assert_eq!(response.status, zx::sys::ZX_OK);
2717                    finished_reqids.push(response.reqid);
2718                    writer
2719                        .write_entries(&BlockFifoRequest {
2720                            command: BlockFifoCommand {
2721                                opcode: BlockOpcode::Read.into_primitive(),
2722                                ..Default::default()
2723                            },
2724                            reqid: (i + 1) as u32,
2725                            dev_offset: i,
2726                            vmoid: vmo_id.id,
2727                            length: 1,
2728                            ..Default::default()
2729                        })
2730                        .await
2731                        .unwrap();
2732                }
2733                let mut response = BlockFifoResponse::default();
2734                for _ in 0..MAX_REQUESTS {
2735                    reader.read_entries(&mut response).await.unwrap();
2736                    assert_eq!(response.status, zx::sys::ZX_OK);
2737                    finished_reqids.push(response.reqid);
2738                }
2739                // Verify that we got a response for each request.  Note that we can't assume FIFO
2740                // ordering.
2741                finished_reqids.sort();
2742                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2743                let mut i = 1;
2744                for reqid in finished_reqids {
2745                    assert_eq!(reqid, i);
2746                    i += 1;
2747                }
2748            }
2749        );
2750    }
2751
2752    #[fuchsia::test]
2753    async fn test_passthrough_io_with_fixed_map() {
2754        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2755
2756        let expected_op = Arc::new(Mutex::new(None));
2757        let expected_op_clone = expected_op.clone();
2758        futures::join!(
2759            async {
2760                let block_server = BlockServer::new(
2761                    BLOCK_SIZE,
2762                    Arc::new(IoMockInterface {
2763                        return_errors: false,
2764                        do_checks: true,
2765                        expected_op: expected_op_clone,
2766                    }),
2767                );
2768                block_server.handle_requests(stream).await.unwrap();
2769            },
2770            async move {
2771                let (session_proxy, server) = fidl::endpoints::create_proxy();
2772
2773                let mapping = fblock::BlockOffsetMapping {
2774                    source_block_offset: 0,
2775                    target_block_offset: 10,
2776                    length: 20,
2777                };
2778                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2779
2780                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2781                let vmo_id = session_proxy
2782                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2783                    .await
2784                    .unwrap()
2785                    .unwrap();
2786
2787                let mut fifo =
2788                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2789                let (mut reader, mut writer) = fifo.async_io();
2790
2791                // READ
2792                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2793                writer
2794                    .write_entries(&BlockFifoRequest {
2795                        command: BlockFifoCommand {
2796                            opcode: BlockOpcode::Read.into_primitive(),
2797                            ..Default::default()
2798                        },
2799                        vmoid: vmo_id.id,
2800                        dev_offset: 1,
2801                        length: 2,
2802                        vmo_offset: 3,
2803                        ..Default::default()
2804                    })
2805                    .await
2806                    .unwrap();
2807
2808                let mut response = BlockFifoResponse::default();
2809                reader.read_entries(&mut response).await.unwrap();
2810                assert_eq!(response.status, zx::sys::ZX_OK);
2811
2812                // WRITE
2813                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2814                writer
2815                    .write_entries(&BlockFifoRequest {
2816                        command: BlockFifoCommand {
2817                            opcode: BlockOpcode::Write.into_primitive(),
2818                            ..Default::default()
2819                        },
2820                        vmoid: vmo_id.id,
2821                        dev_offset: 4,
2822                        length: 5,
2823                        vmo_offset: 6,
2824                        ..Default::default()
2825                    })
2826                    .await
2827                    .unwrap();
2828
2829                reader.read_entries(&mut response).await.unwrap();
2830                assert_eq!(response.status, zx::sys::ZX_OK);
2831
2832                // FLUSH
2833                *expected_op.lock() = Some(ExpectedOp::Flush);
2834                writer
2835                    .write_entries(&BlockFifoRequest {
2836                        command: BlockFifoCommand {
2837                            opcode: BlockOpcode::Flush.into_primitive(),
2838                            ..Default::default()
2839                        },
2840                        ..Default::default()
2841                    })
2842                    .await
2843                    .unwrap();
2844
2845                reader.read_entries(&mut response).await.unwrap();
2846                assert_eq!(response.status, zx::sys::ZX_OK);
2847
2848                // TRIM
2849                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2850                writer
2851                    .write_entries(&BlockFifoRequest {
2852                        command: BlockFifoCommand {
2853                            opcode: BlockOpcode::Trim.into_primitive(),
2854                            ..Default::default()
2855                        },
2856                        dev_offset: 7,
2857                        length: 3,
2858                        ..Default::default()
2859                    })
2860                    .await
2861                    .unwrap();
2862
2863                reader.read_entries(&mut response).await.unwrap();
2864                assert_eq!(response.status, zx::sys::ZX_OK);
2865
2866                // READ past window
2867                *expected_op.lock() = None;
2868                writer
2869                    .write_entries(&BlockFifoRequest {
2870                        command: BlockFifoCommand {
2871                            opcode: BlockOpcode::Read.into_primitive(),
2872                            ..Default::default()
2873                        },
2874                        vmoid: vmo_id.id,
2875                        dev_offset: 19,
2876                        length: 2,
2877                        vmo_offset: 3,
2878                        ..Default::default()
2879                    })
2880                    .await
2881                    .unwrap();
2882
2883                reader.read_entries(&mut response).await.unwrap();
2884                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2885
2886                std::mem::drop(proxy);
2887            }
2888        );
2889    }
2890
2891    #[fuchsia::test]
2892    fn operation_map() {
2893        const BLOCK_SIZE: u32 = 512;
2894
2895        #[track_caller]
2896        fn expect_map_result(
2897            mut operation: Operation,
2898            mapping: Option<BlockOffsetMapping>,
2899            max_blocks: Option<NonZero<u32>>,
2900            expected_operations: Vec<Operation>,
2901        ) {
2902            let mut ops = vec![];
2903            while let Some(remainder) =
2904                operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2905            {
2906                ops.push(operation);
2907                operation = remainder;
2908            }
2909            ops.push(operation);
2910            assert_eq!(ops, expected_operations);
2911        }
2912
2913        // No limits
2914        expect_map_result(
2915            Operation::Read {
2916                device_block_offset: 10,
2917                block_count: 200,
2918                _unused: 0,
2919                vmo_offset: 0,
2920                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2921            },
2922            None,
2923            None,
2924            vec![Operation::Read {
2925                device_block_offset: 10,
2926                block_count: 200,
2927                _unused: 0,
2928                vmo_offset: 0,
2929                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2930            }],
2931        );
2932
2933        // Max block count
2934        expect_map_result(
2935            Operation::Read {
2936                device_block_offset: 10,
2937                block_count: 200,
2938                _unused: 0,
2939                vmo_offset: 0,
2940                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2941            },
2942            None,
2943            NonZero::new(120),
2944            vec![
2945                Operation::Read {
2946                    device_block_offset: 10,
2947                    block_count: 120,
2948                    _unused: 0,
2949                    vmo_offset: 0,
2950                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2951                },
2952                Operation::Read {
2953                    device_block_offset: 130,
2954                    block_count: 80,
2955                    _unused: 0,
2956                    vmo_offset: 120 * BLOCK_SIZE as u64,
2957                    options: ReadOptions {
2958                        // The DUN should be offset by the number of blocks in the first request.
2959                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
2960                    },
2961                },
2962            ],
2963        );
2964        expect_map_result(
2965            Operation::Trim { device_block_offset: 10, block_count: 200 },
2966            None,
2967            NonZero::new(120),
2968            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2969        );
2970
2971        // Remapping + Max block count
2972        expect_map_result(
2973            Operation::Read {
2974                device_block_offset: 10,
2975                block_count: 200,
2976                _unused: 0,
2977                vmo_offset: 0,
2978                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2979            },
2980            Some(BlockOffsetMapping {
2981                source_block_offset: 10,
2982                target_block_offset: 100,
2983                length: 200,
2984            }),
2985            NonZero::new(120),
2986            vec![
2987                Operation::Read {
2988                    device_block_offset: 100,
2989                    block_count: 120,
2990                    _unused: 0,
2991                    vmo_offset: 0,
2992                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2993                },
2994                Operation::Read {
2995                    device_block_offset: 220,
2996                    block_count: 80,
2997                    _unused: 0,
2998                    vmo_offset: 120 * BLOCK_SIZE as u64,
2999                    options: ReadOptions {
3000                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3001                    },
3002                },
3003            ],
3004        );
3005        expect_map_result(
3006            Operation::Trim { device_block_offset: 10, block_count: 200 },
3007            Some(BlockOffsetMapping {
3008                source_block_offset: 10,
3009                target_block_offset: 100,
3010                length: 200,
3011            }),
3012            NonZero::new(120),
3013            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3014        );
3015    }
3016
3017    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3018    #[fuchsia::test]
3019    async fn test_pre_barrier_flush_failure() {
3020        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3021
3022        struct NoBarrierInterface;
3023        impl super::async_interface::Interface for NoBarrierInterface {
3024            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3025                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3026                    device_flags: fblock::DeviceFlag::empty(), // No BARRIER_SUPPORT
3027                    max_transfer_blocks: NonZero::new(100),
3028                    block_range: Some(0..100),
3029                    type_guid: [0; 16],
3030                    instance_guid: [0; 16],
3031                    name: "test".to_string(),
3032                    flags: 0,
3033                }))
3034            }
3035            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3036                Ok(())
3037            }
3038            async fn read(
3039                &self,
3040                _: u64,
3041                _: u32,
3042                _: &Arc<zx::Vmo>,
3043                _: u64,
3044                _: ReadOptions,
3045                _: TraceFlowId,
3046            ) -> Result<(), zx::Status> {
3047                unreachable!()
3048            }
3049            async fn write(
3050                &self,
3051                _: u64,
3052                _: u32,
3053                _: &Arc<zx::Vmo>,
3054                _: u64,
3055                _: WriteOptions,
3056                _: TraceFlowId,
3057            ) -> Result<(), zx::Status> {
3058                panic!("Write should not be called");
3059            }
3060            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3061                Err(zx::Status::IO)
3062            }
3063            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3064                unreachable!()
3065            }
3066        }
3067
3068        futures::join!(
3069            async move {
3070                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3071                block_server.handle_requests(stream).await.unwrap();
3072            },
3073            async move {
3074                let (session_proxy, server) = fidl::endpoints::create_proxy();
3075                proxy.open_session(server).unwrap();
3076                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3077                let vmo_id = session_proxy
3078                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3079                    .await
3080                    .unwrap()
3081                    .unwrap();
3082
3083                let mut fifo =
3084                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3085                let (mut reader, mut writer) = fifo.async_io();
3086
3087                writer
3088                    .write_entries(&BlockFifoRequest {
3089                        command: BlockFifoCommand {
3090                            opcode: BlockOpcode::Write.into_primitive(),
3091                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3092                            ..Default::default()
3093                        },
3094                        vmoid: vmo_id.id,
3095                        length: 1,
3096                        ..Default::default()
3097                    })
3098                    .await
3099                    .unwrap();
3100
3101                let mut response = BlockFifoResponse::default();
3102                reader.read_entries(&mut response).await.unwrap();
3103                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3104            }
3105        );
3106    }
3107
3108    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3109    // post-flush is not executed.
3110    #[fuchsia::test]
3111    async fn test_post_barrier_write_failure() {
3112        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3113
3114        struct NoBarrierInterface;
3115        impl super::async_interface::Interface for NoBarrierInterface {
3116            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3117                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3118                    device_flags: fblock::DeviceFlag::empty(), // No FUA_SUPPORT
3119                    max_transfer_blocks: NonZero::new(100),
3120                    block_range: Some(0..100),
3121                    type_guid: [0; 16],
3122                    instance_guid: [0; 16],
3123                    name: "test".to_string(),
3124                    flags: 0,
3125                }))
3126            }
3127            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3128                Ok(())
3129            }
3130            async fn read(
3131                &self,
3132                _: u64,
3133                _: u32,
3134                _: &Arc<zx::Vmo>,
3135                _: u64,
3136                _: ReadOptions,
3137                _: TraceFlowId,
3138            ) -> Result<(), zx::Status> {
3139                unreachable!()
3140            }
3141            async fn write(
3142                &self,
3143                _: u64,
3144                _: u32,
3145                _: &Arc<zx::Vmo>,
3146                _: u64,
3147                _: WriteOptions,
3148                _: TraceFlowId,
3149            ) -> Result<(), zx::Status> {
3150                Err(zx::Status::IO)
3151            }
3152            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3153                panic!("Flush should not be called")
3154            }
3155            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3156                unreachable!()
3157            }
3158        }
3159
3160        futures::join!(
3161            async move {
3162                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3163                block_server.handle_requests(stream).await.unwrap();
3164            },
3165            async move {
3166                let (session_proxy, server) = fidl::endpoints::create_proxy();
3167                proxy.open_session(server).unwrap();
3168                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3169                let vmo_id = session_proxy
3170                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3171                    .await
3172                    .unwrap()
3173                    .unwrap();
3174
3175                let mut fifo =
3176                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3177                let (mut reader, mut writer) = fifo.async_io();
3178
3179                writer
3180                    .write_entries(&BlockFifoRequest {
3181                        command: BlockFifoCommand {
3182                            opcode: BlockOpcode::Write.into_primitive(),
3183                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3184                            ..Default::default()
3185                        },
3186                        vmoid: vmo_id.id,
3187                        length: 1,
3188                        ..Default::default()
3189                    })
3190                    .await
3191                    .unwrap();
3192
3193                let mut response = BlockFifoResponse::default();
3194                reader.read_entries(&mut response).await.unwrap();
3195                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3196            }
3197        );
3198    }
3199}