Skip to main content

block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::Error;
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fidl_fuchsia_storage_block as fblock;
8use fuchsia_async as fasync;
9use fuchsia_async::epoch::{Epoch, EpochGuard};
10use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
11use futures::{Future, FutureExt as _, TryStreamExt as _};
12use slab::Slab;
13use std::borrow::{Borrow, Cow};
14use std::collections::BTreeMap;
15use std::num::NonZero;
16use std::ops::Range;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use storage_device::buffer::Buffer;
20
21pub mod async_interface;
22pub mod c_interface;
23pub mod callback_interface;
24
25#[cfg(test)]
26mod decompression_tests;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    pub fn label(&self) -> &str {
40        match self {
41            Self::Block(BlockInfo { .. }) => "",
42            Self::Partition(PartitionInfo { name, .. }) => name,
43        }
44    }
45    pub fn device_flags(&self) -> fblock::DeviceFlag {
46        match self {
47            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
48            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
49        }
50    }
51
52    pub fn block_count(&self) -> Option<u64> {
53        match self {
54            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
55            Self::Partition(PartitionInfo { block_range, .. }) => {
56                block_range.as_ref().map(|range| range.end - range.start)
57            }
58        }
59    }
60
61    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
62        match self {
63            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
64            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
65                max_transfer_blocks.clone()
66            }
67        }
68    }
69
70    fn max_transfer_size(&self, block_size: u32) -> u32 {
71        if let Some(max_blocks) = self.max_transfer_blocks() {
72            max_blocks.get() * block_size
73        } else {
74            MAX_TRANSFER_UNBOUNDED
75        }
76    }
77}
78
79/// Information associated with non-partition block devices.
80#[derive(Clone, Default)]
81pub struct BlockInfo {
82    pub device_flags: fblock::DeviceFlag,
83    pub block_count: u64,
84    pub max_transfer_blocks: Option<NonZero<u32>>,
85}
86
87/// Information associated with a block device that is also a partition.
88#[derive(Clone, Default)]
89pub struct PartitionInfo {
90    /// The device flags reported by the underlying device.
91    pub device_flags: fblock::DeviceFlag,
92    pub max_transfer_blocks: Option<NonZero<u32>>,
93    /// If `block_range` is None, the partition is a volume and may not be contiguous.
94    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
95    /// slices and use that (along with the slice and block sizes) to determine the block count.
96    pub block_range: Option<Range<u64>>,
97    pub type_guid: [u8; 16],
98    pub instance_guid: [u8; 16],
99    pub name: String,
100    pub flags: u64,
101}
102
103/// We internally keep track of active requests, so that when the server is torn down, we can
104/// deallocate all of the resources for pending requests.
105struct ActiveRequest<S> {
106    session: S,
107    group_or_request: GroupOrRequest,
108    trace_flow_id: TraceFlowId,
109    _epoch_guard: EpochGuard<'static>,
110    status: zx::Status,
111    count: u32,
112    req_id: Option<u32>,
113    decompression_info: Option<DecompressionInfo>,
114}
115
116struct DecompressionInfo {
117    // This is the range of compressed bytes in receiving buffer.
118    compressed_range: Range<usize>,
119
120    // This is the range in the target VMO where we will write uncompressed bytes.
121    uncompressed_range: Range<u64>,
122
123    bytes_so_far: u64,
124    mapping: Arc<VmoMapping>,
125    buffer: Option<Buffer<'static>>,
126}
127
128impl DecompressionInfo {
129    /// Returns the uncompressed slice.
130    fn uncompressed_slice(&self) -> *mut [u8] {
131        std::ptr::slice_from_raw_parts_mut(
132            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
133            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
134        )
135    }
136}
137
138pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
139
140impl<S> Default for ActiveRequests<S> {
141    fn default() -> Self {
142        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
143    }
144}
145
146impl<S> ActiveRequests<S> {
147    fn complete_and_take_response(
148        &self,
149        request_id: RequestId,
150        status: zx::Status,
151    ) -> Option<(S, BlockFifoResponse)> {
152        self.0.lock().complete_and_take_response(request_id, status)
153    }
154
155    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
156        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
157    }
158}
159
160struct ActiveRequestsInner<S> {
161    requests: Slab<ActiveRequest<S>>,
162}
163
164// Keeps track of all the requests that are currently being processed
165impl<S> ActiveRequestsInner<S> {
166    /// Completes a request.
167    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
168        let group = &mut self.requests[request_id.0];
169
170        group.count = group.count.checked_sub(1).unwrap();
171        if status != zx::Status::OK && group.status == zx::Status::OK {
172            group.status = status
173        }
174
175        fuchsia_trace::duration!(
176            "storage",
177            "block_server::finish_transaction",
178            "request_id" => request_id.0,
179            "group_completed" => group.count == 0,
180            "status" => status.into_raw());
181        if let Some(trace_flow_id) = group.trace_flow_id {
182            fuchsia_trace::flow_step!(
183                "storage",
184                "block_server::finish_request",
185                trace_flow_id.get().into()
186            );
187        }
188
189        if group.count == 0
190            && group.status == zx::Status::OK
191            && let Some(info) = &mut group.decompression_info
192        {
193            thread_local! {
194                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
195                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
196            }
197            DECOMPRESSOR.with(|decompressor| {
198                // SAFETY: We verified `uncompressed_range` fits within our mapping.
199                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
200                let mut decompressor = decompressor.borrow_mut();
201                if let Err(error) = decompressor.decompress_to_buffer(
202                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
203                    target_slice,
204                ) {
205                    log::warn!(error:?; "Decompression error");
206                    group.status = zx::Status::IO_DATA_INTEGRITY;
207                };
208            });
209        }
210    }
211
212    /// Takes the response if all requests are finished.
213    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
214        let group = &self.requests[request_id.0];
215        match group.req_id {
216            Some(reqid) if group.count == 0 => {
217                let group = self.requests.remove(request_id.0);
218                Some((
219                    group.session,
220                    BlockFifoResponse {
221                        status: group.status.into_raw(),
222                        reqid,
223                        group: group.group_or_request.group_id().unwrap_or(0),
224                        ..Default::default()
225                    },
226                ))
227            }
228            _ => None,
229        }
230    }
231
232    /// Competes the request and returns a response if the request group is finished.
233    fn complete_and_take_response(
234        &mut self,
235        request_id: RequestId,
236        status: zx::Status,
237    ) -> Option<(S, BlockFifoResponse)> {
238        self.complete(request_id, status);
239        self.take_response(request_id)
240    }
241}
242
243/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
244/// cbindgen:no-export
245pub struct BlockServer<SM: SessionManager> {
246    block_size: u32,
247    orchestrator: Arc<SM::Orchestrator>,
248}
249
250/// A single entry in `[OffsetMap]`.
251#[derive(Debug)]
252pub struct BlockOffsetMapping {
253    source_block_offset: u64,
254    target_block_offset: u64,
255    length: u64,
256}
257
258impl BlockOffsetMapping {
259    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
260        blocks.0 >= self.source_block_offset
261            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
262    }
263}
264
265impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
266    type Error = zx::Status;
267
268    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
269        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
270        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271        Ok(Self {
272            source_block_offset: wire.source_block_offset,
273            target_block_offset: wire.target_block_offset,
274            length: wire.length,
275        })
276    }
277}
278
279/// Remaps the offset of block requests based on an internal map, and truncates long requests.
280pub struct OffsetMap {
281    mapping: Option<BlockOffsetMapping>,
282    max_transfer_blocks: Option<NonZero<u32>>,
283}
284
285impl OffsetMap {
286    /// An OffsetMap that remaps requests.
287    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
288        Self { mapping: Some(mapping), max_transfer_blocks }
289    }
290
291    /// An OffsetMap that just enforces maximum request sizes.
292    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
293        Self { mapping: None, max_transfer_blocks }
294    }
295
296    pub fn is_empty(&self) -> bool {
297        self.mapping.is_none()
298    }
299
300    fn mapping(&self) -> Option<&BlockOffsetMapping> {
301        self.mapping.as_ref()
302    }
303
304    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
305        self.max_transfer_blocks
306    }
307}
308
309// Methods take Arc<Self> rather than &self because of
310// https://github.com/rust-lang/rust/issues/42940.
311pub trait SessionManager: 'static {
312    /// The Orchestrator is an object that holds the `SessionManager` and any other state that needs
313    /// to be shared between sessions.  It is responsible for keeping the `SessionManager` alive.
314    /// We use this type instead of directly holding an Arc<SessionManager> in BlockServer, to avoid
315    /// nested Arcs in concrete implementations which need to keep additional state.
316    type Orchestrator: Borrow<Self> + Send + Sync;
317
318    const SUPPORTS_DECOMPRESSION: bool;
319
320    type Session;
321
322    fn on_attach_vmo(
323        orchestrator: Arc<Self::Orchestrator>,
324        vmo: &Arc<zx::Vmo>,
325    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
326
327    /// Creates a new session to handle `stream`.
328    /// The returned future should run until the session completes, for example when the client end
329    /// closes.
330    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
331    fn open_session(
332        orchestrator: Arc<Self::Orchestrator>,
333        stream: fblock::SessionRequestStream,
334        offset_map: OffsetMap,
335        block_size: u32,
336    ) -> impl Future<Output = Result<(), Error>> + Send;
337
338    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
339    fn get_info(&self) -> Cow<'_, DeviceInfo>;
340
341    /// Called to handle the GetVolumeInfo FIDL call.
342    fn get_volume_info(
343        &self,
344    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
345    {
346        async { Err(zx::Status::NOT_SUPPORTED) }
347    }
348
349    /// Called to handle the QuerySlices FIDL call.
350    fn query_slices(
351        &self,
352        _start_slices: &[u64],
353    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
354        async { Err(zx::Status::NOT_SUPPORTED) }
355    }
356
357    /// Called to handle the Shrink FIDL call.
358    fn extend(
359        &self,
360        _start_slice: u64,
361        _slice_count: u64,
362    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363        async { Err(zx::Status::NOT_SUPPORTED) }
364    }
365
366    /// Called to handle the Shrink FIDL call.
367    fn shrink(
368        &self,
369        _start_slice: u64,
370        _slice_count: u64,
371    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
372        async { Err(zx::Status::NOT_SUPPORTED) }
373    }
374
375    /// Returns the active requests.
376    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
377}
378
379/// A helper trait for converting various types into an `Orchestrator`.
380///
381/// This exists to simplify [`BlockServer::new`].
382pub trait IntoOrchestrator {
383    type SM: SessionManager;
384
385    fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
386}
387
388impl<SM: SessionManager> BlockServer<SM> {
389    pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
390        Self { block_size, orchestrator: orchestrator.into_orchestrator() }
391    }
392
393    pub fn session_manager(&self) -> &SM {
394        self.orchestrator.as_ref().borrow()
395    }
396
397    /// Called to process requests for fuchsia.storage.block.Block.
398    pub async fn handle_requests(
399        &self,
400        mut requests: fblock::BlockRequestStream,
401    ) -> Result<(), Error> {
402        let scope = fasync::Scope::new();
403        loop {
404            match requests.try_next().await {
405                Ok(Some(request)) => {
406                    if let Some(session) = self.handle_request(request).await? {
407                        scope.spawn(session.map(|_| ()));
408                    }
409                }
410                Ok(None) => break,
411                Err(err) => log::warn!(err:?; "Invalid request"),
412            }
413        }
414        scope.await;
415        Ok(())
416    }
417
418    /// Processes a Block request.  If a new session task is created in response to the request,
419    /// it is returned.
420    async fn handle_request(
421        &self,
422        request: fblock::BlockRequest,
423    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
424        match request {
425            fblock::BlockRequest::GetInfo { responder } => {
426                let info = self.device_info();
427                let max_transfer_size = info.max_transfer_size(self.block_size);
428                let (block_count, mut flags) = match info.as_ref() {
429                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
430                        (*block_count, *device_flags)
431                    }
432                    DeviceInfo::Partition(partition_info) => {
433                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
434                            range.end - range.start
435                        } else {
436                            let volume_info = self.session_manager().get_volume_info().await?;
437                            volume_info.0.slice_size * volume_info.1.partition_slice_count
438                                / self.block_size as u64
439                        };
440                        (block_count, partition_info.device_flags)
441                    }
442                };
443                if SM::SUPPORTS_DECOMPRESSION {
444                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
445                }
446                responder.send(Ok(&fblock::BlockInfo {
447                    block_count,
448                    block_size: self.block_size,
449                    max_transfer_size,
450                    flags,
451                }))?;
452            }
453            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
454                let info = self.device_info();
455                return Ok(Some(SM::open_session(
456                    self.orchestrator.clone(),
457                    session.into_stream(),
458                    OffsetMap::empty(info.max_transfer_blocks()),
459                    self.block_size,
460                )));
461            }
462            fblock::BlockRequest::OpenSessionWithOffsetMap {
463                session,
464                mapping,
465                control_handle: _,
466            } => {
467                let info = self.device_info();
468                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
469                    Ok(m) => m,
470                    Err(status) => {
471                        session.close_with_epitaph(status)?;
472                        return Ok(None);
473                    }
474                };
475                if let Some(max) = info.block_count() {
476                    if initial_mapping.target_block_offset + initial_mapping.length > max {
477                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
478                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
479                        return Ok(None);
480                    }
481                }
482                return Ok(Some(SM::open_session(
483                    self.orchestrator.clone(),
484                    session.into_stream(),
485                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
486                    self.block_size,
487                )));
488            }
489            fblock::BlockRequest::GetTypeGuid { responder } => {
490                let info = self.device_info();
491                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
492                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
493                    guid.value.copy_from_slice(&partition_info.type_guid);
494                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
495                } else {
496                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497                }
498            }
499            fblock::BlockRequest::GetInstanceGuid { responder } => {
500                let info = self.device_info();
501                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
502                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503                    guid.value.copy_from_slice(&partition_info.instance_guid);
504                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
505                } else {
506                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
507                }
508            }
509            fblock::BlockRequest::GetName { responder } => {
510                let info = self.device_info();
511                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
512                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
513                } else {
514                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
515                }
516            }
517            fblock::BlockRequest::GetMetadata { responder } => {
518                let info = self.device_info();
519                if let DeviceInfo::Partition(info) = info.as_ref() {
520                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
521                    type_guid.value.copy_from_slice(&info.type_guid);
522                    let mut instance_guid =
523                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
524                    instance_guid.value.copy_from_slice(&info.instance_guid);
525                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
526                        name: Some(info.name.clone()),
527                        type_guid: Some(type_guid),
528                        instance_guid: Some(instance_guid),
529                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
530                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
531                        flags: Some(info.flags),
532                        ..Default::default()
533                    }))?;
534                } else {
535                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
536                }
537            }
538            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
539                match self.session_manager().query_slices(&start_slices).await {
540                    Ok(mut results) => {
541                        let results_len = results.len();
542                        assert!(results_len <= 16);
543                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
544                        responder.send(
545                            zx::sys::ZX_OK,
546                            &results.try_into().unwrap(),
547                            results_len as u64,
548                        )?;
549                    }
550                    Err(s) => {
551                        responder.send(
552                            s.into_raw(),
553                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
554                            0,
555                        )?;
556                    }
557                }
558            }
559            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
560                match self.session_manager().get_volume_info().await {
561                    Ok((manager_info, volume_info)) => {
562                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
563                    }
564                    Err(s) => responder.send(s.into_raw(), None, None)?,
565                }
566            }
567            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
568                responder.send(
569                    zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
570                        .into_raw(),
571                )?;
572            }
573            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
574                responder.send(
575                    zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
576                        .into_raw(),
577                )?;
578            }
579            fblock::BlockRequest::Destroy { responder, .. } => {
580                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
581            }
582        }
583        Ok(None)
584    }
585
586    fn device_info(&self) -> Cow<'_, DeviceInfo> {
587        self.session_manager().get_info()
588    }
589}
590
591struct SessionHelper<SM: SessionManager> {
592    orchestrator: Arc<SM::Orchestrator>,
593    offset_map: OffsetMap,
594    block_size: u32,
595    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
596    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
597}
598
599struct VmoMapping {
600    base: usize,
601    size: usize,
602}
603
604impl VmoMapping {
605    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
606        let size = vmo.get_size().unwrap() as usize;
607        Ok(Arc::new(Self {
608            base: fuchsia_runtime::vmar_root_self()
609                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
610                .inspect_err(|error| {
611                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
612                })?,
613            size,
614        }))
615    }
616}
617
618impl Drop for VmoMapping {
619    fn drop(&mut self) {
620        // SAFETY: We mapped this in `VmoMapping::new`.
621        unsafe {
622            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
623        }
624    }
625}
626
627enum HandleRequestResult {
628    /// The request was handled successfully.
629    Ok,
630    /// The request closed the stream.  The caller must shut down the session, and must call the
631    /// provided callback after the session is completely shut down.  The caller should assume that
632    /// no further requests need to be handled once this is received.
633    Closed(Box<dyn FnOnce() + Send + 'static>),
634}
635
636impl<SM: SessionManager> SessionHelper<SM> {
637    fn new(
638        orchestrator: Arc<SM::Orchestrator>,
639        offset_map: OffsetMap,
640        block_size: u32,
641    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
642        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
643        Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
644    }
645
646    fn session_manager(&self) -> &SM {
647        self.orchestrator.as_ref().borrow()
648    }
649
650    async fn handle_request(
651        &self,
652        request: fblock::SessionRequest,
653    ) -> Result<HandleRequestResult, Error> {
654        match request {
655            fblock::SessionRequest::GetFifo { responder } => {
656                let rights = zx::Rights::TRANSFER
657                    | zx::Rights::READ
658                    | zx::Rights::WRITE
659                    | zx::Rights::SIGNAL
660                    | zx::Rights::WAIT;
661                match self.peer_fifo.duplicate_handle(rights) {
662                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
663                    Err(s) => responder.send(Err(s.into_raw()))?,
664                }
665                Ok(HandleRequestResult::Ok)
666            }
667            fblock::SessionRequest::AttachVmo { vmo, responder } => {
668                let vmo = Arc::new(vmo);
669                let vmo_id = {
670                    let mut vmos = self.vmos.lock();
671                    if vmos.len() == u16::MAX as usize {
672                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
673                        return Ok(HandleRequestResult::Ok);
674                    } else {
675                        let vmo_id = match vmos.last_entry() {
676                            None => 1,
677                            Some(o) => {
678                                o.key().checked_add(1).unwrap_or_else(|| {
679                                    let mut vmo_id = 1;
680                                    // Find the first gap...
681                                    for (&id, _) in &*vmos {
682                                        if id > vmo_id {
683                                            break;
684                                        }
685                                        vmo_id = id + 1;
686                                    }
687                                    vmo_id
688                                })
689                            }
690                        };
691                        vmos.insert(vmo_id, (vmo.clone(), None));
692                        vmo_id
693                    }
694                };
695                SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
696                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
697                Ok(HandleRequestResult::Ok)
698            }
699            fblock::SessionRequest::Close { responder } => {
700                Ok(HandleRequestResult::Closed(Box::new(move || {
701                    if let Err(err) = responder.send(Ok(())) {
702                        log::warn!(err:?; "Error sending close response");
703                    }
704                })))
705            }
706        }
707    }
708
709    /// Decodes `request`.
710    fn decode_fifo_request(
711        &self,
712        session: SM::Session,
713        request: &BlockFifoRequest,
714    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
715        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
716
717        let request_bytes = request.length as u64 * self.block_size as u64;
718
719        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
720            .ok_or(zx::Status::INVALID_ARGS)
721            .and_then(|code| {
722                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
723                    if code != BlockOpcode::Read {
724                        return Err(zx::Status::INVALID_ARGS);
725                    }
726                    if !SM::SUPPORTS_DECOMPRESSION {
727                        return Err(zx::Status::NOT_SUPPORTED);
728                    }
729                }
730                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
731                    if request.length == 0 {
732                        return Err(zx::Status::INVALID_ARGS);
733                    }
734                    // Make sure the end offsets won't wrap.
735                    if request.dev_offset.checked_add(request.length as u64).is_none()
736                        || (code != BlockOpcode::Trim
737                            && request_bytes.checked_add(request.vmo_offset).is_none())
738                    {
739                        return Err(zx::Status::OUT_OF_RANGE);
740                    }
741                }
742                Ok(match code {
743                    BlockOpcode::Read => Operation::Read {
744                        device_block_offset: request.dev_offset,
745                        block_count: request.length,
746                        _unused: 0,
747                        vmo_offset: request
748                            .vmo_offset
749                            .checked_mul(self.block_size as u64)
750                            .ok_or(zx::Status::OUT_OF_RANGE)?,
751                        options: ReadOptions {
752                            inline_crypto: InlineCryptoOptions {
753                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
754                                dun: request.dun,
755                                slot: request.slot,
756                            },
757                        },
758                    },
759                    BlockOpcode::Write => {
760                        let mut options = WriteOptions {
761                            inline_crypto: InlineCryptoOptions {
762                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
763                                dun: request.dun,
764                                slot: request.slot,
765                            },
766                            ..WriteOptions::default()
767                        };
768                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
769                            options.flags |= WriteFlags::FORCE_ACCESS;
770                        }
771                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
772                            options.flags |= WriteFlags::PRE_BARRIER;
773                        }
774                        Operation::Write {
775                            device_block_offset: request.dev_offset,
776                            block_count: request.length,
777                            _unused: 0,
778                            options,
779                            vmo_offset: request
780                                .vmo_offset
781                                .checked_mul(self.block_size as u64)
782                                .ok_or(zx::Status::OUT_OF_RANGE)?,
783                        }
784                    }
785                    BlockOpcode::Flush => Operation::Flush,
786                    BlockOpcode::Trim => Operation::Trim {
787                        device_block_offset: request.dev_offset,
788                        block_count: request.length,
789                    },
790                    BlockOpcode::CloseVmo => Operation::CloseVmo,
791                })
792            });
793
794        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
795            GroupOrRequest::Group(request.group)
796        } else {
797            GroupOrRequest::Request(request.reqid)
798        };
799
800        let mut active_requests = self.session_manager().active_requests().0.lock();
801        let mut request_id = None;
802
803        // Multiple Block I/O request may be sent as a group.
804        // Notes:
805        // - the group is identified by the group id in the request
806        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
807        //   flag is set.
808        // - when processing a request of a group fails, subsequent requests of that
809        //   group will not be processed.
810        // - decompression is a special case, see block-fifo.h for semantics.
811        //
812        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
813        if group_or_request.is_group() {
814            // Search for an existing entry that matches this group.  NOTE: This is a potentially
815            // expensive way to find a group (it's iterating over all slots in the active-requests
816            // slab).  This can be optimised easily should we need to.
817            for (key, group) in &mut active_requests.requests {
818                if group.group_or_request == group_or_request {
819                    if group.req_id.is_some() {
820                        // We have already received a request tagged as last.
821                        if group.status == zx::Status::OK {
822                            group.status = zx::Status::INVALID_ARGS;
823                        }
824                        // Ignore this request.
825                        return Err(None);
826                    }
827                    // See if this is a continuation of a decompressed read.
828                    if group.status == zx::Status::OK
829                        && let Some(info) = &mut group.decompression_info
830                    {
831                        if let Ok(Operation::Read {
832                            device_block_offset,
833                            mut block_count,
834                            options,
835                            vmo_offset: 0,
836                            ..
837                        }) = operation
838                        {
839                            let remaining_bytes = info
840                                .compressed_range
841                                .end
842                                .next_multiple_of(self.block_size as usize)
843                                as u64
844                                - info.bytes_so_far;
845                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
846                                || request.total_compressed_bytes != 0
847                                || request.uncompressed_bytes != 0
848                                || request.compressed_prefix_bytes != 0
849                                || (flags.contains(BlockIoFlag::GROUP_LAST)
850                                    && info.bytes_so_far + request_bytes
851                                        < info.compressed_range.end as u64)
852                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
853                                    && request_bytes >= remaining_bytes)
854                            {
855                                group.status = zx::Status::INVALID_ARGS;
856                            } else {
857                                // We are tolerant of `block_count` being more than we actually
858                                // need.  This can happen if the client is working with a larger
859                                // block size than the device block size.  For example, if Blobfs
860                                // has a 8192 byte block size, but the device might has a 512 byte
861                                // block size, it can ask for a multiple of 16 blocks, when fewer
862                                // than that might actually be required to hold the compressed data.
863                                // It is easier for us to tolerate this here than to get Blobfs to
864                                // change to pass only the blocks that are required.
865                                if request_bytes > remaining_bytes {
866                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
867                                }
868
869                                operation = Ok(Operation::ContinueDecompressedRead {
870                                    offset: info.bytes_so_far,
871                                    device_block_offset,
872                                    block_count,
873                                    options,
874                                });
875
876                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
877                            }
878                        } else {
879                            group.status = zx::Status::INVALID_ARGS;
880                        }
881                    }
882                    if flags.contains(BlockIoFlag::GROUP_LAST) {
883                        group.req_id = Some(request.reqid);
884                        // If the group has had an error, there is no point trying to issue this
885                        // request.
886                        if group.status != zx::Status::OK {
887                            operation = Err(group.status);
888                        }
889                    } else if group.status != zx::Status::OK {
890                        // The group has already encountered an error, so there is no point trying
891                        // to issue this request.
892                        return Err(None);
893                    }
894                    request_id = Some(RequestId(key));
895                    group.count += 1;
896                    break;
897                }
898            }
899        }
900
901        let is_single_request =
902            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
903
904        let mut decompression_info = None;
905        let vmo = match operation {
906            Ok(Operation::Read {
907                device_block_offset,
908                mut block_count,
909                options,
910                vmo_offset,
911                ..
912            }) => match self.vmos.lock().get_mut(&request.vmoid) {
913                Some((vmo, mapping)) => {
914                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
915                        let compressed_range = request.compressed_prefix_bytes as usize
916                            ..request.compressed_prefix_bytes as usize
917                                + request.total_compressed_bytes as usize;
918                        let required_buffer_size =
919                            compressed_range.end.next_multiple_of(self.block_size as usize);
920
921                        // Validate the initial decompression request.
922                        if compressed_range.start >= compressed_range.end
923                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
924                            || (is_single_request && request_bytes < compressed_range.end as u64)
925                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
926                        {
927                            Err(zx::Status::INVALID_ARGS)
928                        } else {
929                            // We are tolerant of `block_count` being more than we actually need.
930                            // This can happen if the client is working in a larger block size than
931                            // the device block size.  For example, Blobfs has a 8192 byte block
932                            // size, but the device might have a 512 byte block size.  It is easier
933                            // for us to tolerate this here than to get Blobfs to change to pass
934                            // only the blocks that are required.
935                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
936                                block_count =
937                                    (required_buffer_size / self.block_size as usize) as u32;
938                                required_buffer_size as u64
939                            } else {
940                                request_bytes
941                            };
942
943                            // To decompress, we need to have the target VMO mapped (cached).
944                            match mapping {
945                                Some(mapping) => Ok(mapping.clone()),
946                                None => {
947                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
948                                }
949                            }
950                            .and_then(|mapping| {
951                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
952                                // range.
953                                if vmo_offset
954                                    .checked_add(request.uncompressed_bytes as u64)
955                                    .is_some_and(|end| end <= mapping.size as u64)
956                                {
957                                    Ok(mapping)
958                                } else {
959                                    Err(zx::Status::OUT_OF_RANGE)
960                                }
961                            })
962                            .map(|mapping| {
963                                // Convert the operation into a `StartDecompressedRead`
964                                // operation. For non-fragmented requests, this will be the only
965                                // operation, but if it's a fragmented read,
966                                // `ContinueDecompressedRead` operations will follow.
967                                operation = Ok(Operation::StartDecompressedRead {
968                                    required_buffer_size,
969                                    device_block_offset,
970                                    block_count,
971                                    options,
972                                });
973                                // Record sufficient information so that we can decompress when all
974                                // the requests complete.
975                                decompression_info = Some(DecompressionInfo {
976                                    compressed_range,
977                                    bytes_so_far,
978                                    mapping,
979                                    uncompressed_range: vmo_offset
980                                        ..vmo_offset + request.uncompressed_bytes as u64,
981                                    buffer: None,
982                                });
983                                None
984                            })
985                        }
986                    } else {
987                        Ok(Some(vmo.clone()))
988                    }
989                }
990                None => Err(zx::Status::IO),
991            },
992            Ok(Operation::Write { .. }) => self
993                .vmos
994                .lock()
995                .get(&request.vmoid)
996                .cloned()
997                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
998            Ok(Operation::CloseVmo) => {
999                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
1000                    let vmo_clone = vmo.clone();
1001                    // Make sure the VMO is dropped after all current Epoch guards have been
1002                    // dropped.
1003                    Epoch::global().defer(move || drop(vmo_clone));
1004                    Ok(Some(vmo))
1005                })
1006            }
1007            _ => Ok(None),
1008        }
1009        .unwrap_or_else(|e| {
1010            operation = Err(e);
1011            None
1012        });
1013
1014        let trace_flow_id = NonZero::new(request.trace_flow_id);
1015        let request_id = request_id.unwrap_or_else(|| {
1016            RequestId(active_requests.requests.insert(ActiveRequest {
1017                session,
1018                group_or_request,
1019                trace_flow_id,
1020                _epoch_guard: Epoch::global().guard(),
1021                status: zx::Status::OK,
1022                count: 1,
1023                req_id: is_single_request.then_some(request.reqid),
1024                decompression_info,
1025            }))
1026        });
1027
1028        Ok(DecodedRequest {
1029            request_id,
1030            trace_flow_id,
1031            operation: operation.map_err(|status| {
1032                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1033            })?,
1034            vmo,
1035        })
1036    }
1037
1038    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1039        std::mem::take(&mut *self.vmos.lock())
1040    }
1041
1042    /// Maps the request and returns the mapped request with an optional remainder.
1043    fn map_request(
1044        &self,
1045        mut request: DecodedRequest,
1046        active_request: &mut ActiveRequest<SM::Session>,
1047    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1048        if active_request.status != zx::Status::OK {
1049            return Err(zx::Status::BAD_STATE);
1050        }
1051        let mapping = self.offset_map.mapping();
1052        match (mapping, request.operation.blocks()) {
1053            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1054                return Err(zx::Status::OUT_OF_RANGE);
1055            }
1056            _ => {}
1057        }
1058        let remainder = request.operation.map(
1059            self.offset_map.mapping(),
1060            self.offset_map.max_transfer_blocks(),
1061            self.block_size,
1062        );
1063        if remainder.is_some() {
1064            active_request.count += 1;
1065        }
1066        static CACHE: AtomicU64 = AtomicU64::new(0);
1067        if let Some(context) =
1068            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1069        {
1070            use fuchsia_trace::ArgValue;
1071            let trace_args = [
1072                ArgValue::of("request_id", request.request_id.0),
1073                ArgValue::of("opcode", request.operation.trace_label()),
1074            ];
1075            let _scope =
1076                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1077            if let Some(trace_flow_id) = active_request.trace_flow_id {
1078                fuchsia_trace::flow_step(
1079                    &context,
1080                    "block_server::start_transaction",
1081                    trace_flow_id.get().into(),
1082                    &[],
1083                );
1084            }
1085        }
1086        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1087        Ok((request, remainder))
1088    }
1089
1090    /// Drops all requests for which `pred` is true.
1091    ///
1092    /// NOTE: This should only be called once we are certain that the requests will not be
1093    /// completed asynchronously  Otherwise, requests might be completed twice.
1094    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1095        self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1096    }
1097
1098    /// Closes all grouped requests for which `pred` is true and which are held open pending the
1099    /// completion of their group.
1100    ///
1101    /// Normally, a request is dropped from ActiveRequests when it is completed.  However, if a
1102    /// request is part of a group, it will not be dropped until a request with GROUP_LAST arrives.
1103    /// If we're shutting down a session, the client may not ever send the GROUP_LAST, so we need to
1104    /// be sure to close these grouped requests.
1105    ///
1106    /// This is called during session shutdown in situations where [`Self::drop_active_requests`]
1107    /// cannot be used (e.g. for the callback interface, which hands off the responsibility of
1108    /// completing requests to its concrete implementation and cannot control when requests are
1109    /// completed relative to session shutdown).
1110    fn close_active_groups(&self, pred: impl Fn(&SM::Session) -> bool) {
1111        self.session_manager().active_requests().0.lock().requests.retain(|_, request| {
1112            if !pred(&request.session) || request.req_id.is_some() {
1113                return true;
1114            }
1115            // Mark the group as completed, and immediately drop any which have no outstanding
1116            // requests (since they will otherwise never be dropped).
1117            request.req_id = Some(u32::MAX);
1118            request.count > 0
1119        });
1120    }
1121}
1122
1123#[repr(transparent)]
1124#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
1125pub struct RequestId(usize);
1126
1127#[derive(Clone, Debug)]
1128struct DecodedRequest {
1129    request_id: RequestId,
1130    trace_flow_id: TraceFlowId,
1131    operation: Operation,
1132    vmo: Option<Arc<zx::Vmo>>,
1133}
1134
1135/// cbindgen:no-export
1136pub type WriteFlags = block_protocol::WriteFlags;
1137pub type WriteOptions = block_protocol::WriteOptions;
1138pub type ReadOptions = block_protocol::ReadOptions;
1139pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1140
1141#[repr(C)]
1142#[derive(Clone, Debug, PartialEq, Eq)]
1143pub enum Operation {
1144    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1145    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1146    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1147    // write variant).
1148    Read {
1149        device_block_offset: u64,
1150        block_count: u32,
1151        _unused: u32,
1152        vmo_offset: u64,
1153        options: ReadOptions,
1154    },
1155    Write {
1156        device_block_offset: u64,
1157        block_count: u32,
1158        _unused: u32,
1159        vmo_offset: u64,
1160        options: WriteOptions,
1161    },
1162    Flush,
1163    Trim {
1164        device_block_offset: u64,
1165        block_count: u32,
1166    },
1167    /// This will never be seen by the C interface.
1168    CloseVmo,
1169    /// This will never be seen by the C interface.
1170    StartDecompressedRead {
1171        required_buffer_size: usize,
1172        device_block_offset: u64,
1173        block_count: u32,
1174        options: ReadOptions,
1175    },
1176    /// This will never be seen by the C interface.
1177    ContinueDecompressedRead {
1178        offset: u64,
1179        device_block_offset: u64,
1180        block_count: u32,
1181        options: ReadOptions,
1182    },
1183}
1184
1185impl Operation {
1186    fn trace_label(&self) -> &'static str {
1187        match self {
1188            Operation::Read { .. } => "read",
1189            Operation::Write { .. } => "write",
1190            Operation::Flush { .. } => "flush",
1191            Operation::Trim { .. } => "trim",
1192            Operation::CloseVmo { .. } => "close_vmo",
1193            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1194            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1195        }
1196    }
1197
1198    /// Returns (offset, length).
1199    fn blocks(&self) -> Option<(u64, u32)> {
1200        match self {
1201            Operation::Read { device_block_offset, block_count, .. }
1202            | Operation::Write { device_block_offset, block_count, .. }
1203            | Operation::Trim { device_block_offset, block_count, .. } => {
1204                Some((*device_block_offset, *block_count))
1205            }
1206            _ => None,
1207        }
1208    }
1209
1210    /// Returns mutable references to (offset, length).
1211    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1212        match self {
1213            Operation::Read { device_block_offset, block_count, .. }
1214            | Operation::Write { device_block_offset, block_count, .. }
1215            | Operation::Trim { device_block_offset, block_count, .. } => {
1216                Some((device_block_offset, block_count))
1217            }
1218            _ => None,
1219        }
1220    }
1221
1222    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1223    /// start of the operation.
1224    fn map(
1225        &mut self,
1226        mapping: Option<&BlockOffsetMapping>,
1227        max_blocks: Option<NonZero<u32>>,
1228        block_size: u32,
1229    ) -> Option<Self> {
1230        let mut max = match self {
1231            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1232            _ => None,
1233        };
1234        let (offset, length) = self.blocks_mut()?;
1235        let orig_offset = *offset;
1236        if let Some(mapping) = mapping {
1237            let delta = *offset - mapping.source_block_offset;
1238            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1239            *offset = mapping.target_block_offset + delta;
1240            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1241            max = match max {
1242                None => Some(mapping_max),
1243                Some(m) => Some(std::cmp::min(m, mapping_max)),
1244            };
1245        };
1246        if let Some(max) = max {
1247            if *length as u64 > max {
1248                let rem = (*length as u64 - max) as u32;
1249                *length = max as u32;
1250                return Some(match self {
1251                    Operation::Read {
1252                        device_block_offset: _,
1253                        block_count: _,
1254                        vmo_offset,
1255                        _unused,
1256                        options,
1257                    } => {
1258                        let mut options = *options;
1259                        options.inline_crypto.dun += max as u32;
1260                        Operation::Read {
1261                            device_block_offset: orig_offset + max,
1262                            block_count: rem,
1263                            vmo_offset: *vmo_offset + max * block_size as u64,
1264                            _unused: *_unused,
1265                            options: options,
1266                        }
1267                    }
1268                    Operation::Write {
1269                        device_block_offset: _,
1270                        block_count: _,
1271                        _unused,
1272                        vmo_offset,
1273                        options,
1274                    } => {
1275                        let mut options = *options;
1276                        options.inline_crypto.dun += max as u32;
1277                        Operation::Write {
1278                            device_block_offset: orig_offset + max,
1279                            block_count: rem,
1280                            _unused: *_unused,
1281                            vmo_offset: *vmo_offset + max * block_size as u64,
1282                            options: options,
1283                        }
1284                    }
1285                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1286                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1287                    }
1288                    _ => unreachable!(),
1289                });
1290            }
1291        }
1292        None
1293    }
1294
1295    /// Returns true if the specified write flags are set.
1296    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1297        if let Operation::Write { options, .. } = self {
1298            options.flags.contains(value)
1299        } else {
1300            false
1301        }
1302    }
1303
1304    /// Removes `value` from the request's write flags and returns true if the flag was set.
1305    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1306        if let Operation::Write { options, .. } = self {
1307            let result = options.flags.contains(value);
1308            options.flags.remove(value);
1309            result
1310        } else {
1311            false
1312        }
1313    }
1314}
1315
1316#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1317pub enum GroupOrRequest {
1318    Group(u16),
1319    Request(u32),
1320}
1321
1322impl GroupOrRequest {
1323    fn is_group(&self) -> bool {
1324        matches!(self, Self::Group(_))
1325    }
1326
1327    fn group_id(&self) -> Option<u16> {
1328        match self {
1329            Self::Group(id) => Some(*id),
1330            Self::Request(_) => None,
1331        }
1332    }
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337    use super::{
1338        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1339        TraceFlowId,
1340    };
1341    use assert_matches::assert_matches;
1342    use block_protocol::{
1343        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1344        WriteFlags, WriteOptions,
1345    };
1346    use fidl_fuchsia_storage_block as fblock;
1347    use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1348    use fuchsia_async as fasync;
1349    use fuchsia_sync::Mutex;
1350    use futures::FutureExt as _;
1351    use futures::channel::oneshot;
1352    use futures::future::BoxFuture;
1353    use std::borrow::Cow;
1354    use std::future::poll_fn;
1355    use std::num::NonZero;
1356    use std::pin::pin;
1357    use std::sync::Arc;
1358    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1359    use std::task::{Context, Poll};
1360
1361    #[derive(Default)]
1362    struct MockInterface {
1363        read_hook: Option<
1364            Box<
1365                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1366                    + Send
1367                    + Sync,
1368            >,
1369        >,
1370        write_hook:
1371            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1372        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1373    }
1374
1375    impl super::async_interface::Interface for MockInterface {
1376        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1377            Ok(())
1378        }
1379
1380        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1381            Cow::Owned(test_device_info())
1382        }
1383
1384        async fn read(
1385            &self,
1386            device_block_offset: u64,
1387            block_count: u32,
1388            vmo: &Arc<zx::Vmo>,
1389            vmo_offset: u64,
1390            _opts: ReadOptions,
1391            _trace_flow_id: TraceFlowId,
1392        ) -> Result<(), zx::Status> {
1393            if let Some(read_hook) = &self.read_hook {
1394                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1395            } else {
1396                unimplemented!();
1397            }
1398        }
1399
1400        async fn write(
1401            &self,
1402            device_block_offset: u64,
1403            _block_count: u32,
1404            _vmo: &Arc<zx::Vmo>,
1405            _vmo_offset: u64,
1406            opts: WriteOptions,
1407            _trace_flow_id: TraceFlowId,
1408        ) -> Result<(), zx::Status> {
1409            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1410                && let Some(barrier_hook) = &self.barrier_hook
1411            {
1412                barrier_hook()?;
1413            }
1414            if let Some(write_hook) = &self.write_hook {
1415                write_hook(device_block_offset).await
1416            } else {
1417                unimplemented!();
1418            }
1419        }
1420
1421        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1422            Ok(())
1423        }
1424
1425        async fn trim(
1426            &self,
1427            _device_block_offset: u64,
1428            _block_count: u32,
1429            _trace_flow_id: TraceFlowId,
1430        ) -> Result<(), zx::Status> {
1431            unreachable!();
1432        }
1433
1434        async fn get_volume_info(
1435            &self,
1436        ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1437            // Hang forever for the test_requests_dont_block_sessions test.
1438            let () = std::future::pending().await;
1439            unreachable!();
1440        }
1441    }
1442
1443    const BLOCK_SIZE: u32 = 512;
1444    const MAX_TRANSFER_BLOCKS: u32 = 10;
1445
1446    fn test_device_info() -> DeviceInfo {
1447        DeviceInfo::Partition(PartitionInfo {
1448            device_flags: fblock::DeviceFlag::READONLY
1449                | fblock::DeviceFlag::BARRIER_SUPPORT
1450                | fblock::DeviceFlag::FUA_SUPPORT,
1451            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1452            block_range: Some(0..100),
1453            type_guid: [1; 16],
1454            instance_guid: [2; 16],
1455            name: "foo".to_string(),
1456            flags: 0xabcd,
1457        })
1458    }
1459
1460    #[fuchsia::test]
1461    async fn test_barriers_ordering() {
1462        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1463        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1464        let barrier_called = Arc::new(AtomicBool::new(false));
1465
1466        futures::join!(
1467            async move {
1468                let barrier_called_clone = barrier_called.clone();
1469                let block_server = BlockServer::new(
1470                    BLOCK_SIZE,
1471                    Arc::new(MockInterface {
1472                        barrier_hook: Some(Box::new(move || {
1473                            barrier_called.store(true, Ordering::Relaxed);
1474                            Ok(())
1475                        })),
1476                        write_hook: Some(Box::new(move |device_block_offset| {
1477                            let barrier_called = barrier_called_clone.clone();
1478                            Box::pin(async move {
1479                                // The sleep allows the server to reorder the fifo requests.
1480                                if device_block_offset % 2 == 0 {
1481                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1482                                        zx::MonotonicDuration::from_millis(200),
1483                                    ))
1484                                    .await;
1485                                }
1486                                assert!(barrier_called.load(Ordering::Relaxed));
1487                                Ok(())
1488                            })
1489                        })),
1490                        ..MockInterface::default()
1491                    }),
1492                );
1493                block_server.handle_requests(stream).await.unwrap();
1494            },
1495            async move {
1496                let (session_proxy, server) = fidl::endpoints::create_proxy();
1497
1498                proxy.open_session(server).unwrap();
1499
1500                let vmo_id = session_proxy
1501                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1502                    .await
1503                    .unwrap()
1504                    .unwrap();
1505                assert_ne!(vmo_id.id, 0);
1506
1507                let mut fifo =
1508                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1509                let (mut reader, mut writer) = fifo.async_io();
1510
1511                writer
1512                    .write_entries(&BlockFifoRequest {
1513                        command: BlockFifoCommand {
1514                            opcode: BlockOpcode::Write.into_primitive(),
1515                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1516                            ..Default::default()
1517                        },
1518                        vmoid: vmo_id.id,
1519                        dev_offset: 0,
1520                        length: 5,
1521                        vmo_offset: 6,
1522                        ..Default::default()
1523                    })
1524                    .await
1525                    .unwrap();
1526
1527                for i in 0..10 {
1528                    writer
1529                        .write_entries(&BlockFifoRequest {
1530                            command: BlockFifoCommand {
1531                                opcode: BlockOpcode::Write.into_primitive(),
1532                                ..Default::default()
1533                            },
1534                            vmoid: vmo_id.id,
1535                            dev_offset: i + 1,
1536                            length: 5,
1537                            vmo_offset: 6,
1538                            ..Default::default()
1539                        })
1540                        .await
1541                        .unwrap();
1542                }
1543                for _ in 0..11 {
1544                    let mut response = BlockFifoResponse::default();
1545                    reader.read_entries(&mut response).await.unwrap();
1546                    assert_eq!(response.status, zx::sys::ZX_OK);
1547                }
1548
1549                std::mem::drop(proxy);
1550            }
1551        );
1552    }
1553
1554    #[fuchsia::test]
1555    async fn test_info() {
1556        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1557
1558        futures::join!(
1559            async {
1560                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1561                block_server.handle_requests(stream).await.unwrap();
1562            },
1563            async {
1564                let expected_info = test_device_info();
1565                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1566                    info
1567                } else {
1568                    unreachable!()
1569                };
1570
1571                let block_info = proxy.get_info().await.unwrap().unwrap();
1572                assert_eq!(
1573                    block_info.block_count,
1574                    partition_info.block_range.as_ref().unwrap().end
1575                        - partition_info.block_range.as_ref().unwrap().start
1576                );
1577                assert_eq!(
1578                    block_info.flags,
1579                    fblock::DeviceFlag::READONLY
1580                        | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1581                        | fblock::DeviceFlag::BARRIER_SUPPORT
1582                        | fblock::DeviceFlag::FUA_SUPPORT
1583                );
1584
1585                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1586
1587                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1588                assert_eq!(status, zx::sys::ZX_OK);
1589                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1590
1591                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1592                assert_eq!(status, zx::sys::ZX_OK);
1593                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1594
1595                let (status, name) = proxy.get_name().await.unwrap();
1596                assert_eq!(status, zx::sys::ZX_OK);
1597                assert_eq!(name.as_ref(), Some(&partition_info.name));
1598
1599                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1600                assert_eq!(metadata.name, name);
1601                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1602                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1603                assert_eq!(
1604                    metadata.start_block_offset,
1605                    Some(partition_info.block_range.as_ref().unwrap().start)
1606                );
1607                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1608                assert_eq!(metadata.flags, Some(partition_info.flags));
1609
1610                std::mem::drop(proxy);
1611            }
1612        );
1613    }
1614
1615    #[fuchsia::test]
1616    async fn test_attach_vmo() {
1617        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1618
1619        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1620        let koid = vmo.koid().unwrap();
1621
1622        futures::join!(
1623            async {
1624                let block_server = BlockServer::new(
1625                    BLOCK_SIZE,
1626                    Arc::new(MockInterface {
1627                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1628                            assert_eq!(vmo.koid().unwrap(), koid);
1629                            Box::pin(async { Ok(()) })
1630                        })),
1631                        ..MockInterface::default()
1632                    }),
1633                );
1634                block_server.handle_requests(stream).await.unwrap();
1635            },
1636            async move {
1637                let (session_proxy, server) = fidl::endpoints::create_proxy();
1638
1639                proxy.open_session(server).unwrap();
1640
1641                let vmo_id = session_proxy
1642                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1643                    .await
1644                    .unwrap()
1645                    .unwrap();
1646                assert_ne!(vmo_id.id, 0);
1647
1648                let mut fifo =
1649                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1650                let (mut reader, mut writer) = fifo.async_io();
1651
1652                // Keep attaching VMOs until we eventually hit the maximum.
1653                let mut count = 1;
1654                loop {
1655                    match session_proxy
1656                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1657                        .await
1658                        .unwrap()
1659                    {
1660                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1661                        Err(e) => {
1662                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1663                            break;
1664                        }
1665                    }
1666
1667                    // Only test every 10 to keep test time down.
1668                    if count % 10 == 0 {
1669                        writer
1670                            .write_entries(&BlockFifoRequest {
1671                                command: BlockFifoCommand {
1672                                    opcode: BlockOpcode::Read.into_primitive(),
1673                                    ..Default::default()
1674                                },
1675                                vmoid: vmo_id.id,
1676                                length: 1,
1677                                ..Default::default()
1678                            })
1679                            .await
1680                            .unwrap();
1681
1682                        let mut response = BlockFifoResponse::default();
1683                        reader.read_entries(&mut response).await.unwrap();
1684                        assert_eq!(response.status, zx::sys::ZX_OK);
1685                    }
1686
1687                    count += 1;
1688                }
1689
1690                assert_eq!(count, u16::MAX as u64);
1691
1692                // Detach the original VMO, and make sure we can then attach another one.
1693                writer
1694                    .write_entries(&BlockFifoRequest {
1695                        command: BlockFifoCommand {
1696                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1697                            ..Default::default()
1698                        },
1699                        vmoid: vmo_id.id,
1700                        ..Default::default()
1701                    })
1702                    .await
1703                    .unwrap();
1704
1705                let mut response = BlockFifoResponse::default();
1706                reader.read_entries(&mut response).await.unwrap();
1707                assert_eq!(response.status, zx::sys::ZX_OK);
1708
1709                let new_vmo_id = session_proxy
1710                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1711                    .await
1712                    .unwrap()
1713                    .unwrap();
1714                // It should reuse the same ID.
1715                assert_eq!(new_vmo_id.id, vmo_id.id);
1716
1717                std::mem::drop(proxy);
1718            }
1719        );
1720    }
1721
1722    #[fuchsia::test]
1723    async fn test_close() {
1724        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1725
1726        let mut server = std::pin::pin!(
1727            async {
1728                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1729                block_server.handle_requests(stream).await.unwrap();
1730            }
1731            .fuse()
1732        );
1733
1734        let mut client = std::pin::pin!(
1735            async {
1736                let (session_proxy, server) = fidl::endpoints::create_proxy();
1737
1738                proxy.open_session(server).unwrap();
1739
1740                // Dropping the proxy should not cause the session to terminate because the session is
1741                // still live.
1742                std::mem::drop(proxy);
1743
1744                session_proxy.close().await.unwrap().unwrap();
1745
1746                // Keep the session alive.  Calling `close` should cause the server to terminate.
1747                let _: () = std::future::pending().await;
1748            }
1749            .fuse()
1750        );
1751
1752        futures::select!(
1753            _ = server => {}
1754            _ = client => unreachable!(),
1755        );
1756    }
1757
1758    #[derive(Default)]
1759    struct IoMockInterface {
1760        do_checks: bool,
1761        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1762        return_errors: bool,
1763    }
1764
1765    #[derive(Debug)]
1766    enum ExpectedOp {
1767        Read(u64, u32, u64),
1768        Write(u64, u32, u64),
1769        Trim(u64, u32),
1770        Flush,
1771    }
1772
1773    impl super::async_interface::Interface for IoMockInterface {
1774        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1775            Ok(())
1776        }
1777
1778        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1779            Cow::Owned(test_device_info())
1780        }
1781
1782        async fn read(
1783            &self,
1784            device_block_offset: u64,
1785            block_count: u32,
1786            _vmo: &Arc<zx::Vmo>,
1787            vmo_offset: u64,
1788            _opts: ReadOptions,
1789            _trace_flow_id: TraceFlowId,
1790        ) -> Result<(), zx::Status> {
1791            if self.return_errors {
1792                Err(zx::Status::INTERNAL)
1793            } else {
1794                if self.do_checks {
1795                    assert_matches!(
1796                        self.expected_op.lock().take(),
1797                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1798                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1799                        "Read {device_block_offset} {block_count} {vmo_offset}"
1800                    );
1801                }
1802                Ok(())
1803            }
1804        }
1805
1806        async fn write(
1807            &self,
1808            device_block_offset: u64,
1809            block_count: u32,
1810            _vmo: &Arc<zx::Vmo>,
1811            vmo_offset: u64,
1812            _write_opts: WriteOptions,
1813            _trace_flow_id: TraceFlowId,
1814        ) -> Result<(), zx::Status> {
1815            if self.return_errors {
1816                Err(zx::Status::NOT_SUPPORTED)
1817            } else {
1818                if self.do_checks {
1819                    assert_matches!(
1820                        self.expected_op.lock().take(),
1821                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1822                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1823                        "Write {device_block_offset} {block_count} {vmo_offset}"
1824                    );
1825                }
1826                Ok(())
1827            }
1828        }
1829
1830        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1831            if self.return_errors {
1832                Err(zx::Status::NO_RESOURCES)
1833            } else {
1834                if self.do_checks {
1835                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1836                }
1837                Ok(())
1838            }
1839        }
1840
1841        async fn trim(
1842            &self,
1843            device_block_offset: u64,
1844            block_count: u32,
1845            _trace_flow_id: TraceFlowId,
1846        ) -> Result<(), zx::Status> {
1847            if self.return_errors {
1848                Err(zx::Status::NO_MEMORY)
1849            } else {
1850                if self.do_checks {
1851                    assert_matches!(
1852                        self.expected_op.lock().take(),
1853                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1854                            block_count == b,
1855                        "Trim {device_block_offset} {block_count}"
1856                    );
1857                }
1858                Ok(())
1859            }
1860        }
1861    }
1862
1863    #[fuchsia::test]
1864    async fn test_io() {
1865        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1866
1867        let expected_op = Arc::new(Mutex::new(None));
1868        let expected_op_clone = expected_op.clone();
1869
1870        let server = async {
1871            let block_server = BlockServer::new(
1872                BLOCK_SIZE,
1873                Arc::new(IoMockInterface {
1874                    return_errors: false,
1875                    do_checks: true,
1876                    expected_op: expected_op_clone,
1877                }),
1878            );
1879            block_server.handle_requests(stream).await.unwrap();
1880        };
1881
1882        let client = async move {
1883            let (session_proxy, server) = fidl::endpoints::create_proxy();
1884
1885            proxy.open_session(server).unwrap();
1886
1887            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1888            let vmo_id = session_proxy
1889                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1890                .await
1891                .unwrap()
1892                .unwrap();
1893
1894            let mut fifo =
1895                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1896            let (mut reader, mut writer) = fifo.async_io();
1897
1898            // READ
1899            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1900            writer
1901                .write_entries(&BlockFifoRequest {
1902                    command: BlockFifoCommand {
1903                        opcode: BlockOpcode::Read.into_primitive(),
1904                        ..Default::default()
1905                    },
1906                    vmoid: vmo_id.id,
1907                    dev_offset: 1,
1908                    length: 2,
1909                    vmo_offset: 3,
1910                    ..Default::default()
1911                })
1912                .await
1913                .unwrap();
1914
1915            let mut response = BlockFifoResponse::default();
1916            reader.read_entries(&mut response).await.unwrap();
1917            assert_eq!(response.status, zx::sys::ZX_OK);
1918
1919            // WRITE
1920            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1921            writer
1922                .write_entries(&BlockFifoRequest {
1923                    command: BlockFifoCommand {
1924                        opcode: BlockOpcode::Write.into_primitive(),
1925                        ..Default::default()
1926                    },
1927                    vmoid: vmo_id.id,
1928                    dev_offset: 4,
1929                    length: 5,
1930                    vmo_offset: 6,
1931                    ..Default::default()
1932                })
1933                .await
1934                .unwrap();
1935
1936            let mut response = BlockFifoResponse::default();
1937            reader.read_entries(&mut response).await.unwrap();
1938            assert_eq!(response.status, zx::sys::ZX_OK);
1939
1940            // FLUSH
1941            *expected_op.lock() = Some(ExpectedOp::Flush);
1942            writer
1943                .write_entries(&BlockFifoRequest {
1944                    command: BlockFifoCommand {
1945                        opcode: BlockOpcode::Flush.into_primitive(),
1946                        ..Default::default()
1947                    },
1948                    ..Default::default()
1949                })
1950                .await
1951                .unwrap();
1952
1953            reader.read_entries(&mut response).await.unwrap();
1954            assert_eq!(response.status, zx::sys::ZX_OK);
1955
1956            // TRIM
1957            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1958            writer
1959                .write_entries(&BlockFifoRequest {
1960                    command: BlockFifoCommand {
1961                        opcode: BlockOpcode::Trim.into_primitive(),
1962                        ..Default::default()
1963                    },
1964                    dev_offset: 7,
1965                    length: 8,
1966                    ..Default::default()
1967                })
1968                .await
1969                .unwrap();
1970
1971            reader.read_entries(&mut response).await.unwrap();
1972            assert_eq!(response.status, zx::sys::ZX_OK);
1973
1974            std::mem::drop(proxy);
1975        };
1976
1977        futures::join!(server, client);
1978    }
1979
1980    #[fuchsia::test]
1981    async fn test_io_errors() {
1982        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1983
1984        futures::join!(
1985            async {
1986                let block_server = BlockServer::new(
1987                    BLOCK_SIZE,
1988                    Arc::new(IoMockInterface {
1989                        return_errors: true,
1990                        do_checks: false,
1991                        expected_op: Arc::new(Mutex::new(None)),
1992                    }),
1993                );
1994                block_server.handle_requests(stream).await.unwrap();
1995            },
1996            async move {
1997                let (session_proxy, server) = fidl::endpoints::create_proxy();
1998
1999                proxy.open_session(server).unwrap();
2000
2001                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2002                let vmo_id = session_proxy
2003                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2004                    .await
2005                    .unwrap()
2006                    .unwrap();
2007
2008                let mut fifo =
2009                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2010                let (mut reader, mut writer) = fifo.async_io();
2011
2012                // READ
2013                writer
2014                    .write_entries(&BlockFifoRequest {
2015                        command: BlockFifoCommand {
2016                            opcode: BlockOpcode::Read.into_primitive(),
2017                            ..Default::default()
2018                        },
2019                        vmoid: vmo_id.id,
2020                        length: 1,
2021                        reqid: 1,
2022                        ..Default::default()
2023                    })
2024                    .await
2025                    .unwrap();
2026
2027                let mut response = BlockFifoResponse::default();
2028                reader.read_entries(&mut response).await.unwrap();
2029                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2030
2031                // WRITE
2032                writer
2033                    .write_entries(&BlockFifoRequest {
2034                        command: BlockFifoCommand {
2035                            opcode: BlockOpcode::Write.into_primitive(),
2036                            ..Default::default()
2037                        },
2038                        vmoid: vmo_id.id,
2039                        length: 1,
2040                        reqid: 2,
2041                        ..Default::default()
2042                    })
2043                    .await
2044                    .unwrap();
2045
2046                reader.read_entries(&mut response).await.unwrap();
2047                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2048
2049                // FLUSH
2050                writer
2051                    .write_entries(&BlockFifoRequest {
2052                        command: BlockFifoCommand {
2053                            opcode: BlockOpcode::Flush.into_primitive(),
2054                            ..Default::default()
2055                        },
2056                        reqid: 3,
2057                        ..Default::default()
2058                    })
2059                    .await
2060                    .unwrap();
2061
2062                reader.read_entries(&mut response).await.unwrap();
2063                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2064
2065                // TRIM
2066                writer
2067                    .write_entries(&BlockFifoRequest {
2068                        command: BlockFifoCommand {
2069                            opcode: BlockOpcode::Trim.into_primitive(),
2070                            ..Default::default()
2071                        },
2072                        reqid: 4,
2073                        length: 1,
2074                        ..Default::default()
2075                    })
2076                    .await
2077                    .unwrap();
2078
2079                reader.read_entries(&mut response).await.unwrap();
2080                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2081
2082                std::mem::drop(proxy);
2083            }
2084        );
2085    }
2086
2087    #[fuchsia::test]
2088    async fn test_invalid_args() {
2089        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2090
2091        futures::join!(
2092            async {
2093                let block_server = BlockServer::new(
2094                    BLOCK_SIZE,
2095                    Arc::new(IoMockInterface {
2096                        return_errors: false,
2097                        do_checks: false,
2098                        expected_op: Arc::new(Mutex::new(None)),
2099                    }),
2100                );
2101                block_server.handle_requests(stream).await.unwrap();
2102            },
2103            async move {
2104                let (session_proxy, server) = fidl::endpoints::create_proxy();
2105
2106                proxy.open_session(server).unwrap();
2107
2108                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2109                let vmo_id = session_proxy
2110                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2111                    .await
2112                    .unwrap()
2113                    .unwrap();
2114
2115                let mut fifo =
2116                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2117
2118                async fn test(
2119                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2120                    request: BlockFifoRequest,
2121                ) -> Result<(), zx::Status> {
2122                    let (mut reader, mut writer) = fifo.async_io();
2123                    writer.write_entries(&request).await.unwrap();
2124                    let mut response = BlockFifoResponse::default();
2125                    reader.read_entries(&mut response).await.unwrap();
2126                    zx::Status::ok(response.status)
2127                }
2128
2129                // READ
2130
2131                let good_read_request = || BlockFifoRequest {
2132                    command: BlockFifoCommand {
2133                        opcode: BlockOpcode::Read.into_primitive(),
2134                        ..Default::default()
2135                    },
2136                    length: 1,
2137                    vmoid: vmo_id.id,
2138                    ..Default::default()
2139                };
2140
2141                assert_eq!(
2142                    test(
2143                        &mut fifo,
2144                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2145                    )
2146                    .await,
2147                    Err(zx::Status::IO)
2148                );
2149
2150                assert_eq!(
2151                    test(
2152                        &mut fifo,
2153                        BlockFifoRequest {
2154                            vmo_offset: 0xffff_ffff_ffff_ffff,
2155                            ..good_read_request()
2156                        }
2157                    )
2158                    .await,
2159                    Err(zx::Status::OUT_OF_RANGE)
2160                );
2161
2162                assert_eq!(
2163                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2164                    Err(zx::Status::INVALID_ARGS)
2165                );
2166
2167                // WRITE
2168
2169                let good_write_request = || BlockFifoRequest {
2170                    command: BlockFifoCommand {
2171                        opcode: BlockOpcode::Write.into_primitive(),
2172                        ..Default::default()
2173                    },
2174                    length: 1,
2175                    vmoid: vmo_id.id,
2176                    ..Default::default()
2177                };
2178
2179                assert_eq!(
2180                    test(
2181                        &mut fifo,
2182                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2183                    )
2184                    .await,
2185                    Err(zx::Status::IO)
2186                );
2187
2188                assert_eq!(
2189                    test(
2190                        &mut fifo,
2191                        BlockFifoRequest {
2192                            vmo_offset: 0xffff_ffff_ffff_ffff,
2193                            ..good_write_request()
2194                        }
2195                    )
2196                    .await,
2197                    Err(zx::Status::OUT_OF_RANGE)
2198                );
2199
2200                assert_eq!(
2201                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2202                    Err(zx::Status::INVALID_ARGS)
2203                );
2204
2205                // CLOSE VMO
2206
2207                assert_eq!(
2208                    test(
2209                        &mut fifo,
2210                        BlockFifoRequest {
2211                            command: BlockFifoCommand {
2212                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2213                                ..Default::default()
2214                            },
2215                            vmoid: vmo_id.id + 1,
2216                            ..Default::default()
2217                        }
2218                    )
2219                    .await,
2220                    Err(zx::Status::IO)
2221                );
2222
2223                std::mem::drop(proxy);
2224            }
2225        );
2226    }
2227
2228    #[fuchsia::test]
2229    async fn test_concurrent_requests() {
2230        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2231
2232        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2233        let waiting_readers_clone = waiting_readers.clone();
2234
2235        futures::join!(
2236            async move {
2237                let block_server = BlockServer::new(
2238                    BLOCK_SIZE,
2239                    Arc::new(MockInterface {
2240                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2241                            let (tx, rx) = oneshot::channel();
2242                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2243                            Box::pin(async move {
2244                                let _ = rx.await;
2245                                Ok(())
2246                            })
2247                        })),
2248                        ..MockInterface::default()
2249                    }),
2250                );
2251                block_server.handle_requests(stream).await.unwrap();
2252            },
2253            async move {
2254                let (session_proxy, server) = fidl::endpoints::create_proxy();
2255
2256                proxy.open_session(server).unwrap();
2257
2258                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2259                let vmo_id = session_proxy
2260                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2261                    .await
2262                    .unwrap()
2263                    .unwrap();
2264
2265                let mut fifo =
2266                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2267                let (mut reader, mut writer) = fifo.async_io();
2268
2269                writer
2270                    .write_entries(&BlockFifoRequest {
2271                        command: BlockFifoCommand {
2272                            opcode: BlockOpcode::Read.into_primitive(),
2273                            ..Default::default()
2274                        },
2275                        reqid: 1,
2276                        dev_offset: 1, // Intentionally use the same as `reqid`.
2277                        vmoid: vmo_id.id,
2278                        length: 1,
2279                        ..Default::default()
2280                    })
2281                    .await
2282                    .unwrap();
2283
2284                writer
2285                    .write_entries(&BlockFifoRequest {
2286                        command: BlockFifoCommand {
2287                            opcode: BlockOpcode::Read.into_primitive(),
2288                            ..Default::default()
2289                        },
2290                        reqid: 2,
2291                        dev_offset: 2,
2292                        vmoid: vmo_id.id,
2293                        length: 1,
2294                        ..Default::default()
2295                    })
2296                    .await
2297                    .unwrap();
2298
2299                // Wait till both those entries are pending.
2300                poll_fn(|cx: &mut Context<'_>| {
2301                    if waiting_readers.lock().len() == 2 {
2302                        Poll::Ready(())
2303                    } else {
2304                        // Yield to the executor.
2305                        cx.waker().wake_by_ref();
2306                        Poll::Pending
2307                    }
2308                })
2309                .await;
2310
2311                let mut response = BlockFifoResponse::default();
2312                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2313
2314                let (id, tx) = waiting_readers.lock().pop().unwrap();
2315                tx.send(()).unwrap();
2316
2317                reader.read_entries(&mut response).await.unwrap();
2318                assert_eq!(response.status, zx::sys::ZX_OK);
2319                assert_eq!(response.reqid, id);
2320
2321                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2322
2323                let (id, tx) = waiting_readers.lock().pop().unwrap();
2324                tx.send(()).unwrap();
2325
2326                reader.read_entries(&mut response).await.unwrap();
2327                assert_eq!(response.status, zx::sys::ZX_OK);
2328                assert_eq!(response.reqid, id);
2329            }
2330        );
2331    }
2332
2333    #[fuchsia::test]
2334    async fn test_session_close_is_synchronous() {
2335        use futures::{FutureExt as _, StreamExt as _};
2336
2337        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2338
2339        let (start_tx, mut start_rx) = futures::channel::mpsc::channel(1);
2340        let (finish_tx, finish_rx) = futures::channel::oneshot::channel();
2341        let finish_rx = Arc::new(Mutex::new(Some(finish_rx)));
2342
2343        futures::join!(
2344            async move {
2345                let block_server = BlockServer::new(
2346                    BLOCK_SIZE,
2347                    Arc::new(MockInterface {
2348                        read_hook: Some(Box::new(move |_, _, _, _| {
2349                            let mut start_tx = start_tx.clone();
2350                            let finish_rx = finish_rx.lock().take().unwrap();
2351                            Box::pin(async move {
2352                                start_tx.try_send(()).unwrap();
2353                                let _ = finish_rx.await;
2354                                Ok(())
2355                            })
2356                        })),
2357                        ..MockInterface::default()
2358                    }),
2359                );
2360                block_server.handle_requests(stream).await.unwrap();
2361            },
2362            async move {
2363                let (session_proxy, server) = fidl::endpoints::create_proxy();
2364                proxy.open_session(server).unwrap();
2365
2366                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2367                let vmo_id = session_proxy
2368                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2369                    .await
2370                    .unwrap()
2371                    .unwrap();
2372
2373                let mut fifo = fasync::Fifo::<BlockFifoResponse, BlockFifoRequest>::from_fifo(
2374                    session_proxy.get_fifo().await.unwrap().unwrap(),
2375                );
2376                let (_reader, mut writer) = fifo.async_io();
2377
2378                writer
2379                    .write_entries(&BlockFifoRequest {
2380                        command: BlockFifoCommand {
2381                            opcode: BlockOpcode::Read.into_primitive(),
2382                            ..Default::default()
2383                        },
2384                        reqid: 1,
2385                        vmoid: vmo_id.id,
2386                        length: 1,
2387                        ..Default::default()
2388                    })
2389                    .await
2390                    .unwrap();
2391
2392                // Wait for the read to actually start.
2393                start_rx.next().await.unwrap();
2394
2395                // The close request shouldn't complete yet because the read is still hanging.
2396                let mut close_fut = std::pin::pin!(session_proxy.close().fuse());
2397                let mut timer_fut = std::pin::pin!(
2398                    fasync::Timer::new(std::time::Duration::from_millis(100)).fuse()
2399                );
2400                futures::select! {
2401                    res = close_fut => panic!("close completed too early: {:?}", res),
2402                    _ = timer_fut => {}
2403                }
2404
2405                // Finish the pending request.
2406                finish_tx.send(()).unwrap();
2407
2408                // Verify that close() now completes.
2409                close_fut.await.unwrap().unwrap();
2410
2411                std::mem::drop(proxy);
2412            }
2413        );
2414    }
2415
2416    #[fuchsia::test]
2417    async fn test_groups() {
2418        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2419
2420        futures::join!(
2421            async move {
2422                let block_server = BlockServer::new(
2423                    BLOCK_SIZE,
2424                    Arc::new(MockInterface {
2425                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2426                        ..MockInterface::default()
2427                    }),
2428                );
2429                block_server.handle_requests(stream).await.unwrap();
2430            },
2431            async move {
2432                let (session_proxy, server) = fidl::endpoints::create_proxy();
2433
2434                proxy.open_session(server).unwrap();
2435
2436                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2437                let vmo_id = session_proxy
2438                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2439                    .await
2440                    .unwrap()
2441                    .unwrap();
2442
2443                let mut fifo =
2444                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2445                let (mut reader, mut writer) = fifo.async_io();
2446
2447                writer
2448                    .write_entries(&BlockFifoRequest {
2449                        command: BlockFifoCommand {
2450                            opcode: BlockOpcode::Read.into_primitive(),
2451                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2452                            ..Default::default()
2453                        },
2454                        group: 1,
2455                        vmoid: vmo_id.id,
2456                        length: 1,
2457                        ..Default::default()
2458                    })
2459                    .await
2460                    .unwrap();
2461
2462                writer
2463                    .write_entries(&BlockFifoRequest {
2464                        command: BlockFifoCommand {
2465                            opcode: BlockOpcode::Read.into_primitive(),
2466                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2467                            ..Default::default()
2468                        },
2469                        reqid: 2,
2470                        group: 1,
2471                        vmoid: vmo_id.id,
2472                        length: 1,
2473                        ..Default::default()
2474                    })
2475                    .await
2476                    .unwrap();
2477
2478                let mut response = BlockFifoResponse::default();
2479                reader.read_entries(&mut response).await.unwrap();
2480                assert_eq!(response.status, zx::sys::ZX_OK);
2481                assert_eq!(response.reqid, 2);
2482                assert_eq!(response.group, 1);
2483            }
2484        );
2485    }
2486
2487    #[fuchsia::test]
2488    async fn test_group_error() {
2489        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2490
2491        let counter = Arc::new(AtomicU64::new(0));
2492        let counter_clone = counter.clone();
2493
2494        futures::join!(
2495            async move {
2496                let block_server = BlockServer::new(
2497                    BLOCK_SIZE,
2498                    Arc::new(MockInterface {
2499                        read_hook: Some(Box::new(move |_, _, _, _| {
2500                            counter_clone.fetch_add(1, Ordering::Relaxed);
2501                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2502                        })),
2503                        ..MockInterface::default()
2504                    }),
2505                );
2506                block_server.handle_requests(stream).await.unwrap();
2507            },
2508            async move {
2509                let (session_proxy, server) = fidl::endpoints::create_proxy();
2510
2511                proxy.open_session(server).unwrap();
2512
2513                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2514                let vmo_id = session_proxy
2515                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2516                    .await
2517                    .unwrap()
2518                    .unwrap();
2519
2520                let mut fifo =
2521                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2522                let (mut reader, mut writer) = fifo.async_io();
2523
2524                writer
2525                    .write_entries(&BlockFifoRequest {
2526                        command: BlockFifoCommand {
2527                            opcode: BlockOpcode::Read.into_primitive(),
2528                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2529                            ..Default::default()
2530                        },
2531                        group: 1,
2532                        vmoid: vmo_id.id,
2533                        length: 1,
2534                        ..Default::default()
2535                    })
2536                    .await
2537                    .unwrap();
2538
2539                // Wait until processed.
2540                poll_fn(|cx: &mut Context<'_>| {
2541                    if counter.load(Ordering::Relaxed) == 1 {
2542                        Poll::Ready(())
2543                    } else {
2544                        // Yield to the executor.
2545                        cx.waker().wake_by_ref();
2546                        Poll::Pending
2547                    }
2548                })
2549                .await;
2550
2551                let mut response = BlockFifoResponse::default();
2552                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2553
2554                writer
2555                    .write_entries(&BlockFifoRequest {
2556                        command: BlockFifoCommand {
2557                            opcode: BlockOpcode::Read.into_primitive(),
2558                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2559                            ..Default::default()
2560                        },
2561                        group: 1,
2562                        vmoid: vmo_id.id,
2563                        length: 1,
2564                        ..Default::default()
2565                    })
2566                    .await
2567                    .unwrap();
2568
2569                writer
2570                    .write_entries(&BlockFifoRequest {
2571                        command: BlockFifoCommand {
2572                            opcode: BlockOpcode::Read.into_primitive(),
2573                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2574                            ..Default::default()
2575                        },
2576                        reqid: 2,
2577                        group: 1,
2578                        vmoid: vmo_id.id,
2579                        length: 1,
2580                        ..Default::default()
2581                    })
2582                    .await
2583                    .unwrap();
2584
2585                reader.read_entries(&mut response).await.unwrap();
2586                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2587                assert_eq!(response.reqid, 2);
2588                assert_eq!(response.group, 1);
2589
2590                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2591
2592                // Only the first request should have been processed.
2593                assert_eq!(counter.load(Ordering::Relaxed), 1);
2594            }
2595        );
2596    }
2597
2598    #[fuchsia::test]
2599    async fn test_group_with_two_lasts() {
2600        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2601
2602        let (tx, rx) = oneshot::channel();
2603
2604        futures::join!(
2605            async move {
2606                let rx = Mutex::new(Some(rx));
2607                let block_server = BlockServer::new(
2608                    BLOCK_SIZE,
2609                    Arc::new(MockInterface {
2610                        read_hook: Some(Box::new(move |_, _, _, _| {
2611                            let rx = rx.lock().take().unwrap();
2612                            Box::pin(async {
2613                                let _ = rx.await;
2614                                Ok(())
2615                            })
2616                        })),
2617                        ..MockInterface::default()
2618                    }),
2619                );
2620                block_server.handle_requests(stream).await.unwrap();
2621            },
2622            async move {
2623                let (session_proxy, server) = fidl::endpoints::create_proxy();
2624
2625                proxy.open_session(server).unwrap();
2626
2627                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2628                let vmo_id = session_proxy
2629                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2630                    .await
2631                    .unwrap()
2632                    .unwrap();
2633
2634                let mut fifo =
2635                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2636                let (mut reader, mut writer) = fifo.async_io();
2637
2638                writer
2639                    .write_entries(&BlockFifoRequest {
2640                        command: BlockFifoCommand {
2641                            opcode: BlockOpcode::Read.into_primitive(),
2642                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2643                            ..Default::default()
2644                        },
2645                        reqid: 1,
2646                        group: 1,
2647                        vmoid: vmo_id.id,
2648                        length: 1,
2649                        ..Default::default()
2650                    })
2651                    .await
2652                    .unwrap();
2653
2654                writer
2655                    .write_entries(&BlockFifoRequest {
2656                        command: BlockFifoCommand {
2657                            opcode: BlockOpcode::Read.into_primitive(),
2658                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2659                            ..Default::default()
2660                        },
2661                        reqid: 2,
2662                        group: 1,
2663                        vmoid: vmo_id.id,
2664                        length: 1,
2665                        ..Default::default()
2666                    })
2667                    .await
2668                    .unwrap();
2669
2670                // Send an independent request to flush through the fifo.
2671                writer
2672                    .write_entries(&BlockFifoRequest {
2673                        command: BlockFifoCommand {
2674                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2675                            ..Default::default()
2676                        },
2677                        reqid: 3,
2678                        vmoid: vmo_id.id,
2679                        ..Default::default()
2680                    })
2681                    .await
2682                    .unwrap();
2683
2684                // It should succeed.
2685                let mut response = BlockFifoResponse::default();
2686                reader.read_entries(&mut response).await.unwrap();
2687                assert_eq!(response.status, zx::sys::ZX_OK);
2688                assert_eq!(response.reqid, 3);
2689
2690                // Now release the original request.
2691                tx.send(()).unwrap();
2692
2693                // The response should be for the first message tagged as last, and it should be
2694                // an error because we sent two messages with the LAST marker.
2695                let mut response = BlockFifoResponse::default();
2696                reader.read_entries(&mut response).await.unwrap();
2697                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2698                assert_eq!(response.reqid, 1);
2699                assert_eq!(response.group, 1);
2700            }
2701        );
2702    }
2703
2704    #[fuchsia::test(allow_stalls = false)]
2705    async fn test_requests_dont_block_sessions() {
2706        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2707
2708        let (tx, rx) = oneshot::channel();
2709
2710        fasync::Task::local(async move {
2711            let rx = Mutex::new(Some(rx));
2712            let block_server = BlockServer::new(
2713                BLOCK_SIZE,
2714                Arc::new(MockInterface {
2715                    read_hook: Some(Box::new(move |_, _, _, _| {
2716                        let rx = rx.lock().take().unwrap();
2717                        Box::pin(async {
2718                            let _ = rx.await;
2719                            Ok(())
2720                        })
2721                    })),
2722                    ..MockInterface::default()
2723                }),
2724            );
2725            block_server.handle_requests(stream).await.unwrap();
2726        })
2727        .detach();
2728
2729        let mut fut = pin!(async {
2730            let (session_proxy, server) = fidl::endpoints::create_proxy();
2731
2732            proxy.open_session(server).unwrap();
2733
2734            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2735            let vmo_id = session_proxy
2736                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2737                .await
2738                .unwrap()
2739                .unwrap();
2740
2741            let mut fifo =
2742                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2743            let (mut reader, mut writer) = fifo.async_io();
2744
2745            writer
2746                .write_entries(&BlockFifoRequest {
2747                    command: BlockFifoCommand {
2748                        opcode: BlockOpcode::Read.into_primitive(),
2749                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2750                        ..Default::default()
2751                    },
2752                    reqid: 1,
2753                    group: 1,
2754                    vmoid: vmo_id.id,
2755                    length: 1,
2756                    ..Default::default()
2757                })
2758                .await
2759                .unwrap();
2760
2761            let mut response = BlockFifoResponse::default();
2762            reader.read_entries(&mut response).await.unwrap();
2763            assert_eq!(response.status, zx::sys::ZX_OK);
2764        });
2765
2766        // The response won't come back until we send on `tx`.
2767        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2768
2769        let mut fut2 = pin!(proxy.get_volume_info());
2770
2771        // get_volume_info is set up to stall forever.
2772        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2773
2774        // If we now free up the first future, it should resolve; the stalled call to
2775        // get_volume_info should not block the fifo response.
2776        let _ = tx.send(());
2777
2778        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2779    }
2780
2781    #[fuchsia::test]
2782    async fn test_request_flow_control() {
2783        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2784
2785        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2786        // server will block until that happens.
2787        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2788        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2789        let event_clone = event.clone();
2790        futures::join!(
2791            async move {
2792                let block_server = BlockServer::new(
2793                    BLOCK_SIZE,
2794                    Arc::new(MockInterface {
2795                        read_hook: Some(Box::new(move |_, _, _, _| {
2796                            let event_clone = event_clone.clone();
2797                            Box::pin(async move {
2798                                if !event_clone.1.load(Ordering::SeqCst) {
2799                                    event_clone.0.listen().await;
2800                                }
2801                                Ok(())
2802                            })
2803                        })),
2804                        ..MockInterface::default()
2805                    }),
2806                );
2807                block_server.handle_requests(stream).await.unwrap();
2808            },
2809            async move {
2810                let (session_proxy, server) = fidl::endpoints::create_proxy();
2811
2812                proxy.open_session(server).unwrap();
2813
2814                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2815                let vmo_id = session_proxy
2816                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2817                    .await
2818                    .unwrap()
2819                    .unwrap();
2820
2821                let mut fifo =
2822                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2823                let (mut reader, mut writer) = fifo.async_io();
2824
2825                for i in 0..MAX_REQUESTS {
2826                    writer
2827                        .write_entries(&BlockFifoRequest {
2828                            command: BlockFifoCommand {
2829                                opcode: BlockOpcode::Read.into_primitive(),
2830                                ..Default::default()
2831                            },
2832                            reqid: (i + 1) as u32,
2833                            dev_offset: i,
2834                            vmoid: vmo_id.id,
2835                            length: 1,
2836                            ..Default::default()
2837                        })
2838                        .await
2839                        .unwrap();
2840                }
2841                assert!(
2842                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2843                        command: BlockFifoCommand {
2844                            opcode: BlockOpcode::Read.into_primitive(),
2845                            ..Default::default()
2846                        },
2847                        reqid: u32::MAX,
2848                        dev_offset: MAX_REQUESTS,
2849                        vmoid: vmo_id.id,
2850                        length: 1,
2851                        ..Default::default()
2852                    })))
2853                    .is_pending()
2854                );
2855                // OK, let the server start to process.
2856                event.1.store(true, Ordering::SeqCst);
2857                event.0.notify(usize::MAX);
2858                // For each entry we read, make sure we can write a new one in.
2859                let mut finished_reqids = vec![];
2860                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2861                    let mut response = BlockFifoResponse::default();
2862                    reader.read_entries(&mut response).await.unwrap();
2863                    assert_eq!(response.status, zx::sys::ZX_OK);
2864                    finished_reqids.push(response.reqid);
2865                    writer
2866                        .write_entries(&BlockFifoRequest {
2867                            command: BlockFifoCommand {
2868                                opcode: BlockOpcode::Read.into_primitive(),
2869                                ..Default::default()
2870                            },
2871                            reqid: (i + 1) as u32,
2872                            dev_offset: i,
2873                            vmoid: vmo_id.id,
2874                            length: 1,
2875                            ..Default::default()
2876                        })
2877                        .await
2878                        .unwrap();
2879                }
2880                let mut response = BlockFifoResponse::default();
2881                for _ in 0..MAX_REQUESTS {
2882                    reader.read_entries(&mut response).await.unwrap();
2883                    assert_eq!(response.status, zx::sys::ZX_OK);
2884                    finished_reqids.push(response.reqid);
2885                }
2886                // Verify that we got a response for each request.  Note that we can't assume FIFO
2887                // ordering.
2888                finished_reqids.sort();
2889                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2890                let mut i = 1;
2891                for reqid in finished_reqids {
2892                    assert_eq!(reqid, i);
2893                    i += 1;
2894                }
2895            }
2896        );
2897    }
2898
2899    #[fuchsia::test]
2900    async fn test_passthrough_io_with_fixed_map() {
2901        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2902
2903        let expected_op = Arc::new(Mutex::new(None));
2904        let expected_op_clone = expected_op.clone();
2905        futures::join!(
2906            async {
2907                let block_server = BlockServer::new(
2908                    BLOCK_SIZE,
2909                    Arc::new(IoMockInterface {
2910                        return_errors: false,
2911                        do_checks: true,
2912                        expected_op: expected_op_clone,
2913                    }),
2914                );
2915                block_server.handle_requests(stream).await.unwrap();
2916            },
2917            async move {
2918                let (session_proxy, server) = fidl::endpoints::create_proxy();
2919
2920                let mapping = fblock::BlockOffsetMapping {
2921                    source_block_offset: 0,
2922                    target_block_offset: 10,
2923                    length: 20,
2924                };
2925                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2926
2927                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2928                let vmo_id = session_proxy
2929                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2930                    .await
2931                    .unwrap()
2932                    .unwrap();
2933
2934                let mut fifo =
2935                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2936                let (mut reader, mut writer) = fifo.async_io();
2937
2938                // READ
2939                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2940                writer
2941                    .write_entries(&BlockFifoRequest {
2942                        command: BlockFifoCommand {
2943                            opcode: BlockOpcode::Read.into_primitive(),
2944                            ..Default::default()
2945                        },
2946                        vmoid: vmo_id.id,
2947                        dev_offset: 1,
2948                        length: 2,
2949                        vmo_offset: 3,
2950                        ..Default::default()
2951                    })
2952                    .await
2953                    .unwrap();
2954
2955                let mut response = BlockFifoResponse::default();
2956                reader.read_entries(&mut response).await.unwrap();
2957                assert_eq!(response.status, zx::sys::ZX_OK);
2958
2959                // WRITE
2960                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2961                writer
2962                    .write_entries(&BlockFifoRequest {
2963                        command: BlockFifoCommand {
2964                            opcode: BlockOpcode::Write.into_primitive(),
2965                            ..Default::default()
2966                        },
2967                        vmoid: vmo_id.id,
2968                        dev_offset: 4,
2969                        length: 5,
2970                        vmo_offset: 6,
2971                        ..Default::default()
2972                    })
2973                    .await
2974                    .unwrap();
2975
2976                reader.read_entries(&mut response).await.unwrap();
2977                assert_eq!(response.status, zx::sys::ZX_OK);
2978
2979                // FLUSH
2980                *expected_op.lock() = Some(ExpectedOp::Flush);
2981                writer
2982                    .write_entries(&BlockFifoRequest {
2983                        command: BlockFifoCommand {
2984                            opcode: BlockOpcode::Flush.into_primitive(),
2985                            ..Default::default()
2986                        },
2987                        ..Default::default()
2988                    })
2989                    .await
2990                    .unwrap();
2991
2992                reader.read_entries(&mut response).await.unwrap();
2993                assert_eq!(response.status, zx::sys::ZX_OK);
2994
2995                // TRIM
2996                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2997                writer
2998                    .write_entries(&BlockFifoRequest {
2999                        command: BlockFifoCommand {
3000                            opcode: BlockOpcode::Trim.into_primitive(),
3001                            ..Default::default()
3002                        },
3003                        dev_offset: 7,
3004                        length: 3,
3005                        ..Default::default()
3006                    })
3007                    .await
3008                    .unwrap();
3009
3010                reader.read_entries(&mut response).await.unwrap();
3011                assert_eq!(response.status, zx::sys::ZX_OK);
3012
3013                // READ past window
3014                *expected_op.lock() = None;
3015                writer
3016                    .write_entries(&BlockFifoRequest {
3017                        command: BlockFifoCommand {
3018                            opcode: BlockOpcode::Read.into_primitive(),
3019                            ..Default::default()
3020                        },
3021                        vmoid: vmo_id.id,
3022                        dev_offset: 19,
3023                        length: 2,
3024                        vmo_offset: 3,
3025                        ..Default::default()
3026                    })
3027                    .await
3028                    .unwrap();
3029
3030                reader.read_entries(&mut response).await.unwrap();
3031                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3032
3033                std::mem::drop(proxy);
3034            }
3035        );
3036    }
3037
3038    #[fuchsia::test]
3039    fn operation_map() {
3040        const BLOCK_SIZE: u32 = 512;
3041
3042        #[track_caller]
3043        fn expect_map_result(
3044            mut operation: Operation,
3045            mapping: Option<BlockOffsetMapping>,
3046            max_blocks: Option<NonZero<u32>>,
3047            expected_operations: Vec<Operation>,
3048        ) {
3049            let mut ops = vec![];
3050            while let Some(remainder) =
3051                operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
3052            {
3053                ops.push(operation);
3054                operation = remainder;
3055            }
3056            ops.push(operation);
3057            assert_eq!(ops, expected_operations);
3058        }
3059
3060        // No limits
3061        expect_map_result(
3062            Operation::Read {
3063                device_block_offset: 10,
3064                block_count: 200,
3065                _unused: 0,
3066                vmo_offset: 0,
3067                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3068            },
3069            None,
3070            None,
3071            vec![Operation::Read {
3072                device_block_offset: 10,
3073                block_count: 200,
3074                _unused: 0,
3075                vmo_offset: 0,
3076                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3077            }],
3078        );
3079
3080        // Max block count
3081        expect_map_result(
3082            Operation::Read {
3083                device_block_offset: 10,
3084                block_count: 200,
3085                _unused: 0,
3086                vmo_offset: 0,
3087                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3088            },
3089            None,
3090            NonZero::new(120),
3091            vec![
3092                Operation::Read {
3093                    device_block_offset: 10,
3094                    block_count: 120,
3095                    _unused: 0,
3096                    vmo_offset: 0,
3097                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3098                },
3099                Operation::Read {
3100                    device_block_offset: 130,
3101                    block_count: 80,
3102                    _unused: 0,
3103                    vmo_offset: 120 * BLOCK_SIZE as u64,
3104                    options: ReadOptions {
3105                        // The DUN should be offset by the number of blocks in the first request.
3106                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3107                    },
3108                },
3109            ],
3110        );
3111        expect_map_result(
3112            Operation::Trim { device_block_offset: 10, block_count: 200 },
3113            None,
3114            NonZero::new(120),
3115            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3116        );
3117
3118        // Remapping + Max block count
3119        expect_map_result(
3120            Operation::Read {
3121                device_block_offset: 10,
3122                block_count: 200,
3123                _unused: 0,
3124                vmo_offset: 0,
3125                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3126            },
3127            Some(BlockOffsetMapping {
3128                source_block_offset: 10,
3129                target_block_offset: 100,
3130                length: 200,
3131            }),
3132            NonZero::new(120),
3133            vec![
3134                Operation::Read {
3135                    device_block_offset: 100,
3136                    block_count: 120,
3137                    _unused: 0,
3138                    vmo_offset: 0,
3139                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3140                },
3141                Operation::Read {
3142                    device_block_offset: 220,
3143                    block_count: 80,
3144                    _unused: 0,
3145                    vmo_offset: 120 * BLOCK_SIZE as u64,
3146                    options: ReadOptions {
3147                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3148                    },
3149                },
3150            ],
3151        );
3152        expect_map_result(
3153            Operation::Trim { device_block_offset: 10, block_count: 200 },
3154            Some(BlockOffsetMapping {
3155                source_block_offset: 10,
3156                target_block_offset: 100,
3157                length: 200,
3158            }),
3159            NonZero::new(120),
3160            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3161        );
3162    }
3163
3164    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3165    #[fuchsia::test]
3166    async fn test_pre_barrier_flush_failure() {
3167        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3168
3169        struct NoBarrierInterface;
3170        impl super::async_interface::Interface for NoBarrierInterface {
3171            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3172                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3173                    device_flags: fblock::DeviceFlag::empty(), // No BARRIER_SUPPORT
3174                    max_transfer_blocks: NonZero::new(100),
3175                    block_range: Some(0..100),
3176                    type_guid: [0; 16],
3177                    instance_guid: [0; 16],
3178                    name: "test".to_string(),
3179                    flags: 0,
3180                }))
3181            }
3182            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3183                Ok(())
3184            }
3185            async fn read(
3186                &self,
3187                _: u64,
3188                _: u32,
3189                _: &Arc<zx::Vmo>,
3190                _: u64,
3191                _: ReadOptions,
3192                _: TraceFlowId,
3193            ) -> Result<(), zx::Status> {
3194                unreachable!()
3195            }
3196            async fn write(
3197                &self,
3198                _: u64,
3199                _: u32,
3200                _: &Arc<zx::Vmo>,
3201                _: u64,
3202                _: WriteOptions,
3203                _: TraceFlowId,
3204            ) -> Result<(), zx::Status> {
3205                panic!("Write should not be called");
3206            }
3207            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3208                Err(zx::Status::IO)
3209            }
3210            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3211                unreachable!()
3212            }
3213        }
3214
3215        futures::join!(
3216            async move {
3217                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3218                block_server.handle_requests(stream).await.unwrap();
3219            },
3220            async move {
3221                let (session_proxy, server) = fidl::endpoints::create_proxy();
3222                proxy.open_session(server).unwrap();
3223                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3224                let vmo_id = session_proxy
3225                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3226                    .await
3227                    .unwrap()
3228                    .unwrap();
3229
3230                let mut fifo =
3231                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3232                let (mut reader, mut writer) = fifo.async_io();
3233
3234                writer
3235                    .write_entries(&BlockFifoRequest {
3236                        command: BlockFifoCommand {
3237                            opcode: BlockOpcode::Write.into_primitive(),
3238                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3239                            ..Default::default()
3240                        },
3241                        vmoid: vmo_id.id,
3242                        length: 1,
3243                        ..Default::default()
3244                    })
3245                    .await
3246                    .unwrap();
3247
3248                let mut response = BlockFifoResponse::default();
3249                reader.read_entries(&mut response).await.unwrap();
3250                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3251            }
3252        );
3253    }
3254
3255    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3256    // post-flush is not executed.
3257    #[fuchsia::test]
3258    async fn test_post_barrier_write_failure() {
3259        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3260
3261        struct NoBarrierInterface;
3262        impl super::async_interface::Interface for NoBarrierInterface {
3263            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3264                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3265                    device_flags: fblock::DeviceFlag::empty(), // No FUA_SUPPORT
3266                    max_transfer_blocks: NonZero::new(100),
3267                    block_range: Some(0..100),
3268                    type_guid: [0; 16],
3269                    instance_guid: [0; 16],
3270                    name: "test".to_string(),
3271                    flags: 0,
3272                }))
3273            }
3274            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3275                Ok(())
3276            }
3277            async fn read(
3278                &self,
3279                _: u64,
3280                _: u32,
3281                _: &Arc<zx::Vmo>,
3282                _: u64,
3283                _: ReadOptions,
3284                _: TraceFlowId,
3285            ) -> Result<(), zx::Status> {
3286                unreachable!()
3287            }
3288            async fn write(
3289                &self,
3290                _: u64,
3291                _: u32,
3292                _: &Arc<zx::Vmo>,
3293                _: u64,
3294                _: WriteOptions,
3295                _: TraceFlowId,
3296            ) -> Result<(), zx::Status> {
3297                Err(zx::Status::IO)
3298            }
3299            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3300                panic!("Flush should not be called")
3301            }
3302            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3303                unreachable!()
3304            }
3305        }
3306
3307        futures::join!(
3308            async move {
3309                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3310                block_server.handle_requests(stream).await.unwrap();
3311            },
3312            async move {
3313                let (session_proxy, server) = fidl::endpoints::create_proxy();
3314                proxy.open_session(server).unwrap();
3315                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3316                let vmo_id = session_proxy
3317                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3318                    .await
3319                    .unwrap()
3320                    .unwrap();
3321
3322                let mut fifo =
3323                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3324                let (mut reader, mut writer) = fifo.async_io();
3325
3326                writer
3327                    .write_entries(&BlockFifoRequest {
3328                        command: BlockFifoCommand {
3329                            opcode: BlockOpcode::Write.into_primitive(),
3330                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3331                            ..Default::default()
3332                        },
3333                        vmoid: vmo_id.id,
3334                        length: 1,
3335                        ..Default::default()
3336                    })
3337                    .await
3338                    .unwrap();
3339
3340                let mut response = BlockFifoResponse::default();
3341                reader.read_entries(&mut response).await.unwrap();
3342                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3343            }
3344        );
3345    }
3346}