block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
/// Describes the device being served: either a plain block device or a partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A block device that is not a partition.
    Block(BlockInfo),
    /// A block device that is also a partition.
    Partition(PartitionInfo),
}
40
41impl DeviceInfo {
42    pub fn block_count(&self) -> Option<u64> {
43        match self {
44            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
45            Self::Partition(PartitionInfo { block_range, .. }) => {
46                block_range.as_ref().map(|range| range.end - range.start)
47            }
48        }
49    }
50
51    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
52        match self {
53            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
54            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
55                max_transfer_blocks.clone()
56            }
57        }
58    }
59
60    fn max_transfer_size(&self, block_size: u32) -> u32 {
61        if let Some(max_blocks) = self.max_transfer_blocks() {
62            max_blocks.get() * block_size
63        } else {
64            MAX_TRANSFER_UNBOUNDED
65        }
66    }
67}
68
/// Information associated with non-partition block devices.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// The device flags reported to clients by `GetInfo` (the server may add the decompression
    /// support flag on top of these).
    pub device_flags: fblock::Flag,
    /// The total number of blocks on the device.
    pub block_count: u64,
    /// The maximum number of blocks in a single transfer.  `None` is reported to clients as
    /// `MAX_TRANSFER_UNBOUNDED`.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
76
/// Information associated with a block device that is also a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// The device flags reported by the underlying device.
    pub device_flags: fblock::Flag,
    /// The maximum number of blocks in a single transfer.  `None` is reported to clients as
    /// `MAX_TRANSFER_UNBOUNDED`.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// If `block_range` is None, the partition is a volume and may not be contiguous.
    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
    /// slices and use that (along with the slice and block sizes) to determine the block count.
    pub block_range: Option<Range<u64>>,
    /// The partition's type GUID, returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// The partition's instance GUID, returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// The partition's name, returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// The partition flags, returned as `flags` by `GetMetadata`.
    pub flags: u64,
}
92
/// We internally keep track of active requests, so that when the server is torn down, we can
/// deallocate all of the resources for pending requests.
struct ActiveRequest<S> {
    /// The session the request belongs to; it is handed back together with the response.
    session: S,
    /// Identifies whether this entry tracks a request group or an individual request.
    group_or_request: GroupOrRequest,
    /// If set, a trace flow step is emitted with this ID each time a request completes.
    trace_flow_id: TraceFlowId,
    /// Never read; kept alive for as long as the request is tracked.
    _epoch_guard: EpochGuard<'static>,
    /// The first non-OK status reported for this entry, or OK if there has been none.
    status: zx::Status,
    /// The number of outstanding requests; a response can only be taken once this reaches zero.
    count: u32,
    /// The request ID to use in the response.  No response is produced while this is `None`.
    req_id: Option<u32>,
    /// If set, the data is decompressed once the last outstanding request completes
    /// successfully.
    decompression_info: Option<DecompressionInfo>,
}
105
/// State needed to decompress the result of a read into a client VMO.
struct DecompressionInfo {
    // This is the range of compressed bytes in the receiving buffer.
    compressed_range: Range<usize>,

    // This is the range in the target VMO where we will write uncompressed bytes.
    uncompressed_range: Range<u64>,

    // NOTE(review): not used in the visible code; presumably the number of bytes received so
    // far -- confirm against the code that updates it.
    bytes_so_far: u64,

    // The mapping of the target VMO; `uncompressed_range` is relative to its base address.
    mapping: Arc<VmoMapping>,

    // The receiving buffer holding the compressed bytes.  It is taken (leaving `None`) when
    // decompression runs.
    buffer: Option<Buffer<'static>>,
}
117
118impl DecompressionInfo {
119    /// Returns the uncompressed slice.
120    fn uncompressed_slice(&self) -> *mut [u8] {
121        std::ptr::slice_from_raw_parts_mut(
122            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
123            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
124        )
125    }
126}
127
/// The set of requests that are currently being processed, guarded by a mutex.
pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
129
130impl<S> Default for ActiveRequests<S> {
131    fn default() -> Self {
132        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
133    }
134}
135
136impl<S> ActiveRequests<S> {
137    fn complete_and_take_response(
138        &self,
139        request_id: RequestId,
140        status: zx::Status,
141    ) -> Option<(S, BlockFifoResponse)> {
142        self.0.lock().complete_and_take_response(request_id, status)
143    }
144
145    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
146        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
147    }
148}
149
/// The state behind the `ActiveRequests` mutex.
struct ActiveRequestsInner<S> {
    /// Active requests, indexed by the value wrapped in `RequestId`.
    requests: Slab<ActiveRequest<S>>,
}
153
154// Keeps track of all the requests that are currently being processed
155impl<S> ActiveRequestsInner<S> {
    /// Completes a request.
    ///
    /// Decrements the outstanding count of the entry for `request_id` and records `status` if it
    /// is the first failure seen for the entry.  When the last outstanding request completes and
    /// the entry is still OK, any pending decompression is performed; a decompression failure
    /// changes the entry's status to `IO_DATA_INTEGRITY`.
    ///
    /// Panics if the entry has no outstanding requests.
    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
        let group = &mut self.requests[request_id.0];

        group.count = group.count.checked_sub(1).unwrap();
        // Only the first error is kept; later errors do not overwrite it.
        if status != zx::Status::OK && group.status == zx::Status::OK {
            group.status = status
        }

        fuchsia_trace::duration!(
            c"storage",
            c"block_server::finish_transaction",
            "request_id" => request_id.0,
            "group_completed" => group.count == 0,
            "status" => status.into_raw());
        if let Some(trace_flow_id) = group.trace_flow_id {
            fuchsia_trace::flow_step!(
                c"storage",
                c"block_server::finish_request",
                trace_flow_id.get().into()
            );
        }

        // Decompress only once everything has completed, and only if nothing failed.
        if group.count == 0
            && group.status == zx::Status::OK
            && let Some(info) = &mut group.decompression_info
        {
            // Each thread keeps its own decompressor so that it can be reused between requests.
            thread_local! {
                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
            }
            DECOMPRESSOR.with(|decompressor| {
                // SAFETY: We verified `uncompressed_range` fits within our mapping.
                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
                let mut decompressor = decompressor.borrow_mut();
                // The buffer is taken out of `info`, so it is released once decompression is
                // done.
                if let Err(error) = decompressor.decompress_to_buffer(
                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
                    target_slice,
                ) {
                    log::warn!(error:?; "Decompression error");
                    group.status = zx::Status::IO_DATA_INTEGRITY;
                };
            });
        }
    }
201
202    /// Takes the response if all requests are finished.
203    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
204        let group = &self.requests[request_id.0];
205        match group.req_id {
206            Some(reqid) if group.count == 0 => {
207                let group = self.requests.remove(request_id.0);
208                Some((
209                    group.session,
210                    BlockFifoResponse {
211                        status: group.status.into_raw(),
212                        reqid,
213                        group: group.group_or_request.group_id().unwrap_or(0),
214                        ..Default::default()
215                    },
216                ))
217            }
218            _ => None,
219        }
220    }
221
    /// Completes the request and returns a response if the request group is finished.
    fn complete_and_take_response(
        &mut self,
        request_id: RequestId,
        status: zx::Status,
    ) -> Option<(S, BlockFifoResponse)> {
        self.complete(request_id, status);
        self.take_response(request_id)
    }
231}
232
/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// The block size in bytes, reported by `GetInfo` and passed to every new session.
    block_size: u32,
    /// Supplies device information and opens sessions on behalf of the server.
    session_manager: Arc<SM>,
}
239
/// A single entry in [`OffsetMap`].
#[derive(Debug)]
pub struct BlockOffsetMapping {
    /// The first block of the range as seen by the client.
    source_block_offset: u64,
    /// The block that `source_block_offset` corresponds to on the target.
    target_block_offset: u64,
    /// The length of the range, in blocks.
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if `blocks`, given as `(offset, count)`, lies entirely within the source
    /// range of this mapping.
    ///
    /// Checked arithmetic is used so that an offset close to `u64::MAX` is reported as out of
    /// range instead of overflowing.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (offset, count) = blocks;
        offset
            .checked_sub(self.source_block_offset)
            .and_then(|relative_start| relative_start.checked_add(u64::from(count)))
            .is_some_and(|relative_end| relative_end <= self.length)
    }
}
254
255impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
256    type Error = zx::Status;
257
258    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
259        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
260        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
261        Ok(Self {
262            source_block_offset: wire.source_block_offset,
263            target_block_offset: wire.target_block_offset,
264            length: wire.length,
265        })
266    }
267}
268
/// Remaps the offset of block requests based on an internal map, and truncates long requests.
pub struct OffsetMap {
    /// The mapping to apply, or `None` if requests are not remapped.
    mapping: Option<BlockOffsetMapping>,
    /// The maximum number of blocks in a single request, or `None` if there is no limit.
    max_transfer_blocks: Option<NonZero<u32>>,
}
274
275impl OffsetMap {
276    /// An OffsetMap that remaps requests.
277    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
278        Self { mapping: Some(mapping), max_transfer_blocks }
279    }
280
281    /// An OffsetMap that just enforces maximum request sizes.
282    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
283        Self { mapping: None, max_transfer_blocks }
284    }
285
286    pub fn is_empty(&self) -> bool {
287        self.mapping.is_none()
288    }
289
290    fn mapping(&self) -> Option<&BlockOffsetMapping> {
291        self.mapping.as_ref()
292    }
293
294    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
295        self.max_transfer_blocks
296    }
297}
298
// Methods take Arc<Self> rather than &self because of
// https://github.com/rust-lang/rust/issues/42940.
/// The interface `BlockServer` uses to obtain device information, open sessions and service
/// volume requests.
pub trait SessionManager: 'static {
    /// Whether zstd decompression is supported.  When true, `GetInfo` advertises
    /// `ZSTD_DECOMPRESSION_SUPPORT`; when false, FIFO requests that ask for decompression are
    /// rejected with `NOT_SUPPORTED`.
    const SUPPORTS_DECOMPRESSION: bool;

    /// The per-session type stored alongside each active request.
    type Session;

    /// Called when a client attaches `vmo` to a session, before the VMO ID is returned to the
    /// client.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Creates a new session to handle `stream`.
    /// The returned future should run until the session completes, for example when the client end
    /// closes.
    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the active requests.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
