block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23
24#[cfg(test)]
25mod decompression_tests;
26
27pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
28
29type TraceFlowId = Option<NonZero<u64>>;
30
31#[derive(Clone)]
32pub enum DeviceInfo {
33    Block(BlockInfo),
34    Partition(PartitionInfo),
35}
36
37impl DeviceInfo {
38    pub fn device_flags(&self) -> fblock::DeviceFlag {
39        match self {
40            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
41            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
42        }
43    }
44
45    pub fn block_count(&self) -> Option<u64> {
46        match self {
47            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
48            Self::Partition(PartitionInfo { block_range, .. }) => {
49                block_range.as_ref().map(|range| range.end - range.start)
50            }
51        }
52    }
53
54    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
55        match self {
56            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
57            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
58                max_transfer_blocks.clone()
59            }
60        }
61    }
62
63    fn max_transfer_size(&self, block_size: u32) -> u32 {
64        if let Some(max_blocks) = self.max_transfer_blocks() {
65            max_blocks.get() * block_size
66        } else {
67            MAX_TRANSFER_UNBOUNDED
68        }
69    }
70}
71
72/// Information associated with non-partition block devices.
73#[derive(Clone, Default)]
74pub struct BlockInfo {
75    pub device_flags: fblock::DeviceFlag,
76    pub block_count: u64,
77    pub max_transfer_blocks: Option<NonZero<u32>>,
78}
79
80/// Information associated with a block device that is also a partition.
81#[derive(Clone, Default)]
82pub struct PartitionInfo {
83    /// The device flags reported by the underlying device.
84    pub device_flags: fblock::DeviceFlag,
85    pub max_transfer_blocks: Option<NonZero<u32>>,
86    /// If `block_range` is None, the partition is a volume and may not be contiguous.
87    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
88    /// slices and use that (along with the slice and block sizes) to determine the block count.
89    pub block_range: Option<Range<u64>>,
90    pub type_guid: [u8; 16],
91    pub instance_guid: [u8; 16],
92    pub name: String,
93    pub flags: u64,
94}
95
96/// We internally keep track of active requests, so that when the server is torn down, we can
97/// deallocate all of the resources for pending requests.
98struct ActiveRequest<S> {
99    session: S,
100    group_or_request: GroupOrRequest,
101    trace_flow_id: TraceFlowId,
102    _epoch_guard: EpochGuard<'static>,
103    status: zx::Status,
104    count: u32,
105    req_id: Option<u32>,
106    decompression_info: Option<DecompressionInfo>,
107}
108
109struct DecompressionInfo {
110    // This is the range of compressed bytes in receiving buffer.
111    compressed_range: Range<usize>,
112
113    // This is the range in the target VMO where we will write uncompressed bytes.
114    uncompressed_range: Range<u64>,
115
116    bytes_so_far: u64,
117    mapping: Arc<VmoMapping>,
118    buffer: Option<Buffer<'static>>,
119}
120
121impl DecompressionInfo {
122    /// Returns the uncompressed slice.
123    fn uncompressed_slice(&self) -> *mut [u8] {
124        std::ptr::slice_from_raw_parts_mut(
125            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
126            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
127        )
128    }
129}
130
131pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
132
133impl<S> Default for ActiveRequests<S> {
134    fn default() -> Self {
135        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
136    }
137}
138
139impl<S> ActiveRequests<S> {
140    fn complete_and_take_response(
141        &self,
142        request_id: RequestId,
143        status: zx::Status,
144    ) -> Option<(S, BlockFifoResponse)> {
145        self.0.lock().complete_and_take_response(request_id, status)
146    }
147
148    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
149        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
150    }
151}
152
153struct ActiveRequestsInner<S> {
154    requests: Slab<ActiveRequest<S>>,
155}
156
157// Keeps track of all the requests that are currently being processed
158impl<S> ActiveRequestsInner<S> {
159    /// Completes a request.
160    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
161        let group = &mut self.requests[request_id.0];
162
163        group.count = group.count.checked_sub(1).unwrap();
164        if status != zx::Status::OK && group.status == zx::Status::OK {
165            group.status = status
166        }
167
168        fuchsia_trace::duration!(
169            "storage",
170            "block_server::finish_transaction",
171            "request_id" => request_id.0,
172            "group_completed" => group.count == 0,
173            "status" => status.into_raw());
174        if let Some(trace_flow_id) = group.trace_flow_id {
175            fuchsia_trace::flow_step!(
176                "storage",
177                "block_server::finish_request",
178                trace_flow_id.get().into()
179            );
180        }
181
182        if group.count == 0
183            && group.status == zx::Status::OK
184            && let Some(info) = &mut group.decompression_info
185        {
186            thread_local! {
187                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
188                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
189            }
190            DECOMPRESSOR.with(|decompressor| {
191                // SAFETY: We verified `uncompressed_range` fits within our mapping.
192                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
193                let mut decompressor = decompressor.borrow_mut();
194                if let Err(error) = decompressor.decompress_to_buffer(
195                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
196                    target_slice,
197                ) {
198                    log::warn!(error:?; "Decompression error");
199                    group.status = zx::Status::IO_DATA_INTEGRITY;
200                };
201            });
202        }
203    }
204
205    /// Takes the response if all requests are finished.
206    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
207        let group = &self.requests[request_id.0];
208        match group.req_id {
209            Some(reqid) if group.count == 0 => {
210                let group = self.requests.remove(request_id.0);
211                Some((
212                    group.session,
213                    BlockFifoResponse {
214                        status: group.status.into_raw(),
215                        reqid,
216                        group: group.group_or_request.group_id().unwrap_or(0),
217                        ..Default::default()
218                    },
219                ))
220            }
221            _ => None,
222        }
223    }
224
225    /// Competes the request and returns a response if the request group is finished.
226    fn complete_and_take_response(
227        &mut self,
228        request_id: RequestId,
229        status: zx::Status,
230    ) -> Option<(S, BlockFifoResponse)> {
231        self.complete(request_id, status);
232        self.take_response(request_id)
233    }
234}
235
236/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
237/// cbindgen:no-export
238pub struct BlockServer<SM> {
239    block_size: u32,
240    session_manager: Arc<SM>,
241}
242
243/// A single entry in `[OffsetMap]`.
244#[derive(Debug)]
245pub struct BlockOffsetMapping {
246    source_block_offset: u64,
247    target_block_offset: u64,
248    length: u64,
249}
250
251impl BlockOffsetMapping {
252    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
253        blocks.0 >= self.source_block_offset
254            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
255    }
256}
257
258impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
259    type Error = zx::Status;
260
261    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
262        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
263        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
264        Ok(Self {
265            source_block_offset: wire.source_block_offset,
266            target_block_offset: wire.target_block_offset,
267            length: wire.length,
268        })
269    }
270}
271
272/// Remaps the offset of block requests based on an internal map, and truncates long requests.
273pub struct OffsetMap {
274    mapping: Option<BlockOffsetMapping>,
275    max_transfer_blocks: Option<NonZero<u32>>,
276}
277
278impl OffsetMap {
279    /// An OffsetMap that remaps requests.
280    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
281        Self { mapping: Some(mapping), max_transfer_blocks }
282    }
283
284    /// An OffsetMap that just enforces maximum request sizes.
285    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
286        Self { mapping: None, max_transfer_blocks }
287    }
288
289    pub fn is_empty(&self) -> bool {
290        self.mapping.is_none()
291    }
292
293    fn mapping(&self) -> Option<&BlockOffsetMapping> {
294        self.mapping.as_ref()
295    }
296
297    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
298        self.max_transfer_blocks
299    }
300}
301
302// Methods take Arc<Self> rather than &self because of
303// https://github.com/rust-lang/rust/issues/42940.
304pub trait SessionManager: 'static {
305    const SUPPORTS_DECOMPRESSION: bool;
306
307    type Session;
308
309    fn on_attach_vmo(
310        self: Arc<Self>,
311        vmo: &Arc<zx::Vmo>,
312    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
313
314    /// Creates a new session to handle `stream`.
315    /// The returned future should run until the session completes, for example when the client end
316    /// closes.
317    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
318    fn open_session(
319        self: Arc<Self>,
320        stream: fblock::SessionRequestStream,
321        offset_map: OffsetMap,
322        block_size: u32,
323    ) -> impl Future<Output = Result<(), Error>> + Send;
324
325    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
326    fn get_info(&self) -> Cow<'_, DeviceInfo>;
327
328    /// Called to handle the GetVolumeInfo FIDL call.
329    fn get_volume_info(
330        &self,
331    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
332    {
333        async { Err(zx::Status::NOT_SUPPORTED) }
334    }
335
336    /// Called to handle the QuerySlices FIDL call.
337    fn query_slices(
338        &self,
339        _start_slices: &[u64],
340    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
341        async { Err(zx::Status::NOT_SUPPORTED) }
342    }
343
344    /// Called to handle the Shrink FIDL call.
345    fn extend(
346        &self,
347        _start_slice: u64,
348        _slice_count: u64,
349    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
350        async { Err(zx::Status::NOT_SUPPORTED) }
351    }
352
353    /// Called to handle the Shrink FIDL call.
354    fn shrink(
355        &self,
356        _start_slice: u64,
357        _slice_count: u64,
358    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
359        async { Err(zx::Status::NOT_SUPPORTED) }
360    }
361
362    /// Returns the active requests.
363    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
364}
365
366pub trait IntoSessionManager {
367    type SM: SessionManager;
368
369    fn into_session_manager(self) -> Arc<Self::SM>;
370}
371
372impl<SM: SessionManager> BlockServer<SM> {
373    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
374        Self { block_size, session_manager: session_manager.into_session_manager() }
375    }
376
377    pub fn session_manager(&self) -> &SM {
378        self.session_manager.as_ref()
379    }
380
381    /// Called to process requests for fuchsia.storage.block.Block.
382    pub async fn handle_requests(
383        &self,
384        mut requests: fblock::BlockRequestStream,
385    ) -> Result<(), Error> {
386        let scope = fasync::Scope::new();
387        loop {
388            match requests.try_next().await {
389                Ok(Some(request)) => {
390                    if let Some(session) = self.handle_request(request).await? {
391                        scope.spawn(session.map(|_| ()));
392                    }
393                }
394                Ok(None) => break,
395                Err(err) => log::warn!(err:?; "Invalid request"),
396            }
397        }
398        scope.await;
399        Ok(())
400    }
401
402    /// Processes a Block request.  If a new session task is created in response to the request,
403    /// it is returned.
404    async fn handle_request(
405        &self,
406        request: fblock::BlockRequest,
407    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
408        match request {
409            fblock::BlockRequest::GetInfo { responder } => {
410                let info = self.device_info();
411                let max_transfer_size = info.max_transfer_size(self.block_size);
412                let (block_count, mut flags) = match info.as_ref() {
413                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
414                        (*block_count, *device_flags)
415                    }
416                    DeviceInfo::Partition(partition_info) => {
417                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
418                            range.end - range.start
419                        } else {
420                            let volume_info = self.session_manager.get_volume_info().await?;
421                            volume_info.0.slice_size * volume_info.1.partition_slice_count
422                                / self.block_size as u64
423                        };
424                        (block_count, partition_info.device_flags)
425                    }
426                };
427                if SM::SUPPORTS_DECOMPRESSION {
428                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
429                }
430                responder.send(Ok(&fblock::BlockInfo {
431                    block_count,
432                    block_size: self.block_size,
433                    max_transfer_size,
434                    flags,
435                }))?;
436            }
437            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
438                let info = self.device_info();
439                return Ok(Some(self.session_manager.clone().open_session(
440                    session.into_stream(),
441                    OffsetMap::empty(info.max_transfer_blocks()),
442                    self.block_size,
443                )));
444            }
445            fblock::BlockRequest::OpenSessionWithOffsetMap {
446                session,
447                mapping,
448                control_handle: _,
449            } => {
450                let info = self.device_info();
451                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
452                    Ok(m) => m,
453                    Err(status) => {
454                        session.close_with_epitaph(status)?;
455                        return Ok(None);
456                    }
457                };
458                if let Some(max) = info.block_count() {
459                    if initial_mapping.target_block_offset + initial_mapping.length > max {
460                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
461                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
462                        return Ok(None);
463                    }
464                }
465                return Ok(Some(self.session_manager.clone().open_session(
466                    session.into_stream(),
467                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
468                    self.block_size,
469                )));
470            }
471            fblock::BlockRequest::GetTypeGuid { responder } => {
472                let info = self.device_info();
473                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
474                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
475                    guid.value.copy_from_slice(&partition_info.type_guid);
476                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
477                } else {
478                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
479                }
480            }
481            fblock::BlockRequest::GetInstanceGuid { responder } => {
482                let info = self.device_info();
483                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
484                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
485                    guid.value.copy_from_slice(&partition_info.instance_guid);
486                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
487                } else {
488                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
489                }
490            }
491            fblock::BlockRequest::GetName { responder } => {
492                let info = self.device_info();
493                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
494                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
495                } else {
496                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497                }
498            }
499            fblock::BlockRequest::GetMetadata { responder } => {
500                let info = self.device_info();
501                if let DeviceInfo::Partition(info) = info.as_ref() {
502                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503                    type_guid.value.copy_from_slice(&info.type_guid);
504                    let mut instance_guid =
505                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
506                    instance_guid.value.copy_from_slice(&info.instance_guid);
507                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
508                        name: Some(info.name.clone()),
509                        type_guid: Some(type_guid),
510                        instance_guid: Some(instance_guid),
511                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
512                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
513                        flags: Some(info.flags),
514                        ..Default::default()
515                    }))?;
516                } else {
517                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
518                }
519            }
520            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
521                match self.session_manager.query_slices(&start_slices).await {
522                    Ok(mut results) => {
523                        let results_len = results.len();
524                        assert!(results_len <= 16);
525                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
526                        responder.send(
527                            zx::sys::ZX_OK,
528                            &results.try_into().unwrap(),
529                            results_len as u64,
530                        )?;
531                    }
532                    Err(s) => {
533                        responder.send(
534                            s.into_raw(),
535                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
536                            0,
537                        )?;
538                    }
539                }
540            }
541            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
542                match self.session_manager.get_volume_info().await {
543                    Ok((manager_info, volume_info)) => {
544                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
545                    }
546                    Err(s) => responder.send(s.into_raw(), None, None)?,
547                }
548            }
549            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
550                responder.send(
551                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
552                        .into_raw(),
553                )?;
554            }
555            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
556                responder.send(
557                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
558                        .into_raw(),
559                )?;
560            }
561            fblock::BlockRequest::Destroy { responder, .. } => {
562                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
563            }
564        }
565        Ok(None)
566    }
567
568    fn device_info(&self) -> Cow<'_, DeviceInfo> {
569        self.session_manager.get_info()
570    }
571}
572
573struct SessionHelper<SM: SessionManager> {
574    session_manager: Arc<SM>,
575    offset_map: OffsetMap,
576    block_size: u32,
577    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
578    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
579}
580
581struct VmoMapping {
582    base: usize,
583    size: usize,
584}
585
586impl VmoMapping {
587    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
588        let size = vmo.get_size().unwrap() as usize;
589        Ok(Arc::new(Self {
590            base: fuchsia_runtime::vmar_root_self()
591                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
592                .inspect_err(|error| {
593                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
594                })?,
595            size,
596        }))
597    }
598}
599
600impl Drop for VmoMapping {
601    fn drop(&mut self) {
602        // SAFETY: We mapped this in `VmoMapping::new`.
603        unsafe {
604            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
605        }
606    }
607}
608
609impl<SM: SessionManager> SessionHelper<SM> {
610    fn new(
611        session_manager: Arc<SM>,
612        offset_map: OffsetMap,
613        block_size: u32,
614    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
615        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
616        Ok((
617            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
618            fifo,
619        ))
620    }
621
622    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
623        match request {
624            fblock::SessionRequest::GetFifo { responder } => {
625                let rights = zx::Rights::TRANSFER
626                    | zx::Rights::READ
627                    | zx::Rights::WRITE
628                    | zx::Rights::SIGNAL
629                    | zx::Rights::WAIT;
630                match self.peer_fifo.duplicate_handle(rights) {
631                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
632                    Err(s) => responder.send(Err(s.into_raw()))?,
633                }
634                Ok(())
635            }
636            fblock::SessionRequest::AttachVmo { vmo, responder } => {
637                let vmo = Arc::new(vmo);
638                let vmo_id = {
639                    let mut vmos = self.vmos.lock();
640                    if vmos.len() == u16::MAX as usize {
641                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
642                        return Ok(());
643                    } else {
644                        let vmo_id = match vmos.last_entry() {
645                            None => 1,
646                            Some(o) => {
647                                o.key().checked_add(1).unwrap_or_else(|| {
648                                    let mut vmo_id = 1;
649                                    // Find the first gap...
650                                    for (&id, _) in &*vmos {
651                                        if id > vmo_id {
652                                            break;
653                                        }
654                                        vmo_id = id + 1;
655                                    }
656                                    vmo_id
657                                })
658                            }
659                        };
660                        vmos.insert(vmo_id, (vmo.clone(), None));
661                        vmo_id
662                    }
663                };
664                self.session_manager.clone().on_attach_vmo(&vmo).await?;
665                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
666                Ok(())
667            }
668            fblock::SessionRequest::Close { responder } => {
669                responder.send(Ok(()))?;
670                Err(anyhow!("Closed"))
671            }
672        }
673    }
674
675    /// Decodes `request`.
676    fn decode_fifo_request(
677        &self,
678        session: SM::Session,
679        request: &BlockFifoRequest,
680    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
681        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
682
683        let request_bytes = request.length as u64 * self.block_size as u64;
684
685        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
686            .ok_or(zx::Status::INVALID_ARGS)
687            .and_then(|code| {
688                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
689                    if code != BlockOpcode::Read {
690                        return Err(zx::Status::INVALID_ARGS);
691                    }
692                    if !SM::SUPPORTS_DECOMPRESSION {
693                        return Err(zx::Status::NOT_SUPPORTED);
694                    }
695                }
696                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
697                    if request.length == 0 {
698                        return Err(zx::Status::INVALID_ARGS);
699                    }
700                    // Make sure the end offsets won't wrap.
701                    if request.dev_offset.checked_add(request.length as u64).is_none()
702                        || (code != BlockOpcode::Trim
703                            && request_bytes.checked_add(request.vmo_offset).is_none())
704                    {
705                        return Err(zx::Status::OUT_OF_RANGE);
706                    }
707                }
708                Ok(match code {
709                    BlockOpcode::Read => Operation::Read {
710                        device_block_offset: request.dev_offset,
711                        block_count: request.length,
712                        _unused: 0,
713                        vmo_offset: request
714                            .vmo_offset
715                            .checked_mul(self.block_size as u64)
716                            .ok_or(zx::Status::OUT_OF_RANGE)?,
717                        options: ReadOptions {
718                            inline_crypto: InlineCryptoOptions {
719                                dun: request.dun,
720                                slot: request.slot,
721                            },
722                        },
723                    },
724                    BlockOpcode::Write => {
725                        let mut options = WriteOptions {
726                            inline_crypto: InlineCryptoOptions {
727                                dun: request.dun,
728                                slot: request.slot,
729                            },
730                            ..WriteOptions::default()
731                        };
732                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
733                            options.flags |= WriteFlags::FORCE_ACCESS;
734                        }
735                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
736                            options.flags |= WriteFlags::PRE_BARRIER;
737                        }
738                        Operation::Write {
739                            device_block_offset: request.dev_offset,
740                            block_count: request.length,
741                            _unused: 0,
742                            options,
743                            vmo_offset: request
744                                .vmo_offset
745                                .checked_mul(self.block_size as u64)
746                                .ok_or(zx::Status::OUT_OF_RANGE)?,
747                        }
748                    }
749                    BlockOpcode::Flush => Operation::Flush,
750                    BlockOpcode::Trim => Operation::Trim {
751                        device_block_offset: request.dev_offset,
752                        block_count: request.length,
753                    },
754                    BlockOpcode::CloseVmo => Operation::CloseVmo,
755                })
756            });
757
758        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
759            GroupOrRequest::Group(request.group)
760        } else {
761            GroupOrRequest::Request(request.reqid)
762        };
763
764        let mut active_requests = self.session_manager.active_requests().0.lock();
765        let mut request_id = None;
766
767        // Multiple Block I/O request may be sent as a group.
768        // Notes:
769        // - the group is identified by the group id in the request
770        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
771        //   flag is set.
772        // - when processing a request of a group fails, subsequent requests of that
773        //   group will not be processed.
774        // - decompression is a special case, see block-fifo.h for semantics.
775        //
776        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
777        if group_or_request.is_group() {
778            // Search for an existing entry that matches this group.  NOTE: This is a potentially
779            // expensive way to find a group (it's iterating over all slots in the active-requests
780            // slab).  This can be optimised easily should we need to.
781            for (key, group) in &mut active_requests.requests {
782                if group.group_or_request == group_or_request {
783                    if group.req_id.is_some() {
784                        // We have already received a request tagged as last.
785                        if group.status == zx::Status::OK {
786                            group.status = zx::Status::INVALID_ARGS;
787                        }
788                        // Ignore this request.
789                        return Err(None);
790                    }
791                    // See if this is a continuation of a decompressed read.
792                    if group.status == zx::Status::OK
793                        && let Some(info) = &mut group.decompression_info
794                    {
795                        if let Ok(Operation::Read {
796                            device_block_offset,
797                            mut block_count,
798                            options,
799                            vmo_offset: 0,
800                            ..
801                        }) = operation
802                        {
803                            let remaining_bytes = info
804                                .compressed_range
805                                .end
806                                .next_multiple_of(self.block_size as usize)
807                                as u64
808                                - info.bytes_so_far;
809                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
810                                || request.total_compressed_bytes != 0
811                                || request.uncompressed_bytes != 0
812                                || request.compressed_prefix_bytes != 0
813                                || (flags.contains(BlockIoFlag::GROUP_LAST)
814                                    && info.bytes_so_far + request_bytes
815                                        < info.compressed_range.end as u64)
816                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
817                                    && request_bytes >= remaining_bytes)
818                            {
819                                group.status = zx::Status::INVALID_ARGS;
820                            } else {
821                                // We are tolerant of `block_count` being more than we actually
822                                // need.  This can happen if the client is working with a larger
823                                // block size than the device block size.  For example, if Blobfs
824                                // has a 8192 byte block size, but the device might has a 512 byte
825                                // block size, it can ask for a multiple of 16 blocks, when fewer
826                                // than that might actually be required to hold the compressed data.
827                                // It is easier for us to tolerate this here than to get Blobfs to
828                                // change to pass only the blocks that are required.
829                                if request_bytes > remaining_bytes {
830                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
831                                }
832
833                                operation = Ok(Operation::ContinueDecompressedRead {
834                                    offset: info.bytes_so_far,
835                                    device_block_offset,
836                                    block_count,
837                                    options,
838                                });
839
840                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
841                            }
842                        } else {
843                            group.status = zx::Status::INVALID_ARGS;
844                        }
845                    }
846                    if flags.contains(BlockIoFlag::GROUP_LAST) {
847                        group.req_id = Some(request.reqid);
848                        // If the group has had an error, there is no point trying to issue this
849                        // request.
850                        if group.status != zx::Status::OK {
851                            operation = Err(group.status);
852                        }
853                    } else if group.status != zx::Status::OK {
854                        // The group has already encountered an error, so there is no point trying
855                        // to issue this request.
856                        return Err(None);
857                    }
858                    request_id = Some(RequestId(key));
859                    group.count += 1;
860                    break;
861                }
862            }
863        }
864
865        let is_single_request =
866            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
867
868        let mut decompression_info = None;
869        let vmo = match operation {
870            Ok(Operation::Read {
871                device_block_offset,
872                mut block_count,
873                options,
874                vmo_offset,
875                ..
876            }) => match self.vmos.lock().get_mut(&request.vmoid) {
877                Some((vmo, mapping)) => {
878                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
879                        let compressed_range = request.compressed_prefix_bytes as usize
880                            ..request.compressed_prefix_bytes as usize
881                                + request.total_compressed_bytes as usize;
882                        let required_buffer_size =
883                            compressed_range.end.next_multiple_of(self.block_size as usize);
884
885                        // Validate the initial decompression request.
886                        if compressed_range.start >= compressed_range.end
887                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
888                            || (is_single_request && request_bytes < compressed_range.end as u64)
889                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
890                        {
891                            Err(zx::Status::INVALID_ARGS)
892                        } else {
893                            // We are tolerant of `block_count` being more than we actually need.
894                            // This can happen if the client is working in a larger block size than
895                            // the device block size.  For example, Blobfs has a 8192 byte block
896                            // size, but the device might have a 512 byte block size.  It is easier
897                            // for us to tolerate this here than to get Blobfs to change to pass
898                            // only the blocks that are required.
899                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
900                                block_count =
901                                    (required_buffer_size / self.block_size as usize) as u32;
902                                required_buffer_size as u64
903                            } else {
904                                request_bytes
905                            };
906
907                            // To decompress, we need to have the target VMO mapped (cached).
908                            match mapping {
909                                Some(mapping) => Ok(mapping.clone()),
910                                None => {
911                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
912                                }
913                            }
914                            .and_then(|mapping| {
915                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
916                                // range.
917                                if vmo_offset
918                                    .checked_add(request.uncompressed_bytes as u64)
919                                    .is_some_and(|end| end <= mapping.size as u64)
920                                {
921                                    Ok(mapping)
922                                } else {
923                                    Err(zx::Status::OUT_OF_RANGE)
924                                }
925                            })
926                            .map(|mapping| {
927                                // Convert the operation into a `StartDecompressedRead`
928                                // operation. For non-fragmented requests, this will be the only
929                                // operation, but if it's a fragmented read,
930                                // `ContinueDecompressedRead` operations will follow.
931                                operation = Ok(Operation::StartDecompressedRead {
932                                    required_buffer_size,
933                                    device_block_offset,
934                                    block_count,
935                                    options,
936                                });
937                                // Record sufficient information so that we can decompress when all
938                                // the requests complete.
939                                decompression_info = Some(DecompressionInfo {
940                                    compressed_range,
941                                    bytes_so_far,
942                                    mapping,
943                                    uncompressed_range: vmo_offset
944                                        ..vmo_offset + request.uncompressed_bytes as u64,
945                                    buffer: None,
946                                });
947                                None
948                            })
949                        }
950                    } else {
951                        Ok(Some(vmo.clone()))
952                    }
953                }
954                None => Err(zx::Status::IO),
955            },
956            Ok(Operation::Write { .. }) => self
957                .vmos
958                .lock()
959                .get(&request.vmoid)
960                .cloned()
961                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
962            Ok(Operation::CloseVmo) => {
963                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
964                    let vmo_clone = vmo.clone();
965                    // Make sure the VMO is dropped after all current Epoch guards have been
966                    // dropped.
967                    Epoch::global().defer(move || drop(vmo_clone));
968                    Ok(Some(vmo))
969                })
970            }
971            _ => Ok(None),
972        }
973        .unwrap_or_else(|e| {
974            operation = Err(e);
975            None
976        });
977
978        let trace_flow_id = NonZero::new(request.trace_flow_id);
979        let request_id = request_id.unwrap_or_else(|| {
980            RequestId(active_requests.requests.insert(ActiveRequest {
981                session,
982                group_or_request,
983                trace_flow_id,
984                _epoch_guard: Epoch::global().guard(),
985                status: zx::Status::OK,
986                count: 1,
987                req_id: is_single_request.then_some(request.reqid),
988                decompression_info,
989            }))
990        });
991
992        Ok(DecodedRequest {
993            request_id,
994            trace_flow_id,
995            operation: operation.map_err(|status| {
996                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
997            })?,
998            vmo,
999        })
1000    }
1001
1002    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1003        std::mem::take(&mut *self.vmos.lock())
1004    }
1005
1006    /// Maps the request and returns the mapped request with an optional remainder.
1007    fn map_request(
1008        &self,
1009        mut request: DecodedRequest,
1010        active_request: &mut ActiveRequest<SM::Session>,
1011    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1012        if active_request.status != zx::Status::OK {
1013            return Err(zx::Status::BAD_STATE);
1014        }
1015        let mapping = self.offset_map.mapping();
1016        match (mapping, request.operation.blocks()) {
1017            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1018                return Err(zx::Status::OUT_OF_RANGE);
1019            }
1020            _ => {}
1021        }
1022        let remainder = request.operation.map(
1023            self.offset_map.mapping(),
1024            self.offset_map.max_transfer_blocks(),
1025            self.block_size,
1026        );
1027        if remainder.is_some() {
1028            active_request.count += 1;
1029        }
1030        static CACHE: AtomicU64 = AtomicU64::new(0);
1031        if let Some(context) =
1032            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1033        {
1034            use fuchsia_trace::ArgValue;
1035            let trace_args = [
1036                ArgValue::of("request_id", request.request_id.0),
1037                ArgValue::of("opcode", request.operation.trace_label()),
1038            ];
1039            let _scope =
1040                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1041            if let Some(trace_flow_id) = active_request.trace_flow_id {
1042                fuchsia_trace::flow_step(
1043                    &context,
1044                    "block_server::start_transaction",
1045                    trace_flow_id.get().into(),
1046                    &[],
1047                );
1048            }
1049        }
1050        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1051        Ok((request, remainder))
1052    }
1053
1054    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1055        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1056    }
1057}
1058
1059#[repr(transparent)]
1060#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1061pub struct RequestId(usize);
1062
1063#[derive(Clone, Debug)]
1064struct DecodedRequest {
1065    request_id: RequestId,
1066    trace_flow_id: TraceFlowId,
1067    operation: Operation,
1068    vmo: Option<Arc<zx::Vmo>>,
1069}
1070
1071/// cbindgen:no-export
1072pub type WriteFlags = block_protocol::WriteFlags;
1073pub type WriteOptions = block_protocol::WriteOptions;
1074pub type ReadOptions = block_protocol::ReadOptions;
1075
1076#[repr(C)]
1077#[derive(Clone, Debug, PartialEq, Eq)]
1078pub enum Operation {
1079    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1080    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1081    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1082    // write variant).
1083    Read {
1084        device_block_offset: u64,
1085        block_count: u32,
1086        _unused: u32,
1087        vmo_offset: u64,
1088        options: ReadOptions,
1089    },
1090    Write {
1091        device_block_offset: u64,
1092        block_count: u32,
1093        _unused: u32,
1094        vmo_offset: u64,
1095        options: WriteOptions,
1096    },
1097    Flush,
1098    Trim {
1099        device_block_offset: u64,
1100        block_count: u32,
1101    },
1102    /// This will never be seen by the C interface.
1103    CloseVmo,
1104    StartDecompressedRead {
1105        required_buffer_size: usize,
1106        device_block_offset: u64,
1107        block_count: u32,
1108        options: ReadOptions,
1109    },
1110    ContinueDecompressedRead {
1111        offset: u64,
1112        device_block_offset: u64,
1113        block_count: u32,
1114        options: ReadOptions,
1115    },
1116}
1117
1118impl Operation {
1119    fn trace_label(&self) -> &'static str {
1120        match self {
1121            Operation::Read { .. } => "read",
1122            Operation::Write { .. } => "write",
1123            Operation::Flush { .. } => "flush",
1124            Operation::Trim { .. } => "trim",
1125            Operation::CloseVmo { .. } => "close_vmo",
1126            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1127            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1128        }
1129    }
1130
1131    /// Returns (offset, length).
1132    fn blocks(&self) -> Option<(u64, u32)> {
1133        match self {
1134            Operation::Read { device_block_offset, block_count, .. }
1135            | Operation::Write { device_block_offset, block_count, .. }
1136            | Operation::Trim { device_block_offset, block_count, .. } => {
1137                Some((*device_block_offset, *block_count))
1138            }
1139            _ => None,
1140        }
1141    }
1142
1143    /// Returns mutable references to (offset, length).
1144    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1145        match self {
1146            Operation::Read { device_block_offset, block_count, .. }
1147            | Operation::Write { device_block_offset, block_count, .. }
1148            | Operation::Trim { device_block_offset, block_count, .. } => {
1149                Some((device_block_offset, block_count))
1150            }
1151            _ => None,
1152        }
1153    }
1154
1155    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1156    /// start of the operation.
1157    fn map(
1158        &mut self,
1159        mapping: Option<&BlockOffsetMapping>,
1160        max_blocks: Option<NonZero<u32>>,
1161        block_size: u32,
1162    ) -> Option<Self> {
1163        let mut max = match self {
1164            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1165            _ => None,
1166        };
1167        let (offset, length) = self.blocks_mut()?;
1168        let orig_offset = *offset;
1169        if let Some(mapping) = mapping {
1170            let delta = *offset - mapping.source_block_offset;
1171            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1172            *offset = mapping.target_block_offset + delta;
1173            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1174            max = match max {
1175                None => Some(mapping_max),
1176                Some(m) => Some(std::cmp::min(m, mapping_max)),
1177            };
1178        };
1179        if let Some(max) = max {
1180            if *length as u64 > max {
1181                let rem = (*length as u64 - max) as u32;
1182                *length = max as u32;
1183                return Some(match self {
1184                    Operation::Read {
1185                        device_block_offset: _,
1186                        block_count: _,
1187                        vmo_offset,
1188                        _unused,
1189                        options,
1190                    } => {
1191                        let mut options = *options;
1192                        options.inline_crypto.dun += max as u32;
1193                        Operation::Read {
1194                            device_block_offset: orig_offset + max,
1195                            block_count: rem,
1196                            vmo_offset: *vmo_offset + max * block_size as u64,
1197                            _unused: *_unused,
1198                            options: options,
1199                        }
1200                    }
1201                    Operation::Write {
1202                        device_block_offset: _,
1203                        block_count: _,
1204                        _unused,
1205                        vmo_offset,
1206                        options,
1207                    } => {
1208                        let mut options = *options;
1209                        options.inline_crypto.dun += max as u32;
1210                        Operation::Write {
1211                            device_block_offset: orig_offset + max,
1212                            block_count: rem,
1213                            _unused: *_unused,
1214                            vmo_offset: *vmo_offset + max * block_size as u64,
1215                            options: options,
1216                        }
1217                    }
1218                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1219                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1220                    }
1221                    _ => unreachable!(),
1222                });
1223            }
1224        }
1225        None
1226    }
1227
1228    /// Returns true if the specified write flags are set.
1229    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1230        if let Operation::Write { options, .. } = self {
1231            options.flags.contains(value)
1232        } else {
1233            false
1234        }
1235    }
1236
1237    /// Removes `value` from the request's write flags and returns true if the flag was set.
1238    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1239        if let Operation::Write { options, .. } = self {
1240            let result = options.flags.contains(value);
1241            options.flags.remove(value);
1242            result
1243        } else {
1244            false
1245        }
1246    }
1247}
1248
1249#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1250pub enum GroupOrRequest {
1251    Group(u16),
1252    Request(u32),
1253}
1254
1255impl GroupOrRequest {
1256    fn is_group(&self) -> bool {
1257        matches!(self, Self::Group(_))
1258    }
1259
1260    fn group_id(&self) -> Option<u16> {
1261        match self {
1262            Self::Group(id) => Some(*id),
1263            Self::Request(_) => None,
1264        }
1265    }
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270    use super::{
1271        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1272        TraceFlowId,
1273    };
1274    use assert_matches::assert_matches;
1275    use block_protocol::{
1276        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1277        WriteFlags, WriteOptions,
1278    };
1279    use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1280    use fuchsia_sync::Mutex;
1281    use futures::FutureExt as _;
1282    use futures::channel::oneshot;
1283    use futures::future::BoxFuture;
1284    use std::borrow::Cow;
1285    use std::future::poll_fn;
1286    use std::num::NonZero;
1287    use std::pin::pin;
1288    use std::sync::Arc;
1289    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1290    use std::task::{Context, Poll};
1291    use zx::HandleBased as _;
1292    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1293
1294    #[derive(Default)]
1295    struct MockInterface {
1296        read_hook: Option<
1297            Box<
1298                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1299                    + Send
1300                    + Sync,
1301            >,
1302        >,
1303        write_hook:
1304            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1305        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1306    }
1307
1308    impl super::async_interface::Interface for MockInterface {
1309        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1310            Ok(())
1311        }
1312
1313        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1314            Cow::Owned(test_device_info())
1315        }
1316
1317        async fn read(
1318            &self,
1319            device_block_offset: u64,
1320            block_count: u32,
1321            vmo: &Arc<zx::Vmo>,
1322            vmo_offset: u64,
1323            _opts: ReadOptions,
1324            _trace_flow_id: TraceFlowId,
1325        ) -> Result<(), zx::Status> {
1326            if let Some(read_hook) = &self.read_hook {
1327                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1328            } else {
1329                unimplemented!();
1330            }
1331        }
1332
1333        async fn write(
1334            &self,
1335            device_block_offset: u64,
1336            _block_count: u32,
1337            _vmo: &Arc<zx::Vmo>,
1338            _vmo_offset: u64,
1339            opts: WriteOptions,
1340            _trace_flow_id: TraceFlowId,
1341        ) -> Result<(), zx::Status> {
1342            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1343                && let Some(barrier_hook) = &self.barrier_hook
1344            {
1345                barrier_hook()?;
1346            }
1347            if let Some(write_hook) = &self.write_hook {
1348                write_hook(device_block_offset).await
1349            } else {
1350                unimplemented!();
1351            }
1352        }
1353
1354        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1355            Ok(())
1356        }
1357
1358        async fn trim(
1359            &self,
1360            _device_block_offset: u64,
1361            _block_count: u32,
1362            _trace_flow_id: TraceFlowId,
1363        ) -> Result<(), zx::Status> {
1364            unreachable!();
1365        }
1366
1367        async fn get_volume_info(
1368            &self,
1369        ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1370            // Hang forever for the test_requests_dont_block_sessions test.
1371            let () = std::future::pending().await;
1372            unreachable!();
1373        }
1374    }
1375
1376    const BLOCK_SIZE: u32 = 512;
1377    const MAX_TRANSFER_BLOCKS: u32 = 10;
1378
1379    fn test_device_info() -> DeviceInfo {
1380        DeviceInfo::Partition(PartitionInfo {
1381            device_flags: fblock::DeviceFlag::READONLY
1382                | fblock::DeviceFlag::BARRIER_SUPPORT
1383                | fblock::DeviceFlag::FUA_SUPPORT,
1384            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1385            block_range: Some(0..100),
1386            type_guid: [1; 16],
1387            instance_guid: [2; 16],
1388            name: "foo".to_string(),
1389            flags: 0xabcd,
1390        })
1391    }
1392
1393    #[fuchsia::test]
1394    async fn test_barriers_ordering() {
1395        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1396        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1397        let barrier_called = Arc::new(AtomicBool::new(false));
1398
1399        futures::join!(
1400            async move {
1401                let barrier_called_clone = barrier_called.clone();
1402                let block_server = BlockServer::new(
1403                    BLOCK_SIZE,
1404                    Arc::new(MockInterface {
1405                        barrier_hook: Some(Box::new(move || {
1406                            barrier_called.store(true, Ordering::Relaxed);
1407                            Ok(())
1408                        })),
1409                        write_hook: Some(Box::new(move |device_block_offset| {
1410                            let barrier_called = barrier_called_clone.clone();
1411                            Box::pin(async move {
1412                                // The sleep allows the server to reorder the fifo requests.
1413                                if device_block_offset % 2 == 0 {
1414                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1415                                        zx::MonotonicDuration::from_millis(200),
1416                                    ))
1417                                    .await;
1418                                }
1419                                assert!(barrier_called.load(Ordering::Relaxed));
1420                                Ok(())
1421                            })
1422                        })),
1423                        ..MockInterface::default()
1424                    }),
1425                );
1426                block_server.handle_requests(stream).await.unwrap();
1427            },
1428            async move {
1429                let (session_proxy, server) = fidl::endpoints::create_proxy();
1430
1431                proxy.open_session(server).unwrap();
1432
1433                let vmo_id = session_proxy
1434                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1435                    .await
1436                    .unwrap()
1437                    .unwrap();
1438                assert_ne!(vmo_id.id, 0);
1439
1440                let mut fifo =
1441                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1442                let (mut reader, mut writer) = fifo.async_io();
1443
1444                writer
1445                    .write_entries(&BlockFifoRequest {
1446                        command: BlockFifoCommand {
1447                            opcode: BlockOpcode::Write.into_primitive(),
1448                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1449                            ..Default::default()
1450                        },
1451                        vmoid: vmo_id.id,
1452                        dev_offset: 0,
1453                        length: 5,
1454                        vmo_offset: 6,
1455                        ..Default::default()
1456                    })
1457                    .await
1458                    .unwrap();
1459
1460                for i in 0..10 {
1461                    writer
1462                        .write_entries(&BlockFifoRequest {
1463                            command: BlockFifoCommand {
1464                                opcode: BlockOpcode::Write.into_primitive(),
1465                                ..Default::default()
1466                            },
1467                            vmoid: vmo_id.id,
1468                            dev_offset: i + 1,
1469                            length: 5,
1470                            vmo_offset: 6,
1471                            ..Default::default()
1472                        })
1473                        .await
1474                        .unwrap();
1475                }
1476                for _ in 0..11 {
1477                    let mut response = BlockFifoResponse::default();
1478                    reader.read_entries(&mut response).await.unwrap();
1479                    assert_eq!(response.status, zx::sys::ZX_OK);
1480                }
1481
1482                std::mem::drop(proxy);
1483            }
1484        );
1485    }
1486
1487    #[fuchsia::test]
1488    async fn test_info() {
1489        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1490
1491        futures::join!(
1492            async {
1493                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1494                block_server.handle_requests(stream).await.unwrap();
1495            },
1496            async {
1497                let expected_info = test_device_info();
1498                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1499                    info
1500                } else {
1501                    unreachable!()
1502                };
1503
1504                let block_info = proxy.get_info().await.unwrap().unwrap();
1505                assert_eq!(
1506                    block_info.block_count,
1507                    partition_info.block_range.as_ref().unwrap().end
1508                        - partition_info.block_range.as_ref().unwrap().start
1509                );
1510                assert_eq!(
1511                    block_info.flags,
1512                    fblock::DeviceFlag::READONLY
1513                        | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1514                        | fblock::DeviceFlag::BARRIER_SUPPORT
1515                        | fblock::DeviceFlag::FUA_SUPPORT
1516                );
1517
1518                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1519
1520                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1521                assert_eq!(status, zx::sys::ZX_OK);
1522                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1523
1524                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1525                assert_eq!(status, zx::sys::ZX_OK);
1526                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1527
1528                let (status, name) = proxy.get_name().await.unwrap();
1529                assert_eq!(status, zx::sys::ZX_OK);
1530                assert_eq!(name.as_ref(), Some(&partition_info.name));
1531
1532                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1533                assert_eq!(metadata.name, name);
1534                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1535                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1536                assert_eq!(
1537                    metadata.start_block_offset,
1538                    Some(partition_info.block_range.as_ref().unwrap().start)
1539                );
1540                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1541                assert_eq!(metadata.flags, Some(partition_info.flags));
1542
1543                std::mem::drop(proxy);
1544            }
1545        );
1546    }
1547
1548    #[fuchsia::test]
1549    async fn test_attach_vmo() {
1550        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1551
1552        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1553        let koid = vmo.koid().unwrap();
1554
1555        futures::join!(
1556            async {
1557                let block_server = BlockServer::new(
1558                    BLOCK_SIZE,
1559                    Arc::new(MockInterface {
1560                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1561                            assert_eq!(vmo.koid().unwrap(), koid);
1562                            Box::pin(async { Ok(()) })
1563                        })),
1564                        ..MockInterface::default()
1565                    }),
1566                );
1567                block_server.handle_requests(stream).await.unwrap();
1568            },
1569            async move {
1570                let (session_proxy, server) = fidl::endpoints::create_proxy();
1571
1572                proxy.open_session(server).unwrap();
1573
1574                let vmo_id = session_proxy
1575                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1576                    .await
1577                    .unwrap()
1578                    .unwrap();
1579                assert_ne!(vmo_id.id, 0);
1580
1581                let mut fifo =
1582                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1583                let (mut reader, mut writer) = fifo.async_io();
1584
1585                // Keep attaching VMOs until we eventually hit the maximum.
1586                let mut count = 1;
1587                loop {
1588                    match session_proxy
1589                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1590                        .await
1591                        .unwrap()
1592                    {
1593                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1594                        Err(e) => {
1595                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1596                            break;
1597                        }
1598                    }
1599
1600                    // Only test every 10 to keep test time down.
1601                    if count % 10 == 0 {
1602                        writer
1603                            .write_entries(&BlockFifoRequest {
1604                                command: BlockFifoCommand {
1605                                    opcode: BlockOpcode::Read.into_primitive(),
1606                                    ..Default::default()
1607                                },
1608                                vmoid: vmo_id.id,
1609                                length: 1,
1610                                ..Default::default()
1611                            })
1612                            .await
1613                            .unwrap();
1614
1615                        let mut response = BlockFifoResponse::default();
1616                        reader.read_entries(&mut response).await.unwrap();
1617                        assert_eq!(response.status, zx::sys::ZX_OK);
1618                    }
1619
1620                    count += 1;
1621                }
1622
1623                assert_eq!(count, u16::MAX as u64);
1624
1625                // Detach the original VMO, and make sure we can then attach another one.
1626                writer
1627                    .write_entries(&BlockFifoRequest {
1628                        command: BlockFifoCommand {
1629                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1630                            ..Default::default()
1631                        },
1632                        vmoid: vmo_id.id,
1633                        ..Default::default()
1634                    })
1635                    .await
1636                    .unwrap();
1637
1638                let mut response = BlockFifoResponse::default();
1639                reader.read_entries(&mut response).await.unwrap();
1640                assert_eq!(response.status, zx::sys::ZX_OK);
1641
1642                let new_vmo_id = session_proxy
1643                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1644                    .await
1645                    .unwrap()
1646                    .unwrap();
1647                // It should reuse the same ID.
1648                assert_eq!(new_vmo_id.id, vmo_id.id);
1649
1650                std::mem::drop(proxy);
1651            }
1652        );
1653    }
1654
1655    #[fuchsia::test]
1656    async fn test_close() {
1657        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1658
1659        let mut server = std::pin::pin!(
1660            async {
1661                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1662                block_server.handle_requests(stream).await.unwrap();
1663            }
1664            .fuse()
1665        );
1666
1667        let mut client = std::pin::pin!(
1668            async {
1669                let (session_proxy, server) = fidl::endpoints::create_proxy();
1670
1671                proxy.open_session(server).unwrap();
1672
1673                // Dropping the proxy should not cause the session to terminate because the session is
1674                // still live.
1675                std::mem::drop(proxy);
1676
1677                session_proxy.close().await.unwrap().unwrap();
1678
1679                // Keep the session alive.  Calling `close` should cause the server to terminate.
1680                let _: () = std::future::pending().await;
1681            }
1682            .fuse()
1683        );
1684
1685        futures::select!(
1686            _ = server => {}
1687            _ = client => unreachable!(),
1688        );
1689    }
1690
1691    #[derive(Default)]
1692    struct IoMockInterface {
1693        do_checks: bool,
1694        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1695        return_errors: bool,
1696    }
1697
1698    #[derive(Debug)]
1699    enum ExpectedOp {
1700        Read(u64, u32, u64),
1701        Write(u64, u32, u64),
1702        Trim(u64, u32),
1703        Flush,
1704    }
1705
1706    impl super::async_interface::Interface for IoMockInterface {
1707        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1708            Ok(())
1709        }
1710
1711        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1712            Cow::Owned(test_device_info())
1713        }
1714
1715        async fn read(
1716            &self,
1717            device_block_offset: u64,
1718            block_count: u32,
1719            _vmo: &Arc<zx::Vmo>,
1720            vmo_offset: u64,
1721            _opts: ReadOptions,
1722            _trace_flow_id: TraceFlowId,
1723        ) -> Result<(), zx::Status> {
1724            if self.return_errors {
1725                Err(zx::Status::INTERNAL)
1726            } else {
1727                if self.do_checks {
1728                    assert_matches!(
1729                        self.expected_op.lock().take(),
1730                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1731                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1732                        "Read {device_block_offset} {block_count} {vmo_offset}"
1733                    );
1734                }
1735                Ok(())
1736            }
1737        }
1738
1739        async fn write(
1740            &self,
1741            device_block_offset: u64,
1742            block_count: u32,
1743            _vmo: &Arc<zx::Vmo>,
1744            vmo_offset: u64,
1745            _write_opts: WriteOptions,
1746            _trace_flow_id: TraceFlowId,
1747        ) -> Result<(), zx::Status> {
1748            if self.return_errors {
1749                Err(zx::Status::NOT_SUPPORTED)
1750            } else {
1751                if self.do_checks {
1752                    assert_matches!(
1753                        self.expected_op.lock().take(),
1754                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1755                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1756                        "Write {device_block_offset} {block_count} {vmo_offset}"
1757                    );
1758                }
1759                Ok(())
1760            }
1761        }
1762
1763        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1764            if self.return_errors {
1765                Err(zx::Status::NO_RESOURCES)
1766            } else {
1767                if self.do_checks {
1768                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1769                }
1770                Ok(())
1771            }
1772        }
1773
1774        async fn trim(
1775            &self,
1776            device_block_offset: u64,
1777            block_count: u32,
1778            _trace_flow_id: TraceFlowId,
1779        ) -> Result<(), zx::Status> {
1780            if self.return_errors {
1781                Err(zx::Status::NO_MEMORY)
1782            } else {
1783                if self.do_checks {
1784                    assert_matches!(
1785                        self.expected_op.lock().take(),
1786                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1787                            block_count == b,
1788                        "Trim {device_block_offset} {block_count}"
1789                    );
1790                }
1791                Ok(())
1792            }
1793        }
1794    }
1795
1796    #[fuchsia::test]
1797    async fn test_io() {
1798        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1799
1800        let expected_op = Arc::new(Mutex::new(None));
1801        let expected_op_clone = expected_op.clone();
1802
1803        let server = async {
1804            let block_server = BlockServer::new(
1805                BLOCK_SIZE,
1806                Arc::new(IoMockInterface {
1807                    return_errors: false,
1808                    do_checks: true,
1809                    expected_op: expected_op_clone,
1810                }),
1811            );
1812            block_server.handle_requests(stream).await.unwrap();
1813        };
1814
1815        let client = async move {
1816            let (session_proxy, server) = fidl::endpoints::create_proxy();
1817
1818            proxy.open_session(server).unwrap();
1819
1820            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1821            let vmo_id = session_proxy
1822                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1823                .await
1824                .unwrap()
1825                .unwrap();
1826
1827            let mut fifo =
1828                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1829            let (mut reader, mut writer) = fifo.async_io();
1830
1831            // READ
1832            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1833            writer
1834                .write_entries(&BlockFifoRequest {
1835                    command: BlockFifoCommand {
1836                        opcode: BlockOpcode::Read.into_primitive(),
1837                        ..Default::default()
1838                    },
1839                    vmoid: vmo_id.id,
1840                    dev_offset: 1,
1841                    length: 2,
1842                    vmo_offset: 3,
1843                    ..Default::default()
1844                })
1845                .await
1846                .unwrap();
1847
1848            let mut response = BlockFifoResponse::default();
1849            reader.read_entries(&mut response).await.unwrap();
1850            assert_eq!(response.status, zx::sys::ZX_OK);
1851
1852            // WRITE
1853            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1854            writer
1855                .write_entries(&BlockFifoRequest {
1856                    command: BlockFifoCommand {
1857                        opcode: BlockOpcode::Write.into_primitive(),
1858                        ..Default::default()
1859                    },
1860                    vmoid: vmo_id.id,
1861                    dev_offset: 4,
1862                    length: 5,
1863                    vmo_offset: 6,
1864                    ..Default::default()
1865                })
1866                .await
1867                .unwrap();
1868
1869            let mut response = BlockFifoResponse::default();
1870            reader.read_entries(&mut response).await.unwrap();
1871            assert_eq!(response.status, zx::sys::ZX_OK);
1872
1873            // FLUSH
1874            *expected_op.lock() = Some(ExpectedOp::Flush);
1875            writer
1876                .write_entries(&BlockFifoRequest {
1877                    command: BlockFifoCommand {
1878                        opcode: BlockOpcode::Flush.into_primitive(),
1879                        ..Default::default()
1880                    },
1881                    ..Default::default()
1882                })
1883                .await
1884                .unwrap();
1885
1886            reader.read_entries(&mut response).await.unwrap();
1887            assert_eq!(response.status, zx::sys::ZX_OK);
1888
1889            // TRIM
1890            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1891            writer
1892                .write_entries(&BlockFifoRequest {
1893                    command: BlockFifoCommand {
1894                        opcode: BlockOpcode::Trim.into_primitive(),
1895                        ..Default::default()
1896                    },
1897                    dev_offset: 7,
1898                    length: 8,
1899                    ..Default::default()
1900                })
1901                .await
1902                .unwrap();
1903
1904            reader.read_entries(&mut response).await.unwrap();
1905            assert_eq!(response.status, zx::sys::ZX_OK);
1906
1907            std::mem::drop(proxy);
1908        };
1909
1910        futures::join!(server, client);
1911    }
1912
1913    #[fuchsia::test]
1914    async fn test_io_errors() {
1915        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1916
1917        futures::join!(
1918            async {
1919                let block_server = BlockServer::new(
1920                    BLOCK_SIZE,
1921                    Arc::new(IoMockInterface {
1922                        return_errors: true,
1923                        do_checks: false,
1924                        expected_op: Arc::new(Mutex::new(None)),
1925                    }),
1926                );
1927                block_server.handle_requests(stream).await.unwrap();
1928            },
1929            async move {
1930                let (session_proxy, server) = fidl::endpoints::create_proxy();
1931
1932                proxy.open_session(server).unwrap();
1933
1934                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1935                let vmo_id = session_proxy
1936                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1937                    .await
1938                    .unwrap()
1939                    .unwrap();
1940
1941                let mut fifo =
1942                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1943                let (mut reader, mut writer) = fifo.async_io();
1944
1945                // READ
1946                writer
1947                    .write_entries(&BlockFifoRequest {
1948                        command: BlockFifoCommand {
1949                            opcode: BlockOpcode::Read.into_primitive(),
1950                            ..Default::default()
1951                        },
1952                        vmoid: vmo_id.id,
1953                        length: 1,
1954                        reqid: 1,
1955                        ..Default::default()
1956                    })
1957                    .await
1958                    .unwrap();
1959
1960                let mut response = BlockFifoResponse::default();
1961                reader.read_entries(&mut response).await.unwrap();
1962                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1963
1964                // WRITE
1965                writer
1966                    .write_entries(&BlockFifoRequest {
1967                        command: BlockFifoCommand {
1968                            opcode: BlockOpcode::Write.into_primitive(),
1969                            ..Default::default()
1970                        },
1971                        vmoid: vmo_id.id,
1972                        length: 1,
1973                        reqid: 2,
1974                        ..Default::default()
1975                    })
1976                    .await
1977                    .unwrap();
1978
1979                reader.read_entries(&mut response).await.unwrap();
1980                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1981
1982                // FLUSH
1983                writer
1984                    .write_entries(&BlockFifoRequest {
1985                        command: BlockFifoCommand {
1986                            opcode: BlockOpcode::Flush.into_primitive(),
1987                            ..Default::default()
1988                        },
1989                        reqid: 3,
1990                        ..Default::default()
1991                    })
1992                    .await
1993                    .unwrap();
1994
1995                reader.read_entries(&mut response).await.unwrap();
1996                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1997
1998                // TRIM
1999                writer
2000                    .write_entries(&BlockFifoRequest {
2001                        command: BlockFifoCommand {
2002                            opcode: BlockOpcode::Trim.into_primitive(),
2003                            ..Default::default()
2004                        },
2005                        reqid: 4,
2006                        length: 1,
2007                        ..Default::default()
2008                    })
2009                    .await
2010                    .unwrap();
2011
2012                reader.read_entries(&mut response).await.unwrap();
2013                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2014
2015                std::mem::drop(proxy);
2016            }
2017        );
2018    }
2019
2020    #[fuchsia::test]
2021    async fn test_invalid_args() {
2022        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2023
2024        futures::join!(
2025            async {
2026                let block_server = BlockServer::new(
2027                    BLOCK_SIZE,
2028                    Arc::new(IoMockInterface {
2029                        return_errors: false,
2030                        do_checks: false,
2031                        expected_op: Arc::new(Mutex::new(None)),
2032                    }),
2033                );
2034                block_server.handle_requests(stream).await.unwrap();
2035            },
2036            async move {
2037                let (session_proxy, server) = fidl::endpoints::create_proxy();
2038
2039                proxy.open_session(server).unwrap();
2040
2041                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2042                let vmo_id = session_proxy
2043                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2044                    .await
2045                    .unwrap()
2046                    .unwrap();
2047
2048                let mut fifo =
2049                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2050
2051                async fn test(
2052                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2053                    request: BlockFifoRequest,
2054                ) -> Result<(), zx::Status> {
2055                    let (mut reader, mut writer) = fifo.async_io();
2056                    writer.write_entries(&request).await.unwrap();
2057                    let mut response = BlockFifoResponse::default();
2058                    reader.read_entries(&mut response).await.unwrap();
2059                    zx::Status::ok(response.status)
2060                }
2061
2062                // READ
2063
2064                let good_read_request = || BlockFifoRequest {
2065                    command: BlockFifoCommand {
2066                        opcode: BlockOpcode::Read.into_primitive(),
2067                        ..Default::default()
2068                    },
2069                    length: 1,
2070                    vmoid: vmo_id.id,
2071                    ..Default::default()
2072                };
2073
2074                assert_eq!(
2075                    test(
2076                        &mut fifo,
2077                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2078                    )
2079                    .await,
2080                    Err(zx::Status::IO)
2081                );
2082
2083                assert_eq!(
2084                    test(
2085                        &mut fifo,
2086                        BlockFifoRequest {
2087                            vmo_offset: 0xffff_ffff_ffff_ffff,
2088                            ..good_read_request()
2089                        }
2090                    )
2091                    .await,
2092                    Err(zx::Status::OUT_OF_RANGE)
2093                );
2094
2095                assert_eq!(
2096                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2097                    Err(zx::Status::INVALID_ARGS)
2098                );
2099
2100                // WRITE
2101
2102                let good_write_request = || BlockFifoRequest {
2103                    command: BlockFifoCommand {
2104                        opcode: BlockOpcode::Write.into_primitive(),
2105                        ..Default::default()
2106                    },
2107                    length: 1,
2108                    vmoid: vmo_id.id,
2109                    ..Default::default()
2110                };
2111
2112                assert_eq!(
2113                    test(
2114                        &mut fifo,
2115                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2116                    )
2117                    .await,
2118                    Err(zx::Status::IO)
2119                );
2120
2121                assert_eq!(
2122                    test(
2123                        &mut fifo,
2124                        BlockFifoRequest {
2125                            vmo_offset: 0xffff_ffff_ffff_ffff,
2126                            ..good_write_request()
2127                        }
2128                    )
2129                    .await,
2130                    Err(zx::Status::OUT_OF_RANGE)
2131                );
2132
2133                assert_eq!(
2134                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2135                    Err(zx::Status::INVALID_ARGS)
2136                );
2137
2138                // CLOSE VMO
2139
2140                assert_eq!(
2141                    test(
2142                        &mut fifo,
2143                        BlockFifoRequest {
2144                            command: BlockFifoCommand {
2145                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2146                                ..Default::default()
2147                            },
2148                            vmoid: vmo_id.id + 1,
2149                            ..Default::default()
2150                        }
2151                    )
2152                    .await,
2153                    Err(zx::Status::IO)
2154                );
2155
2156                std::mem::drop(proxy);
2157            }
2158        );
2159    }
2160
2161    #[fuchsia::test]
2162    async fn test_concurrent_requests() {
2163        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2164
2165        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2166        let waiting_readers_clone = waiting_readers.clone();
2167
2168        futures::join!(
2169            async move {
2170                let block_server = BlockServer::new(
2171                    BLOCK_SIZE,
2172                    Arc::new(MockInterface {
2173                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2174                            let (tx, rx) = oneshot::channel();
2175                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2176                            Box::pin(async move {
2177                                let _ = rx.await;
2178                                Ok(())
2179                            })
2180                        })),
2181                        ..MockInterface::default()
2182                    }),
2183                );
2184                block_server.handle_requests(stream).await.unwrap();
2185            },
2186            async move {
2187                let (session_proxy, server) = fidl::endpoints::create_proxy();
2188
2189                proxy.open_session(server).unwrap();
2190
2191                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2192                let vmo_id = session_proxy
2193                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2194                    .await
2195                    .unwrap()
2196                    .unwrap();
2197
2198                let mut fifo =
2199                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2200                let (mut reader, mut writer) = fifo.async_io();
2201
2202                writer
2203                    .write_entries(&BlockFifoRequest {
2204                        command: BlockFifoCommand {
2205                            opcode: BlockOpcode::Read.into_primitive(),
2206                            ..Default::default()
2207                        },
2208                        reqid: 1,
2209                        dev_offset: 1, // Intentionally use the same as `reqid`.
2210                        vmoid: vmo_id.id,
2211                        length: 1,
2212                        ..Default::default()
2213                    })
2214                    .await
2215                    .unwrap();
2216
2217                writer
2218                    .write_entries(&BlockFifoRequest {
2219                        command: BlockFifoCommand {
2220                            opcode: BlockOpcode::Read.into_primitive(),
2221                            ..Default::default()
2222                        },
2223                        reqid: 2,
2224                        dev_offset: 2,
2225                        vmoid: vmo_id.id,
2226                        length: 1,
2227                        ..Default::default()
2228                    })
2229                    .await
2230                    .unwrap();
2231
2232                // Wait till both those entries are pending.
2233                poll_fn(|cx: &mut Context<'_>| {
2234                    if waiting_readers.lock().len() == 2 {
2235                        Poll::Ready(())
2236                    } else {
2237                        // Yield to the executor.
2238                        cx.waker().wake_by_ref();
2239                        Poll::Pending
2240                    }
2241                })
2242                .await;
2243
2244                let mut response = BlockFifoResponse::default();
2245                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2246
2247                let (id, tx) = waiting_readers.lock().pop().unwrap();
2248                tx.send(()).unwrap();
2249
2250                reader.read_entries(&mut response).await.unwrap();
2251                assert_eq!(response.status, zx::sys::ZX_OK);
2252                assert_eq!(response.reqid, id);
2253
2254                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2255
2256                let (id, tx) = waiting_readers.lock().pop().unwrap();
2257                tx.send(()).unwrap();
2258
2259                reader.read_entries(&mut response).await.unwrap();
2260                assert_eq!(response.status, zx::sys::ZX_OK);
2261                assert_eq!(response.reqid, id);
2262            }
2263        );
2264    }
2265
2266    #[fuchsia::test]
2267    async fn test_groups() {
2268        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2269
2270        futures::join!(
2271            async move {
2272                let block_server = BlockServer::new(
2273                    BLOCK_SIZE,
2274                    Arc::new(MockInterface {
2275                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2276                        ..MockInterface::default()
2277                    }),
2278                );
2279                block_server.handle_requests(stream).await.unwrap();
2280            },
2281            async move {
2282                let (session_proxy, server) = fidl::endpoints::create_proxy();
2283
2284                proxy.open_session(server).unwrap();
2285
2286                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2287                let vmo_id = session_proxy
2288                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2289                    .await
2290                    .unwrap()
2291                    .unwrap();
2292
2293                let mut fifo =
2294                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2295                let (mut reader, mut writer) = fifo.async_io();
2296
2297                writer
2298                    .write_entries(&BlockFifoRequest {
2299                        command: BlockFifoCommand {
2300                            opcode: BlockOpcode::Read.into_primitive(),
2301                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2302                            ..Default::default()
2303                        },
2304                        group: 1,
2305                        vmoid: vmo_id.id,
2306                        length: 1,
2307                        ..Default::default()
2308                    })
2309                    .await
2310                    .unwrap();
2311
2312                writer
2313                    .write_entries(&BlockFifoRequest {
2314                        command: BlockFifoCommand {
2315                            opcode: BlockOpcode::Read.into_primitive(),
2316                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2317                            ..Default::default()
2318                        },
2319                        reqid: 2,
2320                        group: 1,
2321                        vmoid: vmo_id.id,
2322                        length: 1,
2323                        ..Default::default()
2324                    })
2325                    .await
2326                    .unwrap();
2327
2328                let mut response = BlockFifoResponse::default();
2329                reader.read_entries(&mut response).await.unwrap();
2330                assert_eq!(response.status, zx::sys::ZX_OK);
2331                assert_eq!(response.reqid, 2);
2332                assert_eq!(response.group, 1);
2333            }
2334        );
2335    }
2336
2337    #[fuchsia::test]
2338    async fn test_group_error() {
2339        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2340
2341        let counter = Arc::new(AtomicU64::new(0));
2342        let counter_clone = counter.clone();
2343
2344        futures::join!(
2345            async move {
2346                let block_server = BlockServer::new(
2347                    BLOCK_SIZE,
2348                    Arc::new(MockInterface {
2349                        read_hook: Some(Box::new(move |_, _, _, _| {
2350                            counter_clone.fetch_add(1, Ordering::Relaxed);
2351                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2352                        })),
2353                        ..MockInterface::default()
2354                    }),
2355                );
2356                block_server.handle_requests(stream).await.unwrap();
2357            },
2358            async move {
2359                let (session_proxy, server) = fidl::endpoints::create_proxy();
2360
2361                proxy.open_session(server).unwrap();
2362
2363                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2364                let vmo_id = session_proxy
2365                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2366                    .await
2367                    .unwrap()
2368                    .unwrap();
2369
2370                let mut fifo =
2371                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2372                let (mut reader, mut writer) = fifo.async_io();
2373
2374                writer
2375                    .write_entries(&BlockFifoRequest {
2376                        command: BlockFifoCommand {
2377                            opcode: BlockOpcode::Read.into_primitive(),
2378                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2379                            ..Default::default()
2380                        },
2381                        group: 1,
2382                        vmoid: vmo_id.id,
2383                        length: 1,
2384                        ..Default::default()
2385                    })
2386                    .await
2387                    .unwrap();
2388
2389                // Wait until processed.
2390                poll_fn(|cx: &mut Context<'_>| {
2391                    if counter.load(Ordering::Relaxed) == 1 {
2392                        Poll::Ready(())
2393                    } else {
2394                        // Yield to the executor.
2395                        cx.waker().wake_by_ref();
2396                        Poll::Pending
2397                    }
2398                })
2399                .await;
2400
2401                let mut response = BlockFifoResponse::default();
2402                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2403
2404                writer
2405                    .write_entries(&BlockFifoRequest {
2406                        command: BlockFifoCommand {
2407                            opcode: BlockOpcode::Read.into_primitive(),
2408                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2409                            ..Default::default()
2410                        },
2411                        group: 1,
2412                        vmoid: vmo_id.id,
2413                        length: 1,
2414                        ..Default::default()
2415                    })
2416                    .await
2417                    .unwrap();
2418
2419                writer
2420                    .write_entries(&BlockFifoRequest {
2421                        command: BlockFifoCommand {
2422                            opcode: BlockOpcode::Read.into_primitive(),
2423                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2424                            ..Default::default()
2425                        },
2426                        reqid: 2,
2427                        group: 1,
2428                        vmoid: vmo_id.id,
2429                        length: 1,
2430                        ..Default::default()
2431                    })
2432                    .await
2433                    .unwrap();
2434
2435                reader.read_entries(&mut response).await.unwrap();
2436                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2437                assert_eq!(response.reqid, 2);
2438                assert_eq!(response.group, 1);
2439
2440                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2441
2442                // Only the first request should have been processed.
2443                assert_eq!(counter.load(Ordering::Relaxed), 1);
2444            }
2445        );
2446    }
2447
2448    #[fuchsia::test]
2449    async fn test_group_with_two_lasts() {
2450        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2451
2452        let (tx, rx) = oneshot::channel();
2453
2454        futures::join!(
2455            async move {
2456                let rx = Mutex::new(Some(rx));
2457                let block_server = BlockServer::new(
2458                    BLOCK_SIZE,
2459                    Arc::new(MockInterface {
2460                        read_hook: Some(Box::new(move |_, _, _, _| {
2461                            let rx = rx.lock().take().unwrap();
2462                            Box::pin(async {
2463                                let _ = rx.await;
2464                                Ok(())
2465                            })
2466                        })),
2467                        ..MockInterface::default()
2468                    }),
2469                );
2470                block_server.handle_requests(stream).await.unwrap();
2471            },
2472            async move {
2473                let (session_proxy, server) = fidl::endpoints::create_proxy();
2474
2475                proxy.open_session(server).unwrap();
2476
2477                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2478                let vmo_id = session_proxy
2479                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2480                    .await
2481                    .unwrap()
2482                    .unwrap();
2483
2484                let mut fifo =
2485                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2486                let (mut reader, mut writer) = fifo.async_io();
2487
2488                writer
2489                    .write_entries(&BlockFifoRequest {
2490                        command: BlockFifoCommand {
2491                            opcode: BlockOpcode::Read.into_primitive(),
2492                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2493                            ..Default::default()
2494                        },
2495                        reqid: 1,
2496                        group: 1,
2497                        vmoid: vmo_id.id,
2498                        length: 1,
2499                        ..Default::default()
2500                    })
2501                    .await
2502                    .unwrap();
2503
2504                writer
2505                    .write_entries(&BlockFifoRequest {
2506                        command: BlockFifoCommand {
2507                            opcode: BlockOpcode::Read.into_primitive(),
2508                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2509                            ..Default::default()
2510                        },
2511                        reqid: 2,
2512                        group: 1,
2513                        vmoid: vmo_id.id,
2514                        length: 1,
2515                        ..Default::default()
2516                    })
2517                    .await
2518                    .unwrap();
2519
2520                // Send an independent request to flush through the fifo.
2521                writer
2522                    .write_entries(&BlockFifoRequest {
2523                        command: BlockFifoCommand {
2524                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2525                            ..Default::default()
2526                        },
2527                        reqid: 3,
2528                        vmoid: vmo_id.id,
2529                        ..Default::default()
2530                    })
2531                    .await
2532                    .unwrap();
2533
2534                // It should succeed.
2535                let mut response = BlockFifoResponse::default();
2536                reader.read_entries(&mut response).await.unwrap();
2537                assert_eq!(response.status, zx::sys::ZX_OK);
2538                assert_eq!(response.reqid, 3);
2539
2540                // Now release the original request.
2541                tx.send(()).unwrap();
2542
2543                // The response should be for the first message tagged as last, and it should be
2544                // an error because we sent two messages with the LAST marker.
2545                let mut response = BlockFifoResponse::default();
2546                reader.read_entries(&mut response).await.unwrap();
2547                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2548                assert_eq!(response.reqid, 1);
2549                assert_eq!(response.group, 1);
2550            }
2551        );
2552    }
2553
2554    #[fuchsia::test(allow_stalls = false)]
2555    async fn test_requests_dont_block_sessions() {
2556        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2557
2558        let (tx, rx) = oneshot::channel();
2559
2560        fasync::Task::local(async move {
2561            let rx = Mutex::new(Some(rx));
2562            let block_server = BlockServer::new(
2563                BLOCK_SIZE,
2564                Arc::new(MockInterface {
2565                    read_hook: Some(Box::new(move |_, _, _, _| {
2566                        let rx = rx.lock().take().unwrap();
2567                        Box::pin(async {
2568                            let _ = rx.await;
2569                            Ok(())
2570                        })
2571                    })),
2572                    ..MockInterface::default()
2573                }),
2574            );
2575            block_server.handle_requests(stream).await.unwrap();
2576        })
2577        .detach();
2578
2579        let mut fut = pin!(async {
2580            let (session_proxy, server) = fidl::endpoints::create_proxy();
2581
2582            proxy.open_session(server).unwrap();
2583
2584            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2585            let vmo_id = session_proxy
2586                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2587                .await
2588                .unwrap()
2589                .unwrap();
2590
2591            let mut fifo =
2592                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2593            let (mut reader, mut writer) = fifo.async_io();
2594
2595            writer
2596                .write_entries(&BlockFifoRequest {
2597                    command: BlockFifoCommand {
2598                        opcode: BlockOpcode::Read.into_primitive(),
2599                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2600                        ..Default::default()
2601                    },
2602                    reqid: 1,
2603                    group: 1,
2604                    vmoid: vmo_id.id,
2605                    length: 1,
2606                    ..Default::default()
2607                })
2608                .await
2609                .unwrap();
2610
2611            let mut response = BlockFifoResponse::default();
2612            reader.read_entries(&mut response).await.unwrap();
2613            assert_eq!(response.status, zx::sys::ZX_OK);
2614        });
2615
2616        // The response won't come back until we send on `tx`.
2617        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2618
2619        let mut fut2 = pin!(proxy.get_volume_info());
2620
2621        // get_volume_info is set up to stall forever.
2622        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2623
2624        // If we now free up the first future, it should resolve; the stalled call to
2625        // get_volume_info should not block the fifo response.
2626        let _ = tx.send(());
2627
2628        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2629    }
2630
2631    #[fuchsia::test]
2632    async fn test_request_flow_control() {
2633        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2634
2635        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2636        // server will block until that happens.
2637        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2638        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2639        let event_clone = event.clone();
2640        futures::join!(
2641            async move {
2642                let block_server = BlockServer::new(
2643                    BLOCK_SIZE,
2644                    Arc::new(MockInterface {
2645                        read_hook: Some(Box::new(move |_, _, _, _| {
2646                            let event_clone = event_clone.clone();
2647                            Box::pin(async move {
2648                                if !event_clone.1.load(Ordering::SeqCst) {
2649                                    event_clone.0.listen().await;
2650                                }
2651                                Ok(())
2652                            })
2653                        })),
2654                        ..MockInterface::default()
2655                    }),
2656                );
2657                block_server.handle_requests(stream).await.unwrap();
2658            },
2659            async move {
2660                let (session_proxy, server) = fidl::endpoints::create_proxy();
2661
2662                proxy.open_session(server).unwrap();
2663
2664                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2665                let vmo_id = session_proxy
2666                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2667                    .await
2668                    .unwrap()
2669                    .unwrap();
2670
2671                let mut fifo =
2672                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2673                let (mut reader, mut writer) = fifo.async_io();
2674
2675                for i in 0..MAX_REQUESTS {
2676                    writer
2677                        .write_entries(&BlockFifoRequest {
2678                            command: BlockFifoCommand {
2679                                opcode: BlockOpcode::Read.into_primitive(),
2680                                ..Default::default()
2681                            },
2682                            reqid: (i + 1) as u32,
2683                            dev_offset: i,
2684                            vmoid: vmo_id.id,
2685                            length: 1,
2686                            ..Default::default()
2687                        })
2688                        .await
2689                        .unwrap();
2690                }
2691                assert!(
2692                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2693                        command: BlockFifoCommand {
2694                            opcode: BlockOpcode::Read.into_primitive(),
2695                            ..Default::default()
2696                        },
2697                        reqid: u32::MAX,
2698                        dev_offset: MAX_REQUESTS,
2699                        vmoid: vmo_id.id,
2700                        length: 1,
2701                        ..Default::default()
2702                    })))
2703                    .is_pending()
2704                );
2705                // OK, let the server start to process.
2706                event.1.store(true, Ordering::SeqCst);
2707                event.0.notify(usize::MAX);
2708                // For each entry we read, make sure we can write a new one in.
2709                let mut finished_reqids = vec![];
2710                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2711                    let mut response = BlockFifoResponse::default();
2712                    reader.read_entries(&mut response).await.unwrap();
2713                    assert_eq!(response.status, zx::sys::ZX_OK);
2714                    finished_reqids.push(response.reqid);
2715                    writer
2716                        .write_entries(&BlockFifoRequest {
2717                            command: BlockFifoCommand {
2718                                opcode: BlockOpcode::Read.into_primitive(),
2719                                ..Default::default()
2720                            },
2721                            reqid: (i + 1) as u32,
2722                            dev_offset: i,
2723                            vmoid: vmo_id.id,
2724                            length: 1,
2725                            ..Default::default()
2726                        })
2727                        .await
2728                        .unwrap();
2729                }
2730                let mut response = BlockFifoResponse::default();
2731                for _ in 0..MAX_REQUESTS {
2732                    reader.read_entries(&mut response).await.unwrap();
2733                    assert_eq!(response.status, zx::sys::ZX_OK);
2734                    finished_reqids.push(response.reqid);
2735                }
2736                // Verify that we got a response for each request.  Note that we can't assume FIFO
2737                // ordering.
2738                finished_reqids.sort();
2739                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2740                let mut i = 1;
2741                for reqid in finished_reqids {
2742                    assert_eq!(reqid, i);
2743                    i += 1;
2744                }
2745            }
2746        );
2747    }
2748
2749    #[fuchsia::test]
2750    async fn test_passthrough_io_with_fixed_map() {
2751        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2752
2753        let expected_op = Arc::new(Mutex::new(None));
2754        let expected_op_clone = expected_op.clone();
2755        futures::join!(
2756            async {
2757                let block_server = BlockServer::new(
2758                    BLOCK_SIZE,
2759                    Arc::new(IoMockInterface {
2760                        return_errors: false,
2761                        do_checks: true,
2762                        expected_op: expected_op_clone,
2763                    }),
2764                );
2765                block_server.handle_requests(stream).await.unwrap();
2766            },
2767            async move {
2768                let (session_proxy, server) = fidl::endpoints::create_proxy();
2769
2770                let mapping = fblock::BlockOffsetMapping {
2771                    source_block_offset: 0,
2772                    target_block_offset: 10,
2773                    length: 20,
2774                };
2775                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2776
2777                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2778                let vmo_id = session_proxy
2779                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2780                    .await
2781                    .unwrap()
2782                    .unwrap();
2783
2784                let mut fifo =
2785                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2786                let (mut reader, mut writer) = fifo.async_io();
2787
2788                // READ
2789                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2790                writer
2791                    .write_entries(&BlockFifoRequest {
2792                        command: BlockFifoCommand {
2793                            opcode: BlockOpcode::Read.into_primitive(),
2794                            ..Default::default()
2795                        },
2796                        vmoid: vmo_id.id,
2797                        dev_offset: 1,
2798                        length: 2,
2799                        vmo_offset: 3,
2800                        ..Default::default()
2801                    })
2802                    .await
2803                    .unwrap();
2804
2805                let mut response = BlockFifoResponse::default();
2806                reader.read_entries(&mut response).await.unwrap();
2807                assert_eq!(response.status, zx::sys::ZX_OK);
2808
2809                // WRITE
2810                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2811                writer
2812                    .write_entries(&BlockFifoRequest {
2813                        command: BlockFifoCommand {
2814                            opcode: BlockOpcode::Write.into_primitive(),
2815                            ..Default::default()
2816                        },
2817                        vmoid: vmo_id.id,
2818                        dev_offset: 4,
2819                        length: 5,
2820                        vmo_offset: 6,
2821                        ..Default::default()
2822                    })
2823                    .await
2824                    .unwrap();
2825
2826                reader.read_entries(&mut response).await.unwrap();
2827                assert_eq!(response.status, zx::sys::ZX_OK);
2828
2829                // FLUSH
2830                *expected_op.lock() = Some(ExpectedOp::Flush);
2831                writer
2832                    .write_entries(&BlockFifoRequest {
2833                        command: BlockFifoCommand {
2834                            opcode: BlockOpcode::Flush.into_primitive(),
2835                            ..Default::default()
2836                        },
2837                        ..Default::default()
2838                    })
2839                    .await
2840                    .unwrap();
2841
2842                reader.read_entries(&mut response).await.unwrap();
2843                assert_eq!(response.status, zx::sys::ZX_OK);
2844
2845                // TRIM
2846                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2847                writer
2848                    .write_entries(&BlockFifoRequest {
2849                        command: BlockFifoCommand {
2850                            opcode: BlockOpcode::Trim.into_primitive(),
2851                            ..Default::default()
2852                        },
2853                        dev_offset: 7,
2854                        length: 3,
2855                        ..Default::default()
2856                    })
2857                    .await
2858                    .unwrap();
2859
2860                reader.read_entries(&mut response).await.unwrap();
2861                assert_eq!(response.status, zx::sys::ZX_OK);
2862
2863                // READ past window
2864                *expected_op.lock() = None;
2865                writer
2866                    .write_entries(&BlockFifoRequest {
2867                        command: BlockFifoCommand {
2868                            opcode: BlockOpcode::Read.into_primitive(),
2869                            ..Default::default()
2870                        },
2871                        vmoid: vmo_id.id,
2872                        dev_offset: 19,
2873                        length: 2,
2874                        vmo_offset: 3,
2875                        ..Default::default()
2876                    })
2877                    .await
2878                    .unwrap();
2879
2880                reader.read_entries(&mut response).await.unwrap();
2881                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2882
2883                std::mem::drop(proxy);
2884            }
2885        );
2886    }
2887
2888    #[fuchsia::test]
2889    fn operation_map() {
2890        const BLOCK_SIZE: u32 = 512;
2891
2892        #[track_caller]
2893        fn expect_map_result(
2894            mut operation: Operation,
2895            mapping: Option<BlockOffsetMapping>,
2896            max_blocks: Option<NonZero<u32>>,
2897            expected_operations: Vec<Operation>,
2898        ) {
2899            let mut ops = vec![];
2900            while let Some(remainder) =
2901                operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2902            {
2903                ops.push(operation);
2904                operation = remainder;
2905            }
2906            ops.push(operation);
2907            assert_eq!(ops, expected_operations);
2908        }
2909
2910        // No limits
2911        expect_map_result(
2912            Operation::Read {
2913                device_block_offset: 10,
2914                block_count: 200,
2915                _unused: 0,
2916                vmo_offset: 0,
2917                options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2918            },
2919            None,
2920            None,
2921            vec![Operation::Read {
2922                device_block_offset: 10,
2923                block_count: 200,
2924                _unused: 0,
2925                vmo_offset: 0,
2926                options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2927            }],
2928        );
2929
2930        // Max block count
2931        expect_map_result(
2932            Operation::Read {
2933                device_block_offset: 10,
2934                block_count: 200,
2935                _unused: 0,
2936                vmo_offset: 0,
2937                options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2938            },
2939            None,
2940            NonZero::new(120),
2941            vec![
2942                Operation::Read {
2943                    device_block_offset: 10,
2944                    block_count: 120,
2945                    _unused: 0,
2946                    vmo_offset: 0,
2947                    options: ReadOptions {
2948                        inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 },
2949                    },
2950                },
2951                Operation::Read {
2952                    device_block_offset: 130,
2953                    block_count: 80,
2954                    _unused: 0,
2955                    vmo_offset: 120 * BLOCK_SIZE as u64,
2956                    options: ReadOptions {
2957                        // The DUN should be offset by the number of blocks in the first request.
2958                        inline_crypto: InlineCryptoOptions { dun: 1000 + 120, slot: 1 },
2959                    },
2960                },
2961            ],
2962        );
2963        expect_map_result(
2964            Operation::Trim { device_block_offset: 10, block_count: 200 },
2965            None,
2966            NonZero::new(120),
2967            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2968        );
2969
2970        // Remapping + Max block count
2971        expect_map_result(
2972            Operation::Read {
2973                device_block_offset: 10,
2974                block_count: 200,
2975                _unused: 0,
2976                vmo_offset: 0,
2977                options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2978            },
2979            Some(BlockOffsetMapping {
2980                source_block_offset: 10,
2981                target_block_offset: 100,
2982                length: 200,
2983            }),
2984            NonZero::new(120),
2985            vec![
2986                Operation::Read {
2987                    device_block_offset: 100,
2988                    block_count: 120,
2989                    _unused: 0,
2990                    vmo_offset: 0,
2991                    options: ReadOptions {
2992                        inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 },
2993                    },
2994                },
2995                Operation::Read {
2996                    device_block_offset: 220,
2997                    block_count: 80,
2998                    _unused: 0,
2999                    vmo_offset: 120 * BLOCK_SIZE as u64,
3000                    options: ReadOptions {
3001                        inline_crypto: InlineCryptoOptions { dun: 1000 + 120, slot: 1 },
3002                    },
3003                },
3004            ],
3005        );
3006        expect_map_result(
3007            Operation::Trim { device_block_offset: 10, block_count: 200 },
3008            Some(BlockOffsetMapping {
3009                source_block_offset: 10,
3010                target_block_offset: 100,
3011                length: 200,
3012            }),
3013            NonZero::new(120),
3014            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3015        );
3016    }
3017
3018    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3019    #[fuchsia::test]
3020    async fn test_pre_barrier_flush_failure() {
3021        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3022
3023        struct NoBarrierInterface;
3024        impl super::async_interface::Interface for NoBarrierInterface {
3025            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3026                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3027                    device_flags: fblock::DeviceFlag::empty(), // No BARRIER_SUPPORT
3028                    max_transfer_blocks: NonZero::new(100),
3029                    block_range: Some(0..100),
3030                    type_guid: [0; 16],
3031                    instance_guid: [0; 16],
3032                    name: "test".to_string(),
3033                    flags: 0,
3034                }))
3035            }
3036            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3037                Ok(())
3038            }
3039            async fn read(
3040                &self,
3041                _: u64,
3042                _: u32,
3043                _: &Arc<zx::Vmo>,
3044                _: u64,
3045                _: ReadOptions,
3046                _: TraceFlowId,
3047            ) -> Result<(), zx::Status> {
3048                unreachable!()
3049            }
3050            async fn write(
3051                &self,
3052                _: u64,
3053                _: u32,
3054                _: &Arc<zx::Vmo>,
3055                _: u64,
3056                _: WriteOptions,
3057                _: TraceFlowId,
3058            ) -> Result<(), zx::Status> {
3059                panic!("Write should not be called");
3060            }
3061            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3062                Err(zx::Status::IO)
3063            }
3064            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3065                unreachable!()
3066            }
3067        }
3068
3069        futures::join!(
3070            async move {
3071                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3072                block_server.handle_requests(stream).await.unwrap();
3073            },
3074            async move {
3075                let (session_proxy, server) = fidl::endpoints::create_proxy();
3076                proxy.open_session(server).unwrap();
3077                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3078                let vmo_id = session_proxy
3079                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3080                    .await
3081                    .unwrap()
3082                    .unwrap();
3083
3084                let mut fifo =
3085                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3086                let (mut reader, mut writer) = fifo.async_io();
3087
3088                writer
3089                    .write_entries(&BlockFifoRequest {
3090                        command: BlockFifoCommand {
3091                            opcode: BlockOpcode::Write.into_primitive(),
3092                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3093                            ..Default::default()
3094                        },
3095                        vmoid: vmo_id.id,
3096                        length: 1,
3097                        ..Default::default()
3098                    })
3099                    .await
3100                    .unwrap();
3101
3102                let mut response = BlockFifoResponse::default();
3103                reader.read_entries(&mut response).await.unwrap();
3104                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3105            }
3106        );
3107    }
3108
3109    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3110    // post-flush is not executed.
3111    #[fuchsia::test]
3112    async fn test_post_barrier_write_failure() {
3113        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3114
3115        struct NoBarrierInterface;
3116        impl super::async_interface::Interface for NoBarrierInterface {
3117            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3118                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3119                    device_flags: fblock::DeviceFlag::empty(), // No FUA_SUPPORT
3120                    max_transfer_blocks: NonZero::new(100),
3121                    block_range: Some(0..100),
3122                    type_guid: [0; 16],
3123                    instance_guid: [0; 16],
3124                    name: "test".to_string(),
3125                    flags: 0,
3126                }))
3127            }
3128            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3129                Ok(())
3130            }
3131            async fn read(
3132                &self,
3133                _: u64,
3134                _: u32,
3135                _: &Arc<zx::Vmo>,
3136                _: u64,
3137                _: ReadOptions,
3138                _: TraceFlowId,
3139            ) -> Result<(), zx::Status> {
3140                unreachable!()
3141            }
3142            async fn write(
3143                &self,
3144                _: u64,
3145                _: u32,
3146                _: &Arc<zx::Vmo>,
3147                _: u64,
3148                _: WriteOptions,
3149                _: TraceFlowId,
3150            ) -> Result<(), zx::Status> {
3151                Err(zx::Status::IO)
3152            }
3153            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3154                panic!("Flush should not be called")
3155            }
3156            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3157                unreachable!()
3158            }
3159        }
3160
3161        futures::join!(
3162            async move {
3163                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3164                block_server.handle_requests(stream).await.unwrap();
3165            },
3166            async move {
3167                let (session_proxy, server) = fidl::endpoints::create_proxy();
3168                proxy.open_session(server).unwrap();
3169                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3170                let vmo_id = session_proxy
3171                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3172                    .await
3173                    .unwrap()
3174                    .unwrap();
3175
3176                let mut fifo =
3177                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3178                let (mut reader, mut writer) = fifo.async_io();
3179
3180                writer
3181                    .write_entries(&BlockFifoRequest {
3182                        command: BlockFifoCommand {
3183                            opcode: BlockOpcode::Write.into_primitive(),
3184                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3185                            ..Default::default()
3186                        },
3187                        vmoid: vmo_id.id,
3188                        length: 1,
3189                        ..Default::default()
3190                    })
3191                    .await
3192                    .unwrap();
3193
3194                let mut response = BlockFifoResponse::default();
3195                reader.read_entries(&mut response).await.unwrap();
3196                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3197            }
3198        );
3199    }
3200}