362
/// Conversion into a shared session manager; this is what `BlockServer::new` accepts.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
368
369impl<SM: SessionManager> BlockServer<SM> {
370    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
371        Self { block_size, session_manager: session_manager.into_session_manager() }
372    }
373
374    pub fn session_manager(&self) -> &SM {
375        self.session_manager.as_ref()
376    }
377
378    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
379    pub async fn handle_requests(
380        &self,
381        mut requests: fvolume::VolumeRequestStream,
382    ) -> Result<(), Error> {
383        let scope = fasync::Scope::new();
384        loop {
385            match requests.try_next().await {
386                Ok(Some(request)) => {
387                    if let Some(session) = self.handle_request(request).await? {
388                        scope.spawn(session.map(|_| ()));
389                    }
390                }
391                Ok(None) => break,
392                Err(err) => log::warn!(err:?; "Invalid request"),
393            }
394        }
395        scope.await;
396        Ok(())
397    }
398
399    /// Processes a partition request.  If a new session task is created in response to the request,
400    /// it is returned.
401    async fn handle_request(
402        &self,
403        request: fvolume::VolumeRequest,
404    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
405        match request {
406            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
407                Ok(info) => {
408                    let max_transfer_size = info.max_transfer_size(self.block_size);
409                    let (block_count, mut flags) = match info.as_ref() {
410                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
411                            (*block_count, *device_flags)
412                        }
413                        DeviceInfo::Partition(partition_info) => {
414                            let block_count = if let Some(range) =
415                                partition_info.block_range.as_ref()
416                            {
417                                range.end - range.start
418                            } else {
419                                let volume_info = self.session_manager.get_volume_info().await?;
420                                volume_info.0.slice_size * volume_info.1.partition_slice_count
421                                    / self.block_size as u64
422                            };
423                            (block_count, partition_info.device_flags)
424                        }
425                    };
426                    if SM::SUPPORTS_DECOMPRESSION {
427                        flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
428                    }
429                    responder.send(Ok(&fblock::BlockInfo {
430                        block_count,
431                        block_size: self.block_size,
432                        max_transfer_size,
433                        flags,
434                    }))?;
435                }
436                Err(status) => responder.send(Err(status.into_raw()))?,
437            },
438            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
439                match self.device_info().await {
440                    Ok(info) => {
441                        return Ok(Some(self.session_manager.clone().open_session(
442                            session.into_stream(),
443                            OffsetMap::empty(info.max_transfer_blocks()),
444                            self.block_size,
445                        )));
446                    }
447                    Err(status) => session.close_with_epitaph(status)?,
448                }
449            }
450            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
451                session,
452                mapping,
453                control_handle: _,
454            } => match self.device_info().await {
455                Ok(info) => {
456                    let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
457                        Ok(m) => m,
458                        Err(status) => {
459                            session.close_with_epitaph(status)?;
460                            return Ok(None);
461                        }
462                    };
463                    if let Some(max) = info.block_count() {
464                        if initial_mapping.target_block_offset + initial_mapping.length > max {
465                            log::warn!(
466                                "Invalid mapping for session: {initial_mapping:?} (max {max})"
467                            );
468                            session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
469                            return Ok(None);
470                        }
471                    }
472                    return Ok(Some(self.session_manager.clone().open_session(
473                        session.into_stream(),
474                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
475                        self.block_size,
476                    )));
477                }
478                Err(status) => session.close_with_epitaph(status)?,
479            },
480            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
481                Ok(info) => {
482                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
483                        let mut guid =
484                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
485                        guid.value.copy_from_slice(&partition_info.type_guid);
486                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
487                    } else {
488                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
489                    }
490                }
491                Err(status) => {
492                    responder.send(status.into_raw(), None)?;
493                }
494            },
495            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
496                match self.device_info().await {
497                    Ok(info) => {
498                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
499                            let mut guid =
500                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
501                            guid.value.copy_from_slice(&partition_info.instance_guid);
502                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
503                        } else {
504                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
505                        }
506                    }
507                    Err(status) => {
508                        responder.send(status.into_raw(), None)?;
509                    }
510                }
511            }
512            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
513                Ok(info) => {
514                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
515                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
516                    } else {
517                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
518                    }
519                }
520                Err(status) => {
521                    responder.send(status.into_raw(), None)?;
522                }
523            },
524            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
525                Ok(info) => {
526                    if let DeviceInfo::Partition(info) = info.as_ref() {
527                        let mut type_guid =
528                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
529                        type_guid.value.copy_from_slice(&info.type_guid);
530                        let mut instance_guid =
531                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
532                        instance_guid.value.copy_from_slice(&info.instance_guid);
533                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
534                            name: Some(info.name.clone()),
535                            type_guid: Some(type_guid),
536                            instance_guid: Some(instance_guid),
537                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
538                            num_blocks: info
539                                .block_range
540                                .as_ref()
541                                .map(|range| range.end - range.start),
542                            flags: Some(info.flags),
543                            ..Default::default()
544                        }))?;
545                    } else {
546                        responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
547                    }
548                }
549                Err(status) => responder.send(Err(status.into_raw()))?,
550            },
551            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
552                match self.session_manager.query_slices(&start_slices).await {
553                    Ok(mut results) => {
554                        let results_len = results.len();
555                        assert!(results_len <= 16);
556                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
557                        responder.send(
558                            zx::sys::ZX_OK,
559                            &results.try_into().unwrap(),
560                            results_len as u64,
561                        )?;
562                    }
563                    Err(s) => {
564                        responder.send(
565                            s.into_raw(),
566                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
567                            0,
568                        )?;
569                    }
570                }
571            }
572            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
573                match self.session_manager.get_volume_info().await {
574                    Ok((manager_info, volume_info)) => {
575                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
576                    }
577                    Err(s) => responder.send(s.into_raw(), None, None)?,
578                }
579            }
580            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
581                responder.send(
582                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
583                        .into_raw(),
584                )?;
585            }
586            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
587                responder.send(
588                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
589                        .into_raw(),
590                )?;
591            }
592            fvolume::VolumeRequest::Destroy { responder, .. } => {
593                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
594            }
595        }
596        Ok(None)
597    }
598
    /// Fetches the current device information from the session manager.
    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
        self.session_manager.get_info().await
    }
602}
603
/// State for a single session: its FIFO, its attached VMOs, and how its requests are decoded.
struct SessionHelper<SM: SessionManager> {
    /// The session manager that owns the active requests and is notified of attached VMOs.
    session_manager: Arc<SM>,
    /// The offset map the session was opened with.
    offset_map: OffsetMap,
    /// The block size in bytes, used to convert request lengths and VMO offsets to bytes.
    block_size: u32,
    /// The FIFO end that is duplicated and handed to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// The attached VMOs, keyed by VMO ID.  The mapping is `None` when a VMO is first attached.
    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
}
611
/// A mapping of a whole VMO into the root VMAR, unmapped again when dropped.
struct VmoMapping {
    /// The address at which the VMO is mapped.
    base: usize,
    /// The size of the mapping in bytes (the size of the VMO when it was mapped).
    size: usize,
}
616
617impl VmoMapping {
618    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
619        let size = vmo.get_size().unwrap() as usize;
620        Ok(Arc::new(Self {
621            base: fuchsia_runtime::vmar_root_self()
622                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
623                .inspect_err(|error| {
624                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
625                })?,
626            size,
627        }))
628    }
629}
630
impl Drop for VmoMapping {
    /// Unmaps the region that was mapped in `VmoMapping::new`.
    fn drop(&mut self) {
        // SAFETY: We mapped this in `VmoMapping::new`.
        unsafe {
            // There is nothing useful to do if unmapping fails, so the result is ignored.
            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
        }
    }
}
639
640impl<SM: SessionManager> SessionHelper<SM> {
641    fn new(
642        session_manager: Arc<SM>,
643        offset_map: OffsetMap,
644        block_size: u32,
645    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
646        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
647        Ok((
648            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
649            fifo,
650        ))
651    }
652
653    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
654        match request {
655            fblock::SessionRequest::GetFifo { responder } => {
656                let rights = zx::Rights::TRANSFER
657                    | zx::Rights::READ
658                    | zx::Rights::WRITE
659                    | zx::Rights::SIGNAL
660                    | zx::Rights::WAIT;
661                match self.peer_fifo.duplicate_handle(rights) {
662                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
663                    Err(s) => responder.send(Err(s.into_raw()))?,
664                }
665                Ok(())
666            }
667            fblock::SessionRequest::AttachVmo { vmo, responder } => {
668                let vmo = Arc::new(vmo);
669                let vmo_id = {
670                    let mut vmos = self.vmos.lock();
671                    if vmos.len() == u16::MAX as usize {
672                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
673                        return Ok(());
674                    } else {
675                        let vmo_id = match vmos.last_entry() {
676                            None => 1,
677                            Some(o) => {
678                                o.key().checked_add(1).unwrap_or_else(|| {
679                                    let mut vmo_id = 1;
680                                    // Find the first gap...
681                                    for (&id, _) in &*vmos {
682                                        if id > vmo_id {
683                                            break;
684                                        }
685                                        vmo_id = id + 1;
686                                    }
687                                    vmo_id
688                                })
689                            }
690                        };
691                        vmos.insert(vmo_id, (vmo.clone(), None));
692                        vmo_id
693                    }
694                };
695                self.session_manager.clone().on_attach_vmo(&vmo).await?;
696                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
697                Ok(())
698            }
699            fblock::SessionRequest::Close { responder } => {
700                responder.send(Ok(()))?;
701                Err(anyhow!("Closed"))
702            }
703        }
704    }
705
706    /// Decodes `request`.
707    fn decode_fifo_request(
708        &self,
709        session: SM::Session,
710        request: &BlockFifoRequest,
711    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
712        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
713
714        let request_bytes = request.length as u64 * self.block_size as u64;
715
716        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
717            .ok_or(zx::Status::INVALID_ARGS)
718            .and_then(|code| {
719                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
720                    if code != BlockOpcode::Read {
721                        return Err(zx::Status::INVALID_ARGS);
722                    }
723                    if !SM::SUPPORTS_DECOMPRESSION {
724                        return Err(zx::Status::NOT_SUPPORTED);
725                    }
726                }
727                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
728                    if request.length == 0 {
729                        return Err(zx::Status::INVALID_ARGS);
730                    }
731                    // Make sure the end offsets won't wrap.
732                    if request.dev_offset.checked_add(request.length as u64).is_none()
733                        || (code != BlockOpcode::Trim
734                            && request_bytes.checked_add(request.vmo_offset).is_none())
735                    {
736                        return Err(zx::Status::OUT_OF_RANGE);
737                    }
738                }
739                Ok(match code {
740                    BlockOpcode::Read => Operation::Read {
741                        device_block_offset: request.dev_offset,
742                        block_count: request.length,
743                        _unused: 0,
744                        vmo_offset: request
745                            .vmo_offset
746                            .checked_mul(self.block_size as u64)
747                            .ok_or(zx::Status::OUT_OF_RANGE)?,
748                        options: ReadOptions {
749                            inline_crypto_options: InlineCryptoOptions {
750                                dun: request.dun,
751                                slot: request.slot,
752                            },
753                        },
754                    },
755                    BlockOpcode::Write => {
756                        let mut options = WriteOptions {
757                            inline_crypto_options: InlineCryptoOptions {
758                                dun: request.dun,
759                                slot: request.slot,
760                            },
761                            ..WriteOptions::default()
762                        };
763                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
764                            options.flags |= WriteFlags::FORCE_ACCESS;
765                        }
766                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
767                            options.flags |= WriteFlags::PRE_BARRIER;
768                        }
769                        Operation::Write {
770                            device_block_offset: request.dev_offset,
771                            block_count: request.length,
772                            _unused: 0,
773                            options,
774                            vmo_offset: request
775                                .vmo_offset
776                                .checked_mul(self.block_size as u64)
777                                .ok_or(zx::Status::OUT_OF_RANGE)?,
778                        }
779                    }
780                    BlockOpcode::Flush => Operation::Flush,
781                    BlockOpcode::Trim => Operation::Trim {
782                        device_block_offset: request.dev_offset,
783                        block_count: request.length,
784                    },
785                    BlockOpcode::CloseVmo => Operation::CloseVmo,
786                })
787            });
788
789        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
790            GroupOrRequest::Group(request.group)
791        } else {
792            GroupOrRequest::Request(request.reqid)
793        };
794
795        let mut active_requests = self.session_manager.active_requests().0.lock();
796        let mut request_id = None;
797
        // Multiple Block I/O requests may be sent as a group.
799        // Notes:
800        // - the group is identified by the group id in the request
801        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
802        //   flag is set.
803        // - when processing a request of a group fails, subsequent requests of that
804        //   group will not be processed.
805        // - decompression is a special case, see block-fifo.h for semantics.
806        //
807        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
808        if group_or_request.is_group() {
809            // Search for an existing entry that matches this group.  NOTE: This is a potentially
810            // expensive way to find a group (it's iterating over all slots in the active-requests
811            // slab).  This can be optimised easily should we need to.
812            for (key, group) in &mut active_requests.requests {
813                if group.group_or_request == group_or_request {
814                    if group.req_id.is_some() {
815                        // We have already received a request tagged as last.
816                        if group.status == zx::Status::OK {
817                            group.status = zx::Status::INVALID_ARGS;
818                        }
819                        // Ignore this request.
820                        return Err(None);
821                    }
822                    // See if this is a continuation of a decompressed read.
823                    if group.status == zx::Status::OK
824                        && let Some(info) = &mut group.decompression_info
825                    {
826                        if let Ok(Operation::Read {
827                            device_block_offset,
828                            mut block_count,
829                            options,
830                            vmo_offset: 0,
831                            ..
832                        }) = operation
833                        {
834                            let remaining_bytes = info
835                                .compressed_range
836                                .end
837                                .next_multiple_of(self.block_size as usize)
838                                as u64
839                                - info.bytes_so_far;
840                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
841                                || request.total_compressed_bytes != 0
842                                || request.uncompressed_bytes != 0
843                                || request.compressed_prefix_bytes != 0
844                                || (flags.contains(BlockIoFlag::GROUP_LAST)
845                                    && info.bytes_so_far + request_bytes
846                                        < info.compressed_range.end as u64)
847                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
848                                    && request_bytes >= remaining_bytes)
849                            {
850                                group.status = zx::Status::INVALID_ARGS;
851                            } else {
852                                // We are tolerant of `block_count` being more than we actually
853                                // need.  This can happen if the client is working with a larger
854                                // block size than the device block size.  For example, if Blobfs
                                // has a 8192 byte block size, but the device might have a 512 byte
856                                // block size, it can ask for a multiple of 16 blocks, when fewer
857                                // than that might actually be required to hold the compressed data.
858                                // It is easier for us to tolerate this here than to get Blobfs to
859                                // change to pass only the blocks that are required.
860                                if request_bytes > remaining_bytes {
861                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
862                                }
863
864                                operation = Ok(Operation::ContinueDecompressedRead {
865                                    offset: info.bytes_so_far,
866                                    device_block_offset,
867                                    block_count,
868                                    options,
869                                });
870
871                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
872                            }
873                        } else {
874                            group.status = zx::Status::INVALID_ARGS;
875                        }
876                    }
877                    if flags.contains(BlockIoFlag::GROUP_LAST) {
878                        group.req_id = Some(request.reqid);
879                        // If the group has had an error, there is no point trying to issue this
880                        // request.
881                        if group.status != zx::Status::OK {
882                            operation = Err(group.status);
883                        }
884                    } else if group.status != zx::Status::OK {
885                        // The group has already encountered an error, so there is no point trying
886                        // to issue this request.
887                        return Err(None);
888                    }
889                    request_id = Some(RequestId(key));
890                    group.count += 1;
891                    break;
892                }
893            }
894        }
895
896        let is_single_request =
897            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
898
899        let mut decompression_info = None;
900        let vmo = match operation {
901            Ok(Operation::Read {
902                device_block_offset,
903                mut block_count,
904                options,
905                vmo_offset,
906                ..
907            }) => match self.vmos.lock().get_mut(&request.vmoid) {
908                Some((vmo, mapping)) => {
909                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
910                        let compressed_range = request.compressed_prefix_bytes as usize
911                            ..request.compressed_prefix_bytes as usize
912                                + request.total_compressed_bytes as usize;
913                        let required_buffer_size =
914                            compressed_range.end.next_multiple_of(self.block_size as usize);
915
916                        // Validate the initial decompression request.
917                        if compressed_range.start >= compressed_range.end
918                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
919                            || (is_single_request && request_bytes < compressed_range.end as u64)
920                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
921                        {
922                            Err(zx::Status::INVALID_ARGS)
923                        } else {
924                            // We are tolerant of `block_count` being more than we actually need.
925                            // This can happen if the client is working in a larger block size than
926                            // the device block size.  For example, Blobfs has a 8192 byte block
927                            // size, but the device might have a 512 byte block size.  It is easier
928                            // for us to tolerate this here than to get Blobfs to change to pass
929                            // only the blocks that are required.
930                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
931                                block_count =
932                                    (required_buffer_size / self.block_size as usize) as u32;
933                                required_buffer_size as u64
934                            } else {
935                                request_bytes
936                            };
937
938                            // To decompress, we need to have the target VMO mapped (cached).
939                            match mapping {
940                                Some(mapping) => Ok(mapping.clone()),
941                                None => {
942                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
943                                }
944                            }
945                            .and_then(|mapping| {
946                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
947                                // range.
948                                if vmo_offset
949                                    .checked_add(request.uncompressed_bytes as u64)
950                                    .is_some_and(|end| end <= mapping.size as u64)
951                                {
952                                    Ok(mapping)
953                                } else {
954                                    Err(zx::Status::OUT_OF_RANGE)
955                                }
956                            })
957                            .map(|mapping| {
958                                // Convert the operation into a `StartDecompressedRead`
959                                // operation. For non-fragmented requests, this will be the only
960                                // operation, but if it's a fragmented read,
961                                // `ContinueDecompressedRead` operations will follow.
962                                operation = Ok(Operation::StartDecompressedRead {
963                                    required_buffer_size,
964                                    device_block_offset,
965                                    block_count,
966                                    options,
967                                });
968                                // Record sufficient information so that we can decompress when all
969                                // the requests complete.
970                                decompression_info = Some(DecompressionInfo {
971                                    compressed_range,
972                                    bytes_so_far,
973                                    mapping,
974                                    uncompressed_range: vmo_offset
975                                        ..vmo_offset + request.uncompressed_bytes as u64,
976                                    buffer: None,
977                                });
978                                None
979                            })
980                        }
981                    } else {
982                        Ok(Some(vmo.clone()))
983                    }
984                }
985                None => Err(zx::Status::IO),
986            },
987            Ok(Operation::Write { .. }) => self
988                .vmos
989                .lock()
990                .get(&request.vmoid)
991                .cloned()
992                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
993            Ok(Operation::CloseVmo) => {
994                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
995                    let vmo_clone = vmo.clone();
996                    // Make sure the VMO is dropped after all current Epoch guards have been
997                    // dropped.
998                    Epoch::global().defer(move || drop(vmo_clone));
999                    Ok(Some(vmo))
1000                })
1001            }
1002            _ => Ok(None),
1003        }
1004        .unwrap_or_else(|e| {
1005            operation = Err(e);
1006            None
1007        });
1008
1009        let trace_flow_id = NonZero::new(request.trace_flow_id);
1010        let request_id = request_id.unwrap_or_else(|| {
1011            RequestId(active_requests.requests.insert(ActiveRequest {
1012                session,
1013                group_or_request,
1014                trace_flow_id,
1015                _epoch_guard: Epoch::global().guard(),
1016                status: zx::Status::OK,
1017                count: 1,
1018                req_id: is_single_request.then_some(request.reqid),
1019                decompression_info,
1020            }))
1021        });
1022
1023        Ok(DecodedRequest {
1024            request_id,
1025            trace_flow_id,
1026            operation: operation.map_err(|status| {
1027                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1028            })?,
1029            vmo,
1030        })
1031    }
1032
1033    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1034        std::mem::take(&mut *self.vmos.lock())
1035    }
1036
1037    /// Maps the request and returns the mapped request with an optional remainder.
1038    fn map_request(
1039        &self,
1040        mut request: DecodedRequest,
1041        active_request: &mut ActiveRequest<SM::Session>,
1042    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1043        if active_request.status != zx::Status::OK {
1044            return Err(zx::Status::BAD_STATE);
1045        }
1046        let mapping = self.offset_map.mapping();
1047        match (mapping, request.operation.blocks()) {
1048            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1049                return Err(zx::Status::OUT_OF_RANGE);
1050            }
1051            _ => {}
1052        }
1053        let remainder = request.operation.map(
1054            self.offset_map.mapping(),
1055            self.offset_map.max_transfer_blocks(),
1056            self.block_size,
1057        );
1058        if remainder.is_some() {
1059            active_request.count += 1;
1060        }
1061        static CACHE: AtomicU64 = AtomicU64::new(0);
1062        if let Some(context) =
1063            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
1064        {
1065            use fuchsia_trace::ArgValue;
1066            let trace_args = [
1067                ArgValue::of("request_id", request.request_id.0),
1068                ArgValue::of("opcode", request.operation.trace_label()),
1069            ];
1070            let _scope = fuchsia_trace::duration(
1071                c"storage",
1072                c"block_server::start_transaction",
1073                &trace_args,
1074            );
1075            if let Some(trace_flow_id) = active_request.trace_flow_id {
1076                fuchsia_trace::flow_step(
1077                    &context,
1078                    c"block_server::start_transaction",
1079                    trace_flow_id.get().into(),
1080                    &[],
1081                );
1082            }
1083        }
1084        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1085        Ok((request, remainder))
1086    }
1087
1088    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1089        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1090    }
1091}
1092
/// Identifies an active request.  The wrapped value is the key of the request's entry in the
/// active-requests slab.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
1096
/// A FIFO request after it has been decoded and registered as an active request.
#[derive(Clone, Debug)]
struct DecodedRequest {
    /// Identifies the active-request entry this request is accounted against.
    request_id: RequestId,
    /// Trace flow id taken from the FIFO request; `None` when the request carried zero.
    trace_flow_id: TraceFlowId,
    /// The operation to perform.
    operation: Operation,
    /// The VMO looked up for the request.  Set for plain reads, writes and close-VMO requests;
    /// `None` for decompressed reads and for operations that do not involve a VMO.
    vmo: Option<Arc<zx::Vmo>>,
}
1104
// Crate-local names for the `block_protocol` option types that appear in `Operation`.
// NOTE(review): the `cbindgen:no-export` annotation presumably stops cbindgen from emitting
// `WriteFlags` itself -- confirm against the cbindgen configuration.
/// cbindgen:no-export
pub type WriteFlags = block_protocol::WriteFlags;
pub type WriteOptions = block_protocol::WriteOptions;
pub type ReadOptions = block_protocol::ReadOptions;
1109
/// A block operation decoded from a FIFO request.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
    // code can read `device_block_offset` from the read variant and then assume it's valid for the
    // write variant).
    /// Read blocks from the device into a VMO.
    Read {
        /// First device block to read.
        device_block_offset: u64,
        /// Number of blocks to read.
        block_count: u32,
        // Always set to zero by the decoder.  NOTE(review): presumably padding that keeps the
        // following fields lined up with the other variants -- confirm.
        _unused: u32,
        /// Byte offset into the VMO (the request's block offset multiplied by the block size).
        vmo_offset: u64,
        /// Read options (inline crypto settings) taken from the request.
        options: ReadOptions,
    },
    /// Write blocks from a VMO to the device.
    Write {
        /// First device block to write.
        device_block_offset: u64,
        /// Number of blocks to write.
        block_count: u32,
        // Always set to zero by the decoder.  NOTE(review): presumably padding that keeps the
        // following fields lined up with the other variants -- confirm.
        _unused: u32,
        /// Byte offset into the VMO (the request's block offset multiplied by the block size).
        vmo_offset: u64,
        /// Write options (flags and inline crypto settings) taken from the request.
        options: WriteOptions,
    },
    /// Flush request; carries no parameters.
    Flush,
    /// Trim a range of blocks.
    Trim {
        /// First device block to trim.
        device_block_offset: u64,
        /// Number of blocks to trim.
        block_count: u32,
    },
    /// This will never be seen by the C interface.
    CloseVmo,
    /// First (or only) read of a decompressed read.  If the read is fragmented across a group,
    /// `ContinueDecompressedRead` operations follow.
    StartDecompressedRead {
        /// Size in bytes of the compressed data, rounded up to a multiple of the block size.
        required_buffer_size: usize,
        /// First device block to read.
        device_block_offset: u64,
        /// Number of blocks to read.
        block_count: u32,
        /// Read options (inline crypto settings) taken from the request.
        options: ReadOptions,
    },
    /// Subsequent read of a fragmented decompressed read.
    ContinueDecompressedRead {
        /// Number of bytes covered by the preceding reads of the same group.
        offset: u64,
        /// First device block to read.
        device_block_offset: u64,
        /// Number of blocks to read.
        block_count: u32,
        /// Read options (inline crypto settings) taken from the request.
        options: ReadOptions,
    },
}
1151
1152impl Operation {
1153    fn trace_label(&self) -> &'static str {
1154        match self {
1155            Operation::Read { .. } => "read",
1156            Operation::Write { .. } => "write",
1157            Operation::Flush { .. } => "flush",
1158            Operation::Trim { .. } => "trim",
1159            Operation::CloseVmo { .. } => "close_vmo",
1160            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1161            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1162        }
1163    }
1164
1165    /// Returns (offset, length).
1166    fn blocks(&self) -> Option<(u64, u32)> {
1167        match self {
1168            Operation::Read { device_block_offset, block_count, .. }
1169            | Operation::Write { device_block_offset, block_count, .. }
1170            | Operation::Trim { device_block_offset, block_count, .. } => {
1171                Some((*device_block_offset, *block_count))
1172            }
1173            _ => None,
1174        }
1175    }
1176
1177    /// Returns mutable references to (offset, length).
1178    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1179        match self {
1180            Operation::Read { device_block_offset, block_count, .. }
1181            | Operation::Write { device_block_offset, block_count, .. }
1182            | Operation::Trim { device_block_offset, block_count, .. } => {
1183                Some((device_block_offset, block_count))
1184            }
1185            _ => None,
1186        }
1187    }
1188
1189    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1190    /// start of the operation.
1191    fn map(
1192        &mut self,
1193        mapping: Option<&BlockOffsetMapping>,
1194        max_blocks: Option<NonZero<u32>>,
1195        block_size: u32,
1196    ) -> Option<Self> {
1197        let mut max = match self {
1198            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1199            _ => None,
1200        };
1201        let (offset, length) = self.blocks_mut()?;
1202        let orig_offset = *offset;
1203        if let Some(mapping) = mapping {
1204            let delta = *offset - mapping.source_block_offset;
1205            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1206            *offset = mapping.target_block_offset + delta;
1207            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1208            max = match max {
1209                None => Some(mapping_max),
1210                Some(m) => Some(std::cmp::min(m, mapping_max)),
1211            };
1212        };
1213        if let Some(max) = max {
1214            if *length as u64 > max {
1215                let rem = (*length as u64 - max) as u32;
1216                *length = max as u32;
1217                return Some(match self {
1218                    Operation::Read {
1219                        device_block_offset: _,
1220                        block_count: _,
1221                        vmo_offset,
1222                        _unused,
1223                        options,
1224                    } => Operation::Read {
1225                        device_block_offset: orig_offset + max,
1226                        block_count: rem,
1227                        vmo_offset: *vmo_offset + max * block_size as u64,
1228                        _unused: *_unused,
1229                        options: *options,
1230                    },
1231                    Operation::Write {
1232                        device_block_offset: _,
1233                        block_count: _,
1234                        _unused,
1235                        vmo_offset,
1236                        options,
1237                    } => {
1238                        // Only send the barrier flag once per write request.
1239                        let new_options = WriteOptions {
1240                            flags: options.flags & !WriteFlags::PRE_BARRIER,
1241                            ..*options
1242                        };
1243
1244                        Operation::Write {
1245                            device_block_offset: orig_offset + max,
1246                            block_count: rem,
1247                            _unused: *_unused,
1248                            vmo_offset: *vmo_offset + max * block_size as u64,
1249                            options: new_options,
1250                        }
1251                    }
1252                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1253                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1254                    }
1255                    _ => unreachable!(),
1256                });
1257            }
1258        }
1259        None
1260    }
1261}
1262
/// Identifies what a FIFO request belongs to: a request group or a standalone request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    /// The request is part of a group; holds the request's group id.
    Group(u16),
    /// The request is not part of a group; holds the request id.
    Request(u32),
}
1268
1269impl GroupOrRequest {
1270    fn is_group(&self) -> bool {
1271        matches!(self, Self::Group(_))
1272    }
1273
1274    fn group_id(&self) -> Option<u16> {
1275        match self {
1276            Self::Group(id) => Some(*id),
1277            Self::Request(_) => None,
1278        }
1279    }
1280}
1281
1282#[cfg(test)]
1283mod tests {
1284    use super::{
1285        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1286        TraceFlowId,
1287    };
1288    use assert_matches::assert_matches;
1289    use block_protocol::{
1290        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1291        WriteOptions,
1292    };
1293    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1294    use fuchsia_sync::Mutex;
1295    use futures::FutureExt as _;
1296    use futures::channel::oneshot;
1297    use futures::future::BoxFuture;
1298    use std::borrow::Cow;
1299    use std::future::poll_fn;
1300    use std::num::NonZero;
1301    use std::pin::pin;
1302    use std::sync::Arc;
1303    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1304    use std::task::{Context, Poll};
1305    use zx::{AsHandleRef as _, HandleBased as _};
1306    use {
1307        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1308        fuchsia_async as fasync,
1309    };
1310
    /// Test double for the async block interface.  Behaviour is supplied through optional hooks;
    /// calling `read`, `write` or `barrier` without the matching hook hits `unimplemented!()`.
    #[derive(Default)]
    struct MockInterface {
        /// Invoked by `read` with (device block offset, block count, VMO, VMO offset).
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
        /// Invoked by `write` with the device block offset.
        write_hook:
            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
        /// Invoked by `barrier`.
        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
    }
1324
1325    impl super::async_interface::Interface for MockInterface {
1326        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1327            Ok(())
1328        }
1329
1330        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1331            Ok(Cow::Owned(test_device_info()))
1332        }
1333
1334        async fn read(
1335            &self,
1336            device_block_offset: u64,
1337            block_count: u32,
1338            vmo: &Arc<zx::Vmo>,
1339            vmo_offset: u64,
1340            _opts: ReadOptions,
1341            _trace_flow_id: TraceFlowId,
1342        ) -> Result<(), zx::Status> {
1343            if let Some(read_hook) = &self.read_hook {
1344                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1345            } else {
1346                unimplemented!();
1347            }
1348        }
1349
1350        async fn write(
1351            &self,
1352            device_block_offset: u64,
1353            _block_count: u32,
1354            _vmo: &Arc<zx::Vmo>,
1355            _vmo_offset: u64,
1356            _opts: WriteOptions,
1357            _trace_flow_id: TraceFlowId,
1358        ) -> Result<(), zx::Status> {
1359            if let Some(write_hook) = &self.write_hook {
1360                write_hook(device_block_offset).await
1361            } else {
1362                unimplemented!();
1363            }
1364        }
1365
1366        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1367            unreachable!();
1368        }
1369
1370        fn barrier(&self) -> Result<(), zx::Status> {
1371            if let Some(barrier_hook) = &self.barrier_hook {
1372                barrier_hook()
1373            } else {
1374                unimplemented!();
1375            }
1376        }
1377
1378        async fn trim(
1379            &self,
1380            _device_block_offset: u64,
1381            _block_count: u32,
1382            _trace_flow_id: TraceFlowId,
1383        ) -> Result<(), zx::Status> {
1384            unreachable!();
1385        }
1386
1387        async fn get_volume_info(
1388            &self,
1389        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1390            // Hang forever for the test_requests_dont_block_sessions test.
1391            let () = std::future::pending().await;
1392            unreachable!();
1393        }
1394    }
1395
    // Block size, in bytes, passed to `BlockServer::new` by the tests.
    const BLOCK_SIZE: u32 = 512;
    // Transfer limit, in blocks, advertised by `test_device_info`; requests longer than this are
    // used to exercise request splitting.
    const MAX_TRANSFER_BLOCKS: u32 = 10;
1398
1399    fn test_device_info() -> DeviceInfo {
1400        DeviceInfo::Partition(PartitionInfo {
1401            device_flags: fblock::Flag::READONLY,
1402            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1403            block_range: Some(0..100),
1404            type_guid: [1; 16],
1405            instance_guid: [2; 16],
1406            name: "foo".to_string(),
1407            flags: 0xabcd,
1408        })
1409    }
1410
    // Checks that a PRE_BARRIER write which has to be split into two sub-requests (because it is
    // longer than MAX_TRANSFER_BLOCKS) results in exactly one call to the interface's `barrier`.
    #[fuchsia::test]
    async fn test_split_request_only_issues_single_barrier() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        // Counts barrier invocations; the clone is moved into the barrier hook.
        let barrier_called = Arc::new(AtomicU32::new(0));
        let barrier_called_clone = barrier_called.clone();

        futures::join!(
            // Server side: a block server whose mock counts barriers and accepts every write.
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        barrier_hook: Some(Box::new(move || {
                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        })),
                        write_hook: Some(Box::new(move |_device_block_offset| {
                            Box::pin(async move { Ok(()) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            // Client side: open a session, attach a VMO and send one oversized write.
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();
                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
                // sub requests.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: BlockIoFlag::PRE_BARRIER.bits(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        dev_offset: 0,
                        length: MAX_TRANSFER_BLOCKS + 1,
                        vmo_offset: 6,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // A single response is expected for the whole (split) request.
                let mut response = BlockFifoResponse::default();
                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);

                // NOTE(review): dropping the proxy presumably closes the channel so that
                // `handle_requests` returns and the join completes -- confirm.
                std::mem::drop(proxy);
            }
        );
    }
1478
1479    #[fuchsia::test]
1480    async fn test_barriers_not_supported() {
1481        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1482        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1483
1484        futures::join!(
1485            async move {
1486                let block_server = BlockServer::new(
1487                    BLOCK_SIZE,
1488                    Arc::new(MockInterface {
1489                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1490                        write_hook: Some(Box::new(move |_device_block_offset| {
1491                            Box::pin(async move { Ok(()) })
1492                        })),
1493                        ..MockInterface::default()
1494                    }),
1495                );
1496                block_server.handle_requests(stream).await.unwrap();
1497            },
1498            async move {
1499                let (session_proxy, server) = fidl::endpoints::create_proxy();
1500
1501                proxy.open_session(server).unwrap();
1502
1503                let vmo_id = session_proxy
1504                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1505                    .await
1506                    .unwrap()
1507                    .unwrap();
1508                assert_ne!(vmo_id.id, 0);
1509
1510                let mut fifo =
1511                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1512                let (mut reader, mut writer) = fifo.async_io();
1513
1514                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1515                // sub requests.
1516                writer
1517                    .write_entries(&BlockFifoRequest {
1518                        command: BlockFifoCommand {
1519                            opcode: BlockOpcode::Write.into_primitive(),
1520                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1521                            ..Default::default()
1522                        },
1523                        vmoid: vmo_id.id,
1524                        dev_offset: 0,
1525                        length: MAX_TRANSFER_BLOCKS + 1,
1526                        vmo_offset: 6,
1527                        ..Default::default()
1528                    })
1529                    .await
1530                    .unwrap();
1531
1532                let mut response = BlockFifoResponse::default();
1533                reader.read_entries(&mut response).await.unwrap();
1534                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1535
1536                std::mem::drop(proxy);
1537            }
1538        );
1539    }
1540
1541    #[fuchsia::test]
1542    async fn test_barriers_ordering() {
1543        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1544        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1545        let barrier_called = Arc::new(AtomicBool::new(false));
1546
1547        futures::join!(
1548            async move {
1549                let barrier_called_clone = barrier_called.clone();
1550                let block_server = BlockServer::new(
1551                    BLOCK_SIZE,
1552                    Arc::new(MockInterface {
1553                        barrier_hook: Some(Box::new(move || {
1554                            barrier_called.store(true, Ordering::Relaxed);
1555                            Ok(())
1556                        })),
1557                        write_hook: Some(Box::new(move |device_block_offset| {
1558                            let barrier_called = barrier_called_clone.clone();
1559                            Box::pin(async move {
1560                                // The sleep allows the server to reorder the fifo requests.
1561                                if device_block_offset % 2 == 0 {
1562                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1563                                        zx::MonotonicDuration::from_millis(200),
1564                                    ))
1565                                    .await;
1566                                }
1567                                assert!(barrier_called.load(Ordering::Relaxed));
1568                                Ok(())
1569                            })
1570                        })),
1571                        ..MockInterface::default()
1572                    }),
1573                );
1574                block_server.handle_requests(stream).await.unwrap();
1575            },
1576            async move {
1577                let (session_proxy, server) = fidl::endpoints::create_proxy();
1578
1579                proxy.open_session(server).unwrap();
1580
1581                let vmo_id = session_proxy
1582                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1583                    .await
1584                    .unwrap()
1585                    .unwrap();
1586                assert_ne!(vmo_id.id, 0);
1587
1588                let mut fifo =
1589                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1590                let (mut reader, mut writer) = fifo.async_io();
1591
1592                writer
1593                    .write_entries(&BlockFifoRequest {
1594                        command: BlockFifoCommand {
1595                            opcode: BlockOpcode::Write.into_primitive(),
1596                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1597                            ..Default::default()
1598                        },
1599                        vmoid: vmo_id.id,
1600                        dev_offset: 0,
1601                        length: 5,
1602                        vmo_offset: 6,
1603                        ..Default::default()
1604                    })
1605                    .await
1606                    .unwrap();
1607
1608                for i in 0..10 {
1609                    writer
1610                        .write_entries(&BlockFifoRequest {
1611                            command: BlockFifoCommand {
1612                                opcode: BlockOpcode::Write.into_primitive(),
1613                                ..Default::default()
1614                            },
1615                            vmoid: vmo_id.id,
1616                            dev_offset: i + 1,
1617                            length: 5,
1618                            vmo_offset: 6,
1619                            ..Default::default()
1620                        })
1621                        .await
1622                        .unwrap();
1623                }
1624                for _ in 0..11 {
1625                    let mut response = BlockFifoResponse::default();
1626                    reader.read_entries(&mut response).await.unwrap();
1627                    assert_eq!(response.status, zx::sys::ZX_OK);
1628                }
1629
1630                std::mem::drop(proxy);
1631            }
1632        );
1633    }
1634
1635    #[fuchsia::test]
1636    async fn test_info() {
1637        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1638
1639        futures::join!(
1640            async {
1641                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1642                block_server.handle_requests(stream).await.unwrap();
1643            },
1644            async {
1645                let expected_info = test_device_info();
1646                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1647                    info
1648                } else {
1649                    unreachable!()
1650                };
1651
1652                let block_info = proxy.get_info().await.unwrap().unwrap();
1653                assert_eq!(
1654                    block_info.block_count,
1655                    partition_info.block_range.as_ref().unwrap().end
1656                        - partition_info.block_range.as_ref().unwrap().start
1657                );
1658                assert_eq!(
1659                    block_info.flags,
1660                    fblock::Flag::READONLY | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1661                );
1662
1663                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1664
1665                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1666                assert_eq!(status, zx::sys::ZX_OK);
1667                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1668
1669                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1670                assert_eq!(status, zx::sys::ZX_OK);
1671                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1672
1673                let (status, name) = proxy.get_name().await.unwrap();
1674                assert_eq!(status, zx::sys::ZX_OK);
1675                assert_eq!(name.as_ref(), Some(&partition_info.name));
1676
1677                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1678                assert_eq!(metadata.name, name);
1679                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1680                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1681                assert_eq!(
1682                    metadata.start_block_offset,
1683                    Some(partition_info.block_range.as_ref().unwrap().start)
1684                );
1685                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1686                assert_eq!(metadata.flags, Some(partition_info.flags));
1687
1688                std::mem::drop(proxy);
1689            }
1690        );
1691    }
1692
1693    #[fuchsia::test]
1694    async fn test_attach_vmo() {
1695        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1696
1697        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1698        let koid = vmo.get_koid().unwrap();
1699
1700        futures::join!(
1701            async {
1702                let block_server = BlockServer::new(
1703                    BLOCK_SIZE,
1704                    Arc::new(MockInterface {
1705                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1706                            assert_eq!(vmo.get_koid().unwrap(), koid);
1707                            Box::pin(async { Ok(()) })
1708                        })),
1709                        ..MockInterface::default()
1710                    }),
1711                );
1712                block_server.handle_requests(stream).await.unwrap();
1713            },
1714            async move {
1715                let (session_proxy, server) = fidl::endpoints::create_proxy();
1716
1717                proxy.open_session(server).unwrap();
1718
1719                let vmo_id = session_proxy
1720                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1721                    .await
1722                    .unwrap()
1723                    .unwrap();
1724                assert_ne!(vmo_id.id, 0);
1725
1726                let mut fifo =
1727                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1728                let (mut reader, mut writer) = fifo.async_io();
1729
1730                // Keep attaching VMOs until we eventually hit the maximum.
1731                let mut count = 1;
1732                loop {
1733                    match session_proxy
1734                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1735                        .await
1736                        .unwrap()
1737                    {
1738                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1739                        Err(e) => {
1740                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1741                            break;
1742                        }
1743                    }
1744
1745                    // Only test every 10 to keep test time down.
1746                    if count % 10 == 0 {
1747                        writer
1748                            .write_entries(&BlockFifoRequest {
1749                                command: BlockFifoCommand {
1750                                    opcode: BlockOpcode::Read.into_primitive(),
1751                                    ..Default::default()
1752                                },
1753                                vmoid: vmo_id.id,
1754                                length: 1,
1755                                ..Default::default()
1756                            })
1757                            .await
1758                            .unwrap();
1759
1760                        let mut response = BlockFifoResponse::default();
1761                        reader.read_entries(&mut response).await.unwrap();
1762                        assert_eq!(response.status, zx::sys::ZX_OK);
1763                    }
1764
1765                    count += 1;
1766                }
1767
1768                assert_eq!(count, u16::MAX as u64);
1769
1770                // Detach the original VMO, and make sure we can then attach another one.
1771                writer
1772                    .write_entries(&BlockFifoRequest {
1773                        command: BlockFifoCommand {
1774                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1775                            ..Default::default()
1776                        },
1777                        vmoid: vmo_id.id,
1778                        ..Default::default()
1779                    })
1780                    .await
1781                    .unwrap();
1782
1783                let mut response = BlockFifoResponse::default();
1784                reader.read_entries(&mut response).await.unwrap();
1785                assert_eq!(response.status, zx::sys::ZX_OK);
1786
1787                let new_vmo_id = session_proxy
1788                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1789                    .await
1790                    .unwrap()
1791                    .unwrap();
1792                // It should reuse the same ID.
1793                assert_eq!(new_vmo_id.id, vmo_id.id);
1794
1795                std::mem::drop(proxy);
1796            }
1797        );
1798    }
1799
1800    #[fuchsia::test]
1801    async fn test_close() {
1802        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1803
1804        let mut server = std::pin::pin!(
1805            async {
1806                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1807                block_server.handle_requests(stream).await.unwrap();
1808            }
1809            .fuse()
1810        );
1811
1812        let mut client = std::pin::pin!(
1813            async {
1814                let (session_proxy, server) = fidl::endpoints::create_proxy();
1815
1816                proxy.open_session(server).unwrap();
1817
1818                // Dropping the proxy should not cause the session to terminate because the session is
1819                // still live.
1820                std::mem::drop(proxy);
1821
1822                session_proxy.close().await.unwrap().unwrap();
1823
1824                // Keep the session alive.  Calling `close` should cause the server to terminate.
1825                let _: () = std::future::pending().await;
1826            }
1827            .fuse()
1828        );
1829
1830        futures::select!(
1831            _ = server => {}
1832            _ = client => unreachable!(),
1833        );
1834    }
1835
    /// Mock block interface used by the I/O tests; it either checks each call against an
    /// expected operation or fails each call with a fixed status.
    #[derive(Default)]
    struct IoMockInterface {
        /// When true (and `return_errors` is false), read/write/flush/trim assert that the call
        /// matches the operation stored in `expected_op`.
        do_checks: bool,
        /// The next operation the test expects; each checked call takes it, leaving `None`.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        /// When true, read/write/flush/trim return a distinct error status each, without
        /// performing any checks.
        return_errors: bool,
    }
1842
    /// The operation a test expects `IoMockInterface` to receive next.
    #[derive(Debug)]
    enum ExpectedOp {
        /// `(device_block_offset, block_count, vmo_offset / BLOCK_SIZE)` of the expected read.
        Read(u64, u32, u64),
        /// `(device_block_offset, block_count, vmo_offset / BLOCK_SIZE)` of the expected write.
        Write(u64, u32, u64),
        /// `(device_block_offset, block_count)` of the expected trim.
        Trim(u64, u32),
        /// A flush, which carries no arguments to compare.
        Flush,
    }
1850
1851    impl super::async_interface::Interface for IoMockInterface {
1852        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1853            Ok(())
1854        }
1855
1856        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1857            Ok(Cow::Owned(test_device_info()))
1858        }
1859
1860        async fn read(
1861            &self,
1862            device_block_offset: u64,
1863            block_count: u32,
1864            _vmo: &Arc<zx::Vmo>,
1865            vmo_offset: u64,
1866            _opts: ReadOptions,
1867            _trace_flow_id: TraceFlowId,
1868        ) -> Result<(), zx::Status> {
1869            if self.return_errors {
1870                Err(zx::Status::INTERNAL)
1871            } else {
1872                if self.do_checks {
1873                    assert_matches!(
1874                        self.expected_op.lock().take(),
1875                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1876                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1877                        "Read {device_block_offset} {block_count} {vmo_offset}"
1878                    );
1879                }
1880                Ok(())
1881            }
1882        }
1883
1884        async fn write(
1885            &self,
1886            device_block_offset: u64,
1887            block_count: u32,
1888            _vmo: &Arc<zx::Vmo>,
1889            vmo_offset: u64,
1890            _write_opts: WriteOptions,
1891            _trace_flow_id: TraceFlowId,
1892        ) -> Result<(), zx::Status> {
1893            if self.return_errors {
1894                Err(zx::Status::NOT_SUPPORTED)
1895            } else {
1896                if self.do_checks {
1897                    assert_matches!(
1898                        self.expected_op.lock().take(),
1899                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1900                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1901                        "Write {device_block_offset} {block_count} {vmo_offset}"
1902                    );
1903                }
1904                Ok(())
1905            }
1906        }
1907
1908        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1909            if self.return_errors {
1910                Err(zx::Status::NO_RESOURCES)
1911            } else {
1912                if self.do_checks {
1913                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1914                }
1915                Ok(())
1916            }
1917        }
1918
1919        fn barrier(&self) -> Result<(), zx::Status> {
1920            unreachable!()
1921        }
1922
1923        async fn trim(
1924            &self,
1925            device_block_offset: u64,
1926            block_count: u32,
1927            _trace_flow_id: TraceFlowId,
1928        ) -> Result<(), zx::Status> {
1929            if self.return_errors {
1930                Err(zx::Status::NO_MEMORY)
1931            } else {
1932                if self.do_checks {
1933                    assert_matches!(
1934                        self.expected_op.lock().take(),
1935                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1936                            block_count == b,
1937                        "Trim {device_block_offset} {block_count}"
1938                    );
1939                }
1940                Ok(())
1941            }
1942        }
1943    }
1944
1945    #[fuchsia::test]
1946    async fn test_io() {
1947        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1948
1949        let expected_op = Arc::new(Mutex::new(None));
1950        let expected_op_clone = expected_op.clone();
1951
1952        let server = async {
1953            let block_server = BlockServer::new(
1954                BLOCK_SIZE,
1955                Arc::new(IoMockInterface {
1956                    return_errors: false,
1957                    do_checks: true,
1958                    expected_op: expected_op_clone,
1959                }),
1960            );
1961            block_server.handle_requests(stream).await.unwrap();
1962        };
1963
1964        let client = async move {
1965            let (session_proxy, server) = fidl::endpoints::create_proxy();
1966
1967            proxy.open_session(server).unwrap();
1968
1969            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1970            let vmo_id = session_proxy
1971                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1972                .await
1973                .unwrap()
1974                .unwrap();
1975
1976            let mut fifo =
1977                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1978            let (mut reader, mut writer) = fifo.async_io();
1979
1980            // READ
1981            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1982            writer
1983                .write_entries(&BlockFifoRequest {
1984                    command: BlockFifoCommand {
1985                        opcode: BlockOpcode::Read.into_primitive(),
1986                        ..Default::default()
1987                    },
1988                    vmoid: vmo_id.id,
1989                    dev_offset: 1,
1990                    length: 2,
1991                    vmo_offset: 3,
1992                    ..Default::default()
1993                })
1994                .await
1995                .unwrap();
1996
1997            let mut response = BlockFifoResponse::default();
1998            reader.read_entries(&mut response).await.unwrap();
1999            assert_eq!(response.status, zx::sys::ZX_OK);
2000
2001            // WRITE
2002            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
2003            writer
2004                .write_entries(&BlockFifoRequest {
2005                    command: BlockFifoCommand {
2006                        opcode: BlockOpcode::Write.into_primitive(),
2007                        ..Default::default()
2008                    },
2009                    vmoid: vmo_id.id,
2010                    dev_offset: 4,
2011                    length: 5,
2012                    vmo_offset: 6,
2013                    ..Default::default()
2014                })
2015                .await
2016                .unwrap();
2017
2018            let mut response = BlockFifoResponse::default();
2019            reader.read_entries(&mut response).await.unwrap();
2020            assert_eq!(response.status, zx::sys::ZX_OK);
2021
2022            // FLUSH
2023            *expected_op.lock() = Some(ExpectedOp::Flush);
2024            writer
2025                .write_entries(&BlockFifoRequest {
2026                    command: BlockFifoCommand {
2027                        opcode: BlockOpcode::Flush.into_primitive(),
2028                        ..Default::default()
2029                    },
2030                    ..Default::default()
2031                })
2032                .await
2033                .unwrap();
2034
2035            reader.read_entries(&mut response).await.unwrap();
2036            assert_eq!(response.status, zx::sys::ZX_OK);
2037
2038            // TRIM
2039            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
2040            writer
2041                .write_entries(&BlockFifoRequest {
2042                    command: BlockFifoCommand {
2043                        opcode: BlockOpcode::Trim.into_primitive(),
2044                        ..Default::default()
2045                    },
2046                    dev_offset: 7,
2047                    length: 8,
2048                    ..Default::default()
2049                })
2050                .await
2051                .unwrap();
2052
2053            reader.read_entries(&mut response).await.unwrap();
2054            assert_eq!(response.status, zx::sys::ZX_OK);
2055
2056            std::mem::drop(proxy);
2057        };
2058
2059        futures::join!(server, client);
2060    }
2061
2062    #[fuchsia::test]
2063    async fn test_io_errors() {
2064        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2065
2066        futures::join!(
2067            async {
2068                let block_server = BlockServer::new(
2069                    BLOCK_SIZE,
2070                    Arc::new(IoMockInterface {
2071                        return_errors: true,
2072                        do_checks: false,
2073                        expected_op: Arc::new(Mutex::new(None)),
2074                    }),
2075                );
2076                block_server.handle_requests(stream).await.unwrap();
2077            },
2078            async move {
2079                let (session_proxy, server) = fidl::endpoints::create_proxy();
2080
2081                proxy.open_session(server).unwrap();
2082
2083                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2084                let vmo_id = session_proxy
2085                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2086                    .await
2087                    .unwrap()
2088                    .unwrap();
2089
2090                let mut fifo =
2091                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2092                let (mut reader, mut writer) = fifo.async_io();
2093
2094                // READ
2095                writer
2096                    .write_entries(&BlockFifoRequest {
2097                        command: BlockFifoCommand {
2098                            opcode: BlockOpcode::Read.into_primitive(),
2099                            ..Default::default()
2100                        },
2101                        vmoid: vmo_id.id,
2102                        length: 1,
2103                        reqid: 1,
2104                        ..Default::default()
2105                    })
2106                    .await
2107                    .unwrap();
2108
2109                let mut response = BlockFifoResponse::default();
2110                reader.read_entries(&mut response).await.unwrap();
2111                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2112
2113                // WRITE
2114                writer
2115                    .write_entries(&BlockFifoRequest {
2116                        command: BlockFifoCommand {
2117                            opcode: BlockOpcode::Write.into_primitive(),
2118                            ..Default::default()
2119                        },
2120                        vmoid: vmo_id.id,
2121                        length: 1,
2122                        reqid: 2,
2123                        ..Default::default()
2124                    })
2125                    .await
2126                    .unwrap();
2127
2128                reader.read_entries(&mut response).await.unwrap();
2129                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2130
2131                // FLUSH
2132                writer
2133                    .write_entries(&BlockFifoRequest {
2134                        command: BlockFifoCommand {
2135                            opcode: BlockOpcode::Flush.into_primitive(),
2136                            ..Default::default()
2137                        },
2138                        reqid: 3,
2139                        ..Default::default()
2140                    })
2141                    .await
2142                    .unwrap();
2143
2144                reader.read_entries(&mut response).await.unwrap();
2145                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2146
2147                // TRIM
2148                writer
2149                    .write_entries(&BlockFifoRequest {
2150                        command: BlockFifoCommand {
2151                            opcode: BlockOpcode::Trim.into_primitive(),
2152                            ..Default::default()
2153                        },
2154                        reqid: 4,
2155                        length: 1,
2156                        ..Default::default()
2157                    })
2158                    .await
2159                    .unwrap();
2160
2161                reader.read_entries(&mut response).await.unwrap();
2162                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2163
2164                std::mem::drop(proxy);
2165            }
2166        );
2167    }
2168
2169    #[fuchsia::test]
2170    async fn test_invalid_args() {
2171        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2172
2173        futures::join!(
2174            async {
2175                let block_server = BlockServer::new(
2176                    BLOCK_SIZE,
2177                    Arc::new(IoMockInterface {
2178                        return_errors: false,
2179                        do_checks: false,
2180                        expected_op: Arc::new(Mutex::new(None)),
2181                    }),
2182                );
2183                block_server.handle_requests(stream).await.unwrap();
2184            },
2185            async move {
2186                let (session_proxy, server) = fidl::endpoints::create_proxy();
2187
2188                proxy.open_session(server).unwrap();
2189
2190                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2191                let vmo_id = session_proxy
2192                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2193                    .await
2194                    .unwrap()
2195                    .unwrap();
2196
2197                let mut fifo =
2198                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2199
2200                async fn test(
2201                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2202                    request: BlockFifoRequest,
2203                ) -> Result<(), zx::Status> {
2204                    let (mut reader, mut writer) = fifo.async_io();
2205                    writer.write_entries(&request).await.unwrap();
2206                    let mut response = BlockFifoResponse::default();
2207                    reader.read_entries(&mut response).await.unwrap();
2208                    zx::Status::ok(response.status)
2209                }
2210
2211                // READ
2212
2213                let good_read_request = || BlockFifoRequest {
2214                    command: BlockFifoCommand {
2215                        opcode: BlockOpcode::Read.into_primitive(),
2216                        ..Default::default()
2217                    },
2218                    length: 1,
2219                    vmoid: vmo_id.id,
2220                    ..Default::default()
2221                };
2222
2223                assert_eq!(
2224                    test(
2225                        &mut fifo,
2226                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2227                    )
2228                    .await,
2229                    Err(zx::Status::IO)
2230                );
2231
2232                assert_eq!(
2233                    test(
2234                        &mut fifo,
2235                        BlockFifoRequest {
2236                            vmo_offset: 0xffff_ffff_ffff_ffff,
2237                            ..good_read_request()
2238                        }
2239                    )
2240                    .await,
2241                    Err(zx::Status::OUT_OF_RANGE)
2242                );
2243
2244                assert_eq!(
2245                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2246                    Err(zx::Status::INVALID_ARGS)
2247                );
2248
2249                // WRITE
2250
2251                let good_write_request = || BlockFifoRequest {
2252                    command: BlockFifoCommand {
2253                        opcode: BlockOpcode::Write.into_primitive(),
2254                        ..Default::default()
2255                    },
2256                    length: 1,
2257                    vmoid: vmo_id.id,
2258                    ..Default::default()
2259                };
2260
2261                assert_eq!(
2262                    test(
2263                        &mut fifo,
2264                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2265                    )
2266                    .await,
2267                    Err(zx::Status::IO)
2268                );
2269
2270                assert_eq!(
2271                    test(
2272                        &mut fifo,
2273                        BlockFifoRequest {
2274                            vmo_offset: 0xffff_ffff_ffff_ffff,
2275                            ..good_write_request()
2276                        }
2277                    )
2278                    .await,
2279                    Err(zx::Status::OUT_OF_RANGE)
2280                );
2281
2282                assert_eq!(
2283                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2284                    Err(zx::Status::INVALID_ARGS)
2285                );
2286
2287                // CLOSE VMO
2288
2289                assert_eq!(
2290                    test(
2291                        &mut fifo,
2292                        BlockFifoRequest {
2293                            command: BlockFifoCommand {
2294                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2295                                ..Default::default()
2296                            },
2297                            vmoid: vmo_id.id + 1,
2298                            ..Default::default()
2299                        }
2300                    )
2301                    .await,
2302                    Err(zx::Status::IO)
2303                );
2304
2305                std::mem::drop(proxy);
2306            }
2307        );
2308    }
2309
2310    #[fuchsia::test]
2311    async fn test_concurrent_requests() {
2312        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2313
2314        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2315        let waiting_readers_clone = waiting_readers.clone();
2316
2317        futures::join!(
2318            async move {
2319                let block_server = BlockServer::new(
2320                    BLOCK_SIZE,
2321                    Arc::new(MockInterface {
2322                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2323                            let (tx, rx) = oneshot::channel();
2324                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2325                            Box::pin(async move {
2326                                let _ = rx.await;
2327                                Ok(())
2328                            })
2329                        })),
2330                        ..MockInterface::default()
2331                    }),
2332                );
2333                block_server.handle_requests(stream).await.unwrap();
2334            },
2335            async move {
2336                let (session_proxy, server) = fidl::endpoints::create_proxy();
2337
2338                proxy.open_session(server).unwrap();
2339
2340                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2341                let vmo_id = session_proxy
2342                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2343                    .await
2344                    .unwrap()
2345                    .unwrap();
2346
2347                let mut fifo =
2348                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2349                let (mut reader, mut writer) = fifo.async_io();
2350
2351                writer
2352                    .write_entries(&BlockFifoRequest {
2353                        command: BlockFifoCommand {
2354                            opcode: BlockOpcode::Read.into_primitive(),
2355                            ..Default::default()
2356                        },
2357                        reqid: 1,
2358                        dev_offset: 1, // Intentionally use the same as `reqid`.
2359                        vmoid: vmo_id.id,
2360                        length: 1,
2361                        ..Default::default()
2362                    })
2363                    .await
2364                    .unwrap();
2365
2366                writer
2367                    .write_entries(&BlockFifoRequest {
2368                        command: BlockFifoCommand {
2369                            opcode: BlockOpcode::Read.into_primitive(),
2370                            ..Default::default()
2371                        },
2372                        reqid: 2,
2373                        dev_offset: 2,
2374                        vmoid: vmo_id.id,
2375                        length: 1,
2376                        ..Default::default()
2377                    })
2378                    .await
2379                    .unwrap();
2380
2381                // Wait till both those entries are pending.
2382                poll_fn(|cx: &mut Context<'_>| {
2383                    if waiting_readers.lock().len() == 2 {
2384                        Poll::Ready(())
2385                    } else {
2386                        // Yield to the executor.
2387                        cx.waker().wake_by_ref();
2388                        Poll::Pending
2389                    }
2390                })
2391                .await;
2392
2393                let mut response = BlockFifoResponse::default();
2394                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2395
2396                let (id, tx) = waiting_readers.lock().pop().unwrap();
2397                tx.send(()).unwrap();
2398
2399                reader.read_entries(&mut response).await.unwrap();
2400                assert_eq!(response.status, zx::sys::ZX_OK);
2401                assert_eq!(response.reqid, id);
2402
2403                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2404
2405                let (id, tx) = waiting_readers.lock().pop().unwrap();
2406                tx.send(()).unwrap();
2407
2408                reader.read_entries(&mut response).await.unwrap();
2409                assert_eq!(response.status, zx::sys::ZX_OK);
2410                assert_eq!(response.reqid, id);
2411            }
2412        );
2413    }
2414
2415    #[fuchsia::test]
2416    async fn test_groups() {
2417        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2418
2419        futures::join!(
2420            async move {
2421                let block_server = BlockServer::new(
2422                    BLOCK_SIZE,
2423                    Arc::new(MockInterface {
2424                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2425                        ..MockInterface::default()
2426                    }),
2427                );
2428                block_server.handle_requests(stream).await.unwrap();
2429            },
2430            async move {
2431                let (session_proxy, server) = fidl::endpoints::create_proxy();
2432
2433                proxy.open_session(server).unwrap();
2434
2435                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2436                let vmo_id = session_proxy
2437                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2438                    .await
2439                    .unwrap()
2440                    .unwrap();
2441
2442                let mut fifo =
2443                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2444                let (mut reader, mut writer) = fifo.async_io();
2445
2446                writer
2447                    .write_entries(&BlockFifoRequest {
2448                        command: BlockFifoCommand {
2449                            opcode: BlockOpcode::Read.into_primitive(),
2450                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2451                            ..Default::default()
2452                        },
2453                        group: 1,
2454                        vmoid: vmo_id.id,
2455                        length: 1,
2456                        ..Default::default()
2457                    })
2458                    .await
2459                    .unwrap();
2460
2461                writer
2462                    .write_entries(&BlockFifoRequest {
2463                        command: BlockFifoCommand {
2464                            opcode: BlockOpcode::Read.into_primitive(),
2465                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2466                            ..Default::default()
2467                        },
2468                        reqid: 2,
2469                        group: 1,
2470                        vmoid: vmo_id.id,
2471                        length: 1,
2472                        ..Default::default()
2473                    })
2474                    .await
2475                    .unwrap();
2476
2477                let mut response = BlockFifoResponse::default();
2478                reader.read_entries(&mut response).await.unwrap();
2479                assert_eq!(response.status, zx::sys::ZX_OK);
2480                assert_eq!(response.reqid, 2);
2481                assert_eq!(response.group, 1);
2482            }
2483        );
2484    }
2485
2486    #[fuchsia::test]
2487    async fn test_group_error() {
2488        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2489
2490        let counter = Arc::new(AtomicU64::new(0));
2491        let counter_clone = counter.clone();
2492
2493        futures::join!(
2494            async move {
2495                let block_server = BlockServer::new(
2496                    BLOCK_SIZE,
2497                    Arc::new(MockInterface {
2498                        read_hook: Some(Box::new(move |_, _, _, _| {
2499                            counter_clone.fetch_add(1, Ordering::Relaxed);
2500                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2501                        })),
2502                        ..MockInterface::default()
2503                    }),
2504                );
2505                block_server.handle_requests(stream).await.unwrap();
2506            },
2507            async move {
2508                let (session_proxy, server) = fidl::endpoints::create_proxy();
2509
2510                proxy.open_session(server).unwrap();
2511
2512                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2513                let vmo_id = session_proxy
2514                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2515                    .await
2516                    .unwrap()
2517                    .unwrap();
2518
2519                let mut fifo =
2520                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2521                let (mut reader, mut writer) = fifo.async_io();
2522
2523                writer
2524                    .write_entries(&BlockFifoRequest {
2525                        command: BlockFifoCommand {
2526                            opcode: BlockOpcode::Read.into_primitive(),
2527                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2528                            ..Default::default()
2529                        },
2530                        group: 1,
2531                        vmoid: vmo_id.id,
2532                        length: 1,
2533                        ..Default::default()
2534                    })
2535                    .await
2536                    .unwrap();
2537
2538                // Wait until processed.
2539                poll_fn(|cx: &mut Context<'_>| {
2540                    if counter.load(Ordering::Relaxed) == 1 {
2541                        Poll::Ready(())
2542                    } else {
2543                        // Yield to the executor.
2544                        cx.waker().wake_by_ref();
2545                        Poll::Pending
2546                    }
2547                })
2548                .await;
2549
2550                let mut response = BlockFifoResponse::default();
2551                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2552
2553                writer
2554                    .write_entries(&BlockFifoRequest {
2555                        command: BlockFifoCommand {
2556                            opcode: BlockOpcode::Read.into_primitive(),
2557                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2558                            ..Default::default()
2559                        },
2560                        group: 1,
2561                        vmoid: vmo_id.id,
2562                        length: 1,
2563                        ..Default::default()
2564                    })
2565                    .await
2566                    .unwrap();
2567
2568                writer
2569                    .write_entries(&BlockFifoRequest {
2570                        command: BlockFifoCommand {
2571                            opcode: BlockOpcode::Read.into_primitive(),
2572                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2573                            ..Default::default()
2574                        },
2575                        reqid: 2,
2576                        group: 1,
2577                        vmoid: vmo_id.id,
2578                        length: 1,
2579                        ..Default::default()
2580                    })
2581                    .await
2582                    .unwrap();
2583
2584                reader.read_entries(&mut response).await.unwrap();
2585                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2586                assert_eq!(response.reqid, 2);
2587                assert_eq!(response.group, 1);
2588
2589                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2590
2591                // Only the first request should have been processed.
2592                assert_eq!(counter.load(Ordering::Relaxed), 1);
2593            }
2594        );
2595    }
2596
2597    #[fuchsia::test]
2598    async fn test_group_with_two_lasts() {
2599        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2600
2601        let (tx, rx) = oneshot::channel();
2602
2603        futures::join!(
2604            async move {
2605                let rx = Mutex::new(Some(rx));
2606                let block_server = BlockServer::new(
2607                    BLOCK_SIZE,
2608                    Arc::new(MockInterface {
2609                        read_hook: Some(Box::new(move |_, _, _, _| {
2610                            let rx = rx.lock().take().unwrap();
2611                            Box::pin(async {
2612                                let _ = rx.await;
2613                                Ok(())
2614                            })
2615                        })),
2616                        ..MockInterface::default()
2617                    }),
2618                );
2619                block_server.handle_requests(stream).await.unwrap();
2620            },
2621            async move {
2622                let (session_proxy, server) = fidl::endpoints::create_proxy();
2623
2624                proxy.open_session(server).unwrap();
2625
2626                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2627                let vmo_id = session_proxy
2628                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2629                    .await
2630                    .unwrap()
2631                    .unwrap();
2632
2633                let mut fifo =
2634                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2635                let (mut reader, mut writer) = fifo.async_io();
2636
2637                writer
2638                    .write_entries(&BlockFifoRequest {
2639                        command: BlockFifoCommand {
2640                            opcode: BlockOpcode::Read.into_primitive(),
2641                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2642                            ..Default::default()
2643                        },
2644                        reqid: 1,
2645                        group: 1,
2646                        vmoid: vmo_id.id,
2647                        length: 1,
2648                        ..Default::default()
2649                    })
2650                    .await
2651                    .unwrap();
2652
2653                writer
2654                    .write_entries(&BlockFifoRequest {
2655                        command: BlockFifoCommand {
2656                            opcode: BlockOpcode::Read.into_primitive(),
2657                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2658                            ..Default::default()
2659                        },
2660                        reqid: 2,
2661                        group: 1,
2662                        vmoid: vmo_id.id,
2663                        length: 1,
2664                        ..Default::default()
2665                    })
2666                    .await
2667                    .unwrap();
2668
2669                // Send an independent request to flush through the fifo.
2670                writer
2671                    .write_entries(&BlockFifoRequest {
2672                        command: BlockFifoCommand {
2673                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2674                            ..Default::default()
2675                        },
2676                        reqid: 3,
2677                        vmoid: vmo_id.id,
2678                        ..Default::default()
2679                    })
2680                    .await
2681                    .unwrap();
2682
2683                // It should succeed.
2684                let mut response = BlockFifoResponse::default();
2685                reader.read_entries(&mut response).await.unwrap();
2686                assert_eq!(response.status, zx::sys::ZX_OK);
2687                assert_eq!(response.reqid, 3);
2688
2689                // Now release the original request.
2690                tx.send(()).unwrap();
2691
2692                // The response should be for the first message tagged as last, and it should be
2693                // an error because we sent two messages with the LAST marker.
2694                let mut response = BlockFifoResponse::default();
2695                reader.read_entries(&mut response).await.unwrap();
2696                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2697                assert_eq!(response.reqid, 1);
2698                assert_eq!(response.group, 1);
2699            }
2700        );
2701    }
2702
2703    #[fuchsia::test(allow_stalls = false)]
2704    async fn test_requests_dont_block_sessions() {
2705        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2706
2707        let (tx, rx) = oneshot::channel();
2708
2709        fasync::Task::local(async move {
2710            let rx = Mutex::new(Some(rx));
2711            let block_server = BlockServer::new(
2712                BLOCK_SIZE,
2713                Arc::new(MockInterface {
2714                    read_hook: Some(Box::new(move |_, _, _, _| {
2715                        let rx = rx.lock().take().unwrap();
2716                        Box::pin(async {
2717                            let _ = rx.await;
2718                            Ok(())
2719                        })
2720                    })),
2721                    ..MockInterface::default()
2722                }),
2723            );
2724            block_server.handle_requests(stream).await.unwrap();
2725        })
2726        .detach();
2727
2728        let mut fut = pin!(async {
2729            let (session_proxy, server) = fidl::endpoints::create_proxy();
2730
2731            proxy.open_session(server).unwrap();
2732
2733            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2734            let vmo_id = session_proxy
2735                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2736                .await
2737                .unwrap()
2738                .unwrap();
2739
2740            let mut fifo =
2741                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2742            let (mut reader, mut writer) = fifo.async_io();
2743
2744            writer
2745                .write_entries(&BlockFifoRequest {
2746                    command: BlockFifoCommand {
2747                        opcode: BlockOpcode::Read.into_primitive(),
2748                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2749                        ..Default::default()
2750                    },
2751                    reqid: 1,
2752                    group: 1,
2753                    vmoid: vmo_id.id,
2754                    length: 1,
2755                    ..Default::default()
2756                })
2757                .await
2758                .unwrap();
2759
2760            let mut response = BlockFifoResponse::default();
2761            reader.read_entries(&mut response).await.unwrap();
2762            assert_eq!(response.status, zx::sys::ZX_OK);
2763        });
2764
2765        // The response won't come back until we send on `tx`.
2766        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2767
2768        let mut fut2 = pin!(proxy.get_volume_info());
2769
2770        // get_volume_info is set up to stall forever.
2771        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2772
2773        // If we now free up the first future, it should resolve; the stalled call to
2774        // get_volume_info should not block the fifo response.
2775        let _ = tx.send(());
2776
2777        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2778    }
2779
2780    #[fuchsia::test]
2781    async fn test_request_flow_control() {
2782        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2783
2784        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2785        // server will block until that happens.
2786        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2787        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2788        let event_clone = event.clone();
2789        futures::join!(
2790            async move {
2791                let block_server = BlockServer::new(
2792                    BLOCK_SIZE,
2793                    Arc::new(MockInterface {
2794                        read_hook: Some(Box::new(move |_, _, _, _| {
2795                            let event_clone = event_clone.clone();
2796                            Box::pin(async move {
2797                                if !event_clone.1.load(Ordering::SeqCst) {
2798                                    event_clone.0.listen().await;
2799                                }
2800                                Ok(())
2801                            })
2802                        })),
2803                        ..MockInterface::default()
2804                    }),
2805                );
2806                block_server.handle_requests(stream).await.unwrap();
2807            },
2808            async move {
2809                let (session_proxy, server) = fidl::endpoints::create_proxy();
2810
2811                proxy.open_session(server).unwrap();
2812
2813                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2814                let vmo_id = session_proxy
2815                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2816                    .await
2817                    .unwrap()
2818                    .unwrap();
2819
2820                let mut fifo =
2821                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2822                let (mut reader, mut writer) = fifo.async_io();
2823
2824                for i in 0..MAX_REQUESTS {
2825                    writer
2826                        .write_entries(&BlockFifoRequest {
2827                            command: BlockFifoCommand {
2828                                opcode: BlockOpcode::Read.into_primitive(),
2829                                ..Default::default()
2830                            },
2831                            reqid: (i + 1) as u32,
2832                            dev_offset: i,
2833                            vmoid: vmo_id.id,
2834                            length: 1,
2835                            ..Default::default()
2836                        })
2837                        .await
2838                        .unwrap();
2839                }
2840                assert!(
2841                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2842                        command: BlockFifoCommand {
2843                            opcode: BlockOpcode::Read.into_primitive(),
2844                            ..Default::default()
2845                        },
2846                        reqid: u32::MAX,
2847                        dev_offset: MAX_REQUESTS,
2848                        vmoid: vmo_id.id,
2849                        length: 1,
2850                        ..Default::default()
2851                    })))
2852                    .is_pending()
2853                );
2854                // OK, let the server start to process.
2855                event.1.store(true, Ordering::SeqCst);
2856                event.0.notify(usize::MAX);
2857                // For each entry we read, make sure we can write a new one in.
2858                let mut finished_reqids = vec![];
2859                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2860                    let mut response = BlockFifoResponse::default();
2861                    reader.read_entries(&mut response).await.unwrap();
2862                    assert_eq!(response.status, zx::sys::ZX_OK);
2863                    finished_reqids.push(response.reqid);
2864                    writer
2865                        .write_entries(&BlockFifoRequest {
2866                            command: BlockFifoCommand {
2867                                opcode: BlockOpcode::Read.into_primitive(),
2868                                ..Default::default()
2869                            },
2870                            reqid: (i + 1) as u32,
2871                            dev_offset: i,
2872                            vmoid: vmo_id.id,
2873                            length: 1,
2874                            ..Default::default()
2875                        })
2876                        .await
2877                        .unwrap();
2878                }
2879                let mut response = BlockFifoResponse::default();
2880                for _ in 0..MAX_REQUESTS {
2881                    reader.read_entries(&mut response).await.unwrap();
2882                    assert_eq!(response.status, zx::sys::ZX_OK);
2883                    finished_reqids.push(response.reqid);
2884                }
2885                // Verify that we got a response for each request.  Note that we can't assume FIFO
2886                // ordering.
2887                finished_reqids.sort();
2888                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2889                let mut i = 1;
2890                for reqid in finished_reqids {
2891                    assert_eq!(reqid, i);
2892                    i += 1;
2893                }
2894            }
2895        );
2896    }
2897
2898    #[fuchsia::test]
2899    async fn test_passthrough_io_with_fixed_map() {
2900        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2901
2902        let expected_op = Arc::new(Mutex::new(None));
2903        let expected_op_clone = expected_op.clone();
2904        futures::join!(
2905            async {
2906                let block_server = BlockServer::new(
2907                    BLOCK_SIZE,
2908                    Arc::new(IoMockInterface {
2909                        return_errors: false,
2910                        do_checks: true,
2911                        expected_op: expected_op_clone,
2912                    }),
2913                );
2914                block_server.handle_requests(stream).await.unwrap();
2915            },
2916            async move {
2917                let (session_proxy, server) = fidl::endpoints::create_proxy();
2918
2919                let mapping = fblock::BlockOffsetMapping {
2920                    source_block_offset: 0,
2921                    target_block_offset: 10,
2922                    length: 20,
2923                };
2924                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2925
2926                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2927                let vmo_id = session_proxy
2928                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2929                    .await
2930                    .unwrap()
2931                    .unwrap();
2932
2933                let mut fifo =
2934                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2935                let (mut reader, mut writer) = fifo.async_io();
2936
2937                // READ
2938                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2939                writer
2940                    .write_entries(&BlockFifoRequest {
2941                        command: BlockFifoCommand {
2942                            opcode: BlockOpcode::Read.into_primitive(),
2943                            ..Default::default()
2944                        },
2945                        vmoid: vmo_id.id,
2946                        dev_offset: 1,
2947                        length: 2,
2948                        vmo_offset: 3,
2949                        ..Default::default()
2950                    })
2951                    .await
2952                    .unwrap();
2953
2954                let mut response = BlockFifoResponse::default();
2955                reader.read_entries(&mut response).await.unwrap();
2956                assert_eq!(response.status, zx::sys::ZX_OK);
2957
2958                // WRITE
2959                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2960                writer
2961                    .write_entries(&BlockFifoRequest {
2962                        command: BlockFifoCommand {
2963                            opcode: BlockOpcode::Write.into_primitive(),
2964                            ..Default::default()
2965                        },
2966                        vmoid: vmo_id.id,
2967                        dev_offset: 4,
2968                        length: 5,
2969                        vmo_offset: 6,
2970                        ..Default::default()
2971                    })
2972                    .await
2973                    .unwrap();
2974
2975                reader.read_entries(&mut response).await.unwrap();
2976                assert_eq!(response.status, zx::sys::ZX_OK);
2977
2978                // FLUSH
2979                *expected_op.lock() = Some(ExpectedOp::Flush);
2980                writer
2981                    .write_entries(&BlockFifoRequest {
2982                        command: BlockFifoCommand {
2983                            opcode: BlockOpcode::Flush.into_primitive(),
2984                            ..Default::default()
2985                        },
2986                        ..Default::default()
2987                    })
2988                    .await
2989                    .unwrap();
2990
2991                reader.read_entries(&mut response).await.unwrap();
2992                assert_eq!(response.status, zx::sys::ZX_OK);
2993
2994                // TRIM
2995                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2996                writer
2997                    .write_entries(&BlockFifoRequest {
2998                        command: BlockFifoCommand {
2999                            opcode: BlockOpcode::Trim.into_primitive(),
3000                            ..Default::default()
3001                        },
3002                        dev_offset: 7,
3003                        length: 3,
3004                        ..Default::default()
3005                    })
3006                    .await
3007                    .unwrap();
3008
3009                reader.read_entries(&mut response).await.unwrap();
3010                assert_eq!(response.status, zx::sys::ZX_OK);
3011
3012                // READ past window
3013                *expected_op.lock() = None;
3014                writer
3015                    .write_entries(&BlockFifoRequest {
3016                        command: BlockFifoCommand {
3017                            opcode: BlockOpcode::Read.into_primitive(),
3018                            ..Default::default()
3019                        },
3020                        vmoid: vmo_id.id,
3021                        dev_offset: 19,
3022                        length: 2,
3023                        vmo_offset: 3,
3024                        ..Default::default()
3025                    })
3026                    .await
3027                    .unwrap();
3028
3029                reader.read_entries(&mut response).await.unwrap();
3030                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3031
3032                std::mem::drop(proxy);
3033            }
3034        );
3035    }
3036
3037    #[fuchsia::test]
3038    fn operation_map() {
3039        fn expect_map_result(
3040            mut operation: Operation,
3041            mapping: Option<BlockOffsetMapping>,
3042            max_blocks: Option<NonZero<u32>>,
3043            expected_operations: Vec<Operation>,
3044        ) {
3045            let mut ops = vec![];
3046            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
3047                ops.push(operation);
3048                operation = remainder;
3049            }
3050            ops.push(operation);
3051            assert_eq!(ops, expected_operations);
3052        }
3053
3054        // No limits
3055        expect_map_result(
3056            Operation::Read {
3057                device_block_offset: 10,
3058                block_count: 200,
3059                _unused: 0,
3060                vmo_offset: 0,
3061                options: ReadOptions::default(),
3062            },
3063            None,
3064            None,
3065            vec![Operation::Read {
3066                device_block_offset: 10,
3067                block_count: 200,
3068                _unused: 0,
3069                vmo_offset: 0,
3070                options: ReadOptions::default(),
3071            }],
3072        );
3073
3074        // Max block count
3075        expect_map_result(
3076            Operation::Read {
3077                device_block_offset: 10,
3078                block_count: 200,
3079                _unused: 0,
3080                vmo_offset: 0,
3081                options: ReadOptions::default(),
3082            },
3083            None,
3084            NonZero::new(120),
3085            vec![
3086                Operation::Read {
3087                    device_block_offset: 10,
3088                    block_count: 120,
3089                    _unused: 0,
3090                    vmo_offset: 0,
3091                    options: ReadOptions::default(),
3092                },
3093                Operation::Read {
3094                    device_block_offset: 130,
3095                    block_count: 80,
3096                    _unused: 0,
3097                    vmo_offset: 120 * 512,
3098                    options: ReadOptions::default(),
3099                },
3100            ],
3101        );
3102        expect_map_result(
3103            Operation::Write {
3104                device_block_offset: 10,
3105                block_count: 200,
3106                _unused: 0,
3107                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3108                vmo_offset: 0,
3109            },
3110            None,
3111            NonZero::new(120),
3112            vec![
3113                Operation::Write {
3114                    device_block_offset: 10,
3115                    block_count: 120,
3116                    _unused: 0,
3117                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3118                    vmo_offset: 0,
3119                },
3120                Operation::Write {
3121                    device_block_offset: 130,
3122                    block_count: 80,
3123                    _unused: 0,
3124                    options: WriteOptions::default(),
3125                    vmo_offset: 120 * 512,
3126                },
3127            ],
3128        );
3129        expect_map_result(
3130            Operation::Trim { device_block_offset: 10, block_count: 200 },
3131            None,
3132            NonZero::new(120),
3133            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3134        );
3135
3136        // Remapping + Max block count
3137        expect_map_result(
3138            Operation::Read {
3139                device_block_offset: 10,
3140                block_count: 200,
3141                _unused: 0,
3142                vmo_offset: 0,
3143                options: ReadOptions::default(),
3144            },
3145            Some(BlockOffsetMapping {
3146                source_block_offset: 10,
3147                target_block_offset: 100,
3148                length: 200,
3149            }),
3150            NonZero::new(120),
3151            vec![
3152                Operation::Read {
3153                    device_block_offset: 100,
3154                    block_count: 120,
3155                    _unused: 0,
3156                    vmo_offset: 0,
3157                    options: ReadOptions::default(),
3158                },
3159                Operation::Read {
3160                    device_block_offset: 220,
3161                    block_count: 80,
3162                    _unused: 0,
3163                    vmo_offset: 120 * 512,
3164                    options: ReadOptions::default(),
3165                },
3166            ],
3167        );
3168        expect_map_result(
3169            Operation::Write {
3170                device_block_offset: 10,
3171                block_count: 200,
3172                _unused: 0,
3173                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3174                vmo_offset: 0,
3175            },
3176            Some(BlockOffsetMapping {
3177                source_block_offset: 10,
3178                target_block_offset: 100,
3179                length: 200,
3180            }),
3181            NonZero::new(120),
3182            vec![
3183                Operation::Write {
3184                    device_block_offset: 100,
3185                    block_count: 120,
3186                    _unused: 0,
3187                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3188                    vmo_offset: 0,
3189                },
3190                Operation::Write {
3191                    device_block_offset: 220,
3192                    block_count: 80,
3193                    _unused: 0,
3194                    options: WriteOptions::default(),
3195                    vmo_offset: 120 * 512,
3196                },
3197            ],
3198        );
3199        expect_map_result(
3200            Operation::Trim { device_block_offset: 10, block_count: 200 },
3201            Some(BlockOffsetMapping {
3202                source_block_offset: 10,
3203                target_block_offset: 100,
3204                length: 200,
3205            }),
3206            NonZero::new(120),
3207            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3208        );
3209    }
3210}