_block_server_c_rustc_static/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::bin::Bin;
5use anyhow::{anyhow, Error};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use zx::HandleBased;
19use {
20    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    fn block_count(&self) -> Option<u64> {
40        match self {
41            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42            Self::Partition(PartitionInfo { block_range, .. }) => {
43                block_range.as_ref().map(|range| range.end - range.start)
44            }
45        }
46    }
47
48    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49        match self {
50            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52                max_transfer_blocks.clone()
53            }
54        }
55    }
56
57    fn max_transfer_size(&self, block_size: u32) -> u32 {
58        if let Some(max_blocks) = self.max_transfer_blocks() {
59            max_blocks.get() * block_size
60        } else {
61            MAX_TRANSFER_UNBOUNDED
62        }
63    }
64}
65
66/// Information associated with non-partition block devices.
67#[derive(Clone)]
68pub struct BlockInfo {
69    pub device_flags: fblock::Flag,
70    pub block_count: u64,
71    pub max_transfer_blocks: Option<NonZero<u32>>,
72}
73
74/// Information associated with a block device that is also a partition.
75#[derive(Clone)]
76pub struct PartitionInfo {
77    /// The device flags reported by the underlying device.
78    pub device_flags: fblock::Flag,
79    pub max_transfer_blocks: Option<NonZero<u32>>,
80    /// If `block_range` is None, the partition is a volume and may not be contiguous.
81    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
82    /// slices and use that (along with the slice and block sizes) to determine the block count.
83    pub block_range: Option<Range<u64>>,
84    pub type_guid: [u8; 16],
85    pub instance_guid: [u8; 16],
86    pub name: String,
87    pub flags: u64,
88}
89
90/// We internally keep track of active requests, so that when the server is torn down, we can
91/// deallocate all of the resources for pending requests.
92struct ActiveRequest<S> {
93    session: S,
94    group_or_request: GroupOrRequest,
95    trace_flow_id: TraceFlowId,
96    vmo_bin_key: Option<usize>,
97    status: zx::Status,
98    count: u32,
99    req_id: Option<u32>,
100}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105    fn default() -> Self {
106        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107    }
108}
109
110impl<S> ActiveRequests<S> {
111    fn complete_and_take_response(
112        &self,
113        request_id: RequestId,
114        status: zx::Status,
115    ) -> Option<(S, BlockFifoResponse)> {
116        self.0.lock().complete_and_take_response(request_id, status)
117    }
118}
119
120struct ActiveRequestsInner<S> {
121    requests: Slab<ActiveRequest<S>>,
122    vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125// Keeps track of all the requests that are currently being processed
126impl<S> ActiveRequestsInner<S> {
127    /// Completes a request.
128    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129        let group = &mut self.requests[request_id.0];
130
131        group.count = group.count.checked_sub(1).unwrap();
132        if status != zx::Status::OK && group.status == zx::Status::OK {
133            group.status = status
134        }
135
136        fuchsia_trace::duration!(
137            c"storage",
138            c"block_server::finish_transaction",
139            "request_id" => request_id.0,
140            "group_completed" => group.count == 0,
141            "status" => status.into_raw());
142        if let Some(trace_flow_id) = group.trace_flow_id {
143            fuchsia_trace::flow_step!(
144                c"storage",
145                c"block_server::finish_request",
146                trace_flow_id.get().into()
147            );
148        }
149    }
150
151    /// Takes the response if all requests are finished.
152    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153        let group = &self.requests[request_id.0];
154        match group.req_id {
155            Some(reqid) if group.count == 0 => {
156                let group = self.requests.remove(request_id.0);
157                if let Some(vmo_bin_key) = group.vmo_bin_key {
158                    self.vmo_bin.release(vmo_bin_key);
159                }
160                Some((
161                    group.session,
162                    BlockFifoResponse {
163                        status: group.status.into_raw(),
164                        reqid,
165                        group: group.group_or_request.group_id().unwrap_or(0),
166                        ..Default::default()
167                    },
168                ))
169            }
170            _ => None,
171        }
172    }
173
174    /// Competes the request and returns a response if the request group is finished.
175    fn complete_and_take_response(
176        &mut self,
177        request_id: RequestId,
178        status: zx::Status,
179    ) -> Option<(S, BlockFifoResponse)> {
180        self.complete(request_id, status);
181        self.take_response(request_id)
182    }
183}
184
185/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
186/// cbindgen:no-export
187pub struct BlockServer<SM> {
188    block_size: u32,
189    session_manager: Arc<SM>,
190}
191
192/// A single entry in `[OffsetMap]`.
193#[derive(Debug)]
194pub struct BlockOffsetMapping {
195    source_block_offset: u64,
196    target_block_offset: u64,
197    length: u64,
198}
199
200impl BlockOffsetMapping {
201    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
202        blocks.0 >= self.source_block_offset
203            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
204    }
205}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208    type Error = zx::Status;
209
210    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213        Ok(Self {
214            source_block_offset: wire.source_block_offset,
215            target_block_offset: wire.target_block_offset,
216            length: wire.length,
217        })
218    }
219}
220
221/// Remaps the offset of block requests based on an internal map, and truncates long requests.
222/// TODO(https://fxbug.dev/402515764): For now, this just supports a single entry in the map, which
223/// is all that is required for GPT partitions.  If we want to support this for FVM, we will need
224/// to support multiple entries, which requires changing the block server to support request
225/// splitting.
226pub struct OffsetMap {
227    mapping: Option<BlockOffsetMapping>,
228    max_transfer_blocks: Option<NonZero<u32>>,
229}
230
231impl OffsetMap {
232    /// An OffsetMap that remaps requests.
233    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
234        Self { mapping: Some(mapping), max_transfer_blocks }
235    }
236
237    /// An OffsetMap that just enforces maximum request sizes.
238    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
239        Self { mapping: None, max_transfer_blocks }
240    }
241
242    pub fn is_empty(&self) -> bool {
243        self.mapping.is_none()
244    }
245
246    fn mapping(&self) -> Option<&BlockOffsetMapping> {
247        self.mapping.as_ref()
248    }
249
250    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
251        self.max_transfer_blocks
252    }
253}
254
255// Methods take Arc<Self> rather than &self because of
256// https://github.com/rust-lang/rust/issues/42940.
257pub trait SessionManager: 'static {
258    type Session;
259
260    fn on_attach_vmo(
261        self: Arc<Self>,
262        vmo: &Arc<zx::Vmo>,
263    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
264
265    /// Creates a new session to handle `stream`.
266    /// The returned future should run until the session completes, for example when the client end
267    /// closes.
268    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
269    fn open_session(
270        self: Arc<Self>,
271        stream: fblock::SessionRequestStream,
272        offset_map: OffsetMap,
273        block_size: u32,
274    ) -> impl Future<Output = Result<(), Error>> + Send;
275
276    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
277    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
278
279    /// Called to handle the GetVolumeInfo FIDL call.
280    fn get_volume_info(
281        &self,
282    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
283    {
284        async { Err(zx::Status::NOT_SUPPORTED) }
285    }
286
287    /// Called to handle the QuerySlices FIDL call.
288    fn query_slices(
289        &self,
290        _start_slices: &[u64],
291    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
292        async { Err(zx::Status::NOT_SUPPORTED) }
293    }
294
295    /// Called to handle the Shrink FIDL call.
296    fn extend(
297        &self,
298        _start_slice: u64,
299        _slice_count: u64,
300    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
301        async { Err(zx::Status::NOT_SUPPORTED) }
302    }
303
304    /// Called to handle the Shrink FIDL call.
305    fn shrink(
306        &self,
307        _start_slice: u64,
308        _slice_count: u64,
309    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
310        async { Err(zx::Status::NOT_SUPPORTED) }
311    }
312
313    /// Returns the active requests.
314    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
315}
316
317pub trait IntoSessionManager {
318    type SM: SessionManager;
319
320    fn into_session_manager(self) -> Arc<Self::SM>;
321}
322
323impl<SM: SessionManager> BlockServer<SM> {
324    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
325        Self { block_size, session_manager: session_manager.into_session_manager() }
326    }
327
328    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
329    pub async fn handle_requests(
330        &self,
331        mut requests: fvolume::VolumeRequestStream,
332    ) -> Result<(), Error> {
333        let scope = fasync::Scope::new();
334        while let Some(request) = requests.try_next().await.unwrap() {
335            if let Some(session) = self.handle_request(request).await? {
336                scope.spawn(session.map(|_| ()));
337            }
338        }
339        scope.await;
340        Ok(())
341    }
342
343    /// Processes a partition request.  If a new session task is created in response to the request,
344    /// it is returned.
345    async fn handle_request(
346        &self,
347        request: fvolume::VolumeRequest,
348    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
349        match request {
350            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
351                Ok(info) => {
352                    let max_transfer_size = info.max_transfer_size(self.block_size);
353                    let (block_count, flags) = match info.as_ref() {
354                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
355                            (*block_count, *device_flags)
356                        }
357                        DeviceInfo::Partition(partition_info) => {
358                            let block_count = if let Some(range) =
359                                partition_info.block_range.as_ref()
360                            {
361                                range.end - range.start
362                            } else {
363                                let volume_info = self.session_manager.get_volume_info().await?;
364                                volume_info.0.slice_size * volume_info.1.partition_slice_count
365                                    / self.block_size as u64
366                            };
367                            (block_count, partition_info.device_flags)
368                        }
369                    };
370                    responder.send(Ok(&fblock::BlockInfo {
371                        block_count,
372                        block_size: self.block_size,
373                        max_transfer_size,
374                        flags,
375                    }))?;
376                }
377                Err(status) => responder.send(Err(status.into_raw()))?,
378            },
379            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
380                match self.device_info().await {
381                    Ok(info) => {
382                        return Ok(Some(self.session_manager.clone().open_session(
383                            session.into_stream(),
384                            OffsetMap::empty(info.max_transfer_blocks()),
385                            self.block_size,
386                        )));
387                    }
388                    Err(status) => session.close_with_epitaph(status)?,
389                }
390            }
391            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
392                session,
393                offset_map,
394                initial_mappings,
395                control_handle: _,
396            } => match self.device_info().await {
397                Ok(info) => {
398                    if offset_map.is_some()
399                        || initial_mappings.as_ref().is_none_or(|m| m.len() != 1)
400                    {
401                        // TODO(https://fxbug.dev/402515764): Support multiple mappings and
402                        // dynamic querying for FVM as needed.  A single static map is
403                        // sufficient for GPT.
404                        session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
405                        return Ok(None);
406                    }
407                    let initial_mapping: BlockOffsetMapping =
408                        match initial_mappings.unwrap().pop().unwrap().try_into() {
409                            Ok(m) => m,
410                            Err(status) => {
411                                session.close_with_epitaph(status)?;
412                                return Ok(None);
413                            }
414                        };
415                    if let Some(max) = info.block_count() {
416                        if initial_mapping.target_block_offset + initial_mapping.length > max {
417                            log::warn!(
418                                "Invalid mapping for session: {initial_mapping:?} (max {max})"
419                            );
420                            session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
421                            return Ok(None);
422                        }
423                    }
424                    return Ok(Some(self.session_manager.clone().open_session(
425                        session.into_stream(),
426                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
427                        self.block_size,
428                    )));
429                }
430                Err(status) => session.close_with_epitaph(status)?,
431            },
432            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
433                Ok(info) => {
434                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
435                        let mut guid =
436                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
437                        guid.value.copy_from_slice(&partition_info.type_guid);
438                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
439                    } else {
440                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
441                    }
442                }
443                Err(status) => {
444                    responder.send(status.into_raw(), None)?;
445                }
446            },
447            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
448                match self.device_info().await {
449                    Ok(info) => {
450                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
451                            let mut guid =
452                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
453                            guid.value.copy_from_slice(&partition_info.instance_guid);
454                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
455                        } else {
456                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
457                        }
458                    }
459                    Err(status) => {
460                        responder.send(status.into_raw(), None)?;
461                    }
462                }
463            }
464            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
465                Ok(info) => {
466                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
467                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
468                    } else {
469                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
470                    }
471                }
472                Err(status) => {
473                    responder.send(status.into_raw(), None)?;
474                }
475            },
476            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
477                Ok(info) => {
478                    if let DeviceInfo::Partition(info) = info.as_ref() {
479                        let mut type_guid =
480                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
481                        type_guid.value.copy_from_slice(&info.type_guid);
482                        let mut instance_guid =
483                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
484                        instance_guid.value.copy_from_slice(&info.instance_guid);
485                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
486                            name: Some(info.name.clone()),
487                            type_guid: Some(type_guid),
488                            instance_guid: Some(instance_guid),
489                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
490                            num_blocks: info
491                                .block_range
492                                .as_ref()
493                                .map(|range| range.end - range.start),
494                            flags: Some(info.flags),
495                            ..Default::default()
496                        }))?;
497                    }
498                }
499                Err(status) => responder.send(Err(status.into_raw()))?,
500            },
501            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
502                match self.session_manager.query_slices(&start_slices).await {
503                    Ok(mut results) => {
504                        let results_len = results.len();
505                        assert!(results_len <= 16);
506                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
507                        responder.send(
508                            zx::sys::ZX_OK,
509                            &results.try_into().unwrap(),
510                            results_len as u64,
511                        )?;
512                    }
513                    Err(s) => {
514                        responder.send(
515                            s.into_raw(),
516                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
517                            0,
518                        )?;
519                    }
520                }
521            }
522            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
523                match self.session_manager.get_volume_info().await {
524                    Ok((manager_info, volume_info)) => {
525                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
526                    }
527                    Err(s) => responder.send(s.into_raw(), None, None)?,
528                }
529            }
530            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
531                responder.send(
532                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
533                        .into_raw(),
534                )?;
535            }
536            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
537                responder.send(
538                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
539                        .into_raw(),
540                )?;
541            }
542            fvolume::VolumeRequest::Destroy { responder, .. } => {
543                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
544            }
545        }
546        Ok(None)
547    }
548
549    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
550        self.session_manager.get_info().await
551    }
552}
553
554struct SessionHelper<SM: SessionManager> {
555    session_manager: Arc<SM>,
556    offset_map: OffsetMap,
557    block_size: u32,
558    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
559    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
560}
561
562impl<SM: SessionManager> SessionHelper<SM> {
563    fn new(
564        session_manager: Arc<SM>,
565        offset_map: OffsetMap,
566        block_size: u32,
567    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
568        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
569        Ok((
570            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
571            fifo,
572        ))
573    }
574
575    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
576        match request {
577            fblock::SessionRequest::GetFifo { responder } => {
578                let rights = zx::Rights::TRANSFER
579                    | zx::Rights::READ
580                    | zx::Rights::WRITE
581                    | zx::Rights::SIGNAL
582                    | zx::Rights::WAIT;
583                match self.peer_fifo.duplicate_handle(rights) {
584                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
585                    Err(s) => responder.send(Err(s.into_raw()))?,
586                }
587                Ok(())
588            }
589            fblock::SessionRequest::AttachVmo { vmo, responder } => {
590                let vmo = Arc::new(vmo);
591                let vmo_id = {
592                    let mut vmos = self.vmos.lock();
593                    if vmos.len() == u16::MAX as usize {
594                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
595                        return Ok(());
596                    } else {
597                        let vmo_id = match vmos.last_entry() {
598                            None => 1,
599                            Some(o) => {
600                                o.key().checked_add(1).unwrap_or_else(|| {
601                                    let mut vmo_id = 1;
602                                    // Find the first gap...
603                                    for (&id, _) in &*vmos {
604                                        if id > vmo_id {
605                                            break;
606                                        }
607                                        vmo_id = id + 1;
608                                    }
609                                    vmo_id
610                                })
611                            }
612                        };
613                        vmos.insert(vmo_id, vmo.clone());
614                        vmo_id
615                    }
616                };
617                self.session_manager.clone().on_attach_vmo(&vmo).await?;
618                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
619                Ok(())
620            }
621            fblock::SessionRequest::Close { responder } => {
622                responder.send(Ok(()))?;
623                Err(anyhow!("Closed"))
624            }
625        }
626    }
627
628    /// Decodes `request`.
629    fn decode_fifo_request(
630        &self,
631        session: SM::Session,
632        request: &BlockFifoRequest,
633    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
634        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
635        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
636            .ok_or(zx::Status::INVALID_ARGS)
637            .and_then(|code| {
638                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
639                    if request.length == 0 {
640                        return Err(zx::Status::INVALID_ARGS);
641                    }
642                    // Make sure the end offsets won't wrap.
643                    if request.dev_offset.checked_add(request.length as u64).is_none()
644                        || (code != BlockOpcode::Trim
645                            && (request.length as u64 * self.block_size as u64)
646                                .checked_add(request.vmo_offset)
647                                .is_none())
648                    {
649                        return Err(zx::Status::OUT_OF_RANGE);
650                    }
651                }
652                Ok(match code {
653                    BlockOpcode::Read => Operation::Read {
654                        device_block_offset: request.dev_offset,
655                        block_count: request.length,
656                        _unused: 0,
657                        vmo_offset: request
658                            .vmo_offset
659                            .checked_mul(self.block_size as u64)
660                            .ok_or(zx::Status::OUT_OF_RANGE)?,
661                    },
662                    BlockOpcode::Write => {
663                        let mut options = WriteOptions::empty();
664                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
665                            options |= WriteOptions::FORCE_ACCESS;
666                        }
667                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
668                            options |= WriteOptions::PRE_BARRIER;
669                        }
670                        Operation::Write {
671                            device_block_offset: request.dev_offset,
672                            block_count: request.length,
673                            options: options,
674                            vmo_offset: request
675                                .vmo_offset
676                                .checked_mul(self.block_size as u64)
677                                .ok_or(zx::Status::OUT_OF_RANGE)?,
678                        }
679                    }
680                    BlockOpcode::Flush => Operation::Flush,
681                    BlockOpcode::Trim => Operation::Trim {
682                        device_block_offset: request.dev_offset,
683                        block_count: request.length,
684                    },
685                    BlockOpcode::CloseVmo => Operation::CloseVmo,
686                })
687            });
688
689        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
690            GroupOrRequest::Group(request.group)
691        } else {
692            GroupOrRequest::Request(request.reqid)
693        };
694
695        let mut active_requests = self.session_manager.active_requests().0.lock();
696        let mut request_id = None;
697
698        // Multiple Block I/O request may be sent as a group.
699        // Notes:
700        // - the group is identified by the group id in the request
701        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
702        //   flag is set.
703        // - when processing a request of a group fails, subsequent requests of that
704        //   group will not be processed.
705        //
706        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
707        if group_or_request.is_group() {
708            // Search for an existing entry that matches this group.  NOTE: This is a potentially
709            // expensive way to find a group (it's iterating over all slots in the active-requests
710            // slab).  This can be optimised easily should we need to.
711            for (key, group) in &mut active_requests.requests {
712                if group.group_or_request == group_or_request {
713                    if group.req_id.is_some() {
714                        // We have already received a request tagged as last.
715                        if group.status == zx::Status::OK {
716                            group.status = zx::Status::INVALID_ARGS;
717                        }
718                        // Ignore this request.
719                        return Err(None);
720                    }
721                    if flags.contains(BlockIoFlag::GROUP_LAST) {
722                        group.req_id = Some(request.reqid);
723                        // If the group has had an error, there is no point trying to issue this
724                        // request.
725                        if group.status != zx::Status::OK {
726                            operation = Err(group.status);
727                        }
728                    } else if group.status != zx::Status::OK {
729                        // The group has already encountered an error, so there is no point trying
730                        // to issue this request.
731                        return Err(None);
732                    }
733                    request_id = Some(RequestId(key));
734                    group.count += 1;
735                    break;
736                }
737            }
738        }
739
740        let mut retain_vmo = false;
741        let vmo = match &operation {
742            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
743                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
744                    retain_vmo = true;
745                    Ok(Some(vmo))
746                })
747            }
748            Ok(Operation::CloseVmo) => {
749                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
750                    active_requests.vmo_bin.add(vmo.clone());
751                    Ok(Some(vmo))
752                })
753            }
754            _ => Ok(None),
755        }
756        .unwrap_or_else(|e| {
757            operation = Err(e);
758            None
759        });
760
761        let trace_flow_id = NonZero::new(request.trace_flow_id);
762        let request_id = request_id.unwrap_or_else(|| {
763            let vmo_bin_key =
764                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
765            RequestId(active_requests.requests.insert(ActiveRequest {
766                session,
767                group_or_request,
768                trace_flow_id,
769                vmo_bin_key,
770                status: zx::Status::OK,
771                count: 1,
772                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
773                    || flags.contains(BlockIoFlag::GROUP_LAST)
774                {
775                    Some(request.reqid)
776                } else {
777                    None
778                },
779            }))
780        });
781
782        Ok(DecodedRequest {
783            request_id,
784            trace_flow_id,
785            operation: operation.map_err(|status| {
786                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
787            })?,
788            vmo,
789        })
790    }
791
792    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
793        std::mem::take(&mut *self.vmos.lock())
794    }
795
796    /// Maps the request and returns the mapped request with an optional remainder.
797    fn map_request(
798        &self,
799        mut request: DecodedRequest,
800    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
801        let mut active_requests = self.session_manager.active_requests().0.lock();
802        let active_request = &mut active_requests.requests[request.request_id.0];
803        if active_request.status != zx::Status::OK {
804            return Err(active_requests
805                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
806                .map(|(_, r)| r));
807        }
808        let mapping = self.offset_map.mapping();
809        match (mapping, request.operation.blocks()) {
810            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
811                return Err(active_requests
812                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
813                    .map(|(_, r)| r));
814            }
815            _ => {}
816        }
817        let remainder = request.operation.map(
818            self.offset_map.mapping(),
819            self.offset_map.max_transfer_blocks(),
820            self.block_size,
821        );
822        if remainder.is_some() {
823            active_request.count += 1;
824        }
825        static CACHE: AtomicU64 = AtomicU64::new(0);
826        if let Some(context) =
827            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
828        {
829            use fuchsia_trace::ArgValue;
830            let trace_args = [
831                ArgValue::of("request_id", request.request_id.0),
832                ArgValue::of("opcode", request.operation.trace_label()),
833            ];
834            let _scope = fuchsia_trace::duration(
835                c"storage",
836                c"block_server::start_transaction",
837                &trace_args,
838            );
839            if let Some(trace_flow_id) = active_request.trace_flow_id {
840                fuchsia_trace::flow_step(
841                    &context,
842                    c"block_server::start_transaction",
843                    trace_flow_id.get().into(),
844                    &[],
845                );
846            }
847        }
848        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
849        Ok((request, remainder))
850    }
851
852    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
853        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
854    }
855}
856
857#[repr(transparent)]
858#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
859pub struct RequestId(usize);
860
861#[derive(Clone, Debug)]
862struct DecodedRequest {
863    request_id: RequestId,
864    trace_flow_id: TraceFlowId,
865    operation: Operation,
866    vmo: Option<Arc<zx::Vmo>>,
867}
868
869/// cbindgen:no-export
870pub type WriteOptions = block_protocol::WriteOptions;
871
872#[repr(C)]
873#[derive(Clone, Debug, PartialEq, Eq)]
874pub enum Operation {
875    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
876    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
877    // code can read `device_block_offset` from the read variant and then assume it's valid for the
878    // write variant).
879    Read {
880        device_block_offset: u64,
881        block_count: u32,
882        _unused: u32,
883        vmo_offset: u64,
884    },
885    Write {
886        device_block_offset: u64,
887        block_count: u32,
888        options: WriteOptions,
889        vmo_offset: u64,
890    },
891    Flush,
892    Trim {
893        device_block_offset: u64,
894        block_count: u32,
895    },
896    /// This will never be seen by the C interface.
897    CloseVmo,
898}
899
900impl Operation {
901    fn trace_label(&self) -> &'static str {
902        match self {
903            Operation::Read { .. } => "read",
904            Operation::Write { .. } => "write",
905            Operation::Flush { .. } => "flush",
906            Operation::Trim { .. } => "trim",
907            Operation::CloseVmo { .. } => "close_vmo",
908        }
909    }
910
911    /// Returns (offset, length).
912    fn blocks(&self) -> Option<(u64, u32)> {
913        match self {
914            Operation::Read { device_block_offset, block_count, .. }
915            | Operation::Write { device_block_offset, block_count, .. }
916            | Operation::Trim { device_block_offset, block_count, .. } => {
917                Some((*device_block_offset, *block_count))
918            }
919            _ => None,
920        }
921    }
922
923    /// Returns mutable references to (offset, length).
924    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
925        match self {
926            Operation::Read { device_block_offset, block_count, .. }
927            | Operation::Write { device_block_offset, block_count, .. }
928            | Operation::Trim { device_block_offset, block_count, .. } => {
929                Some((device_block_offset, block_count))
930            }
931            _ => None,
932        }
933    }
934
935    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
936    /// start of the operation.
937    fn map(
938        &mut self,
939        mapping: Option<&BlockOffsetMapping>,
940        max_blocks: Option<NonZero<u32>>,
941        block_size: u32,
942    ) -> Option<Self> {
943        let mut max = match self {
944            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
945            _ => None,
946        };
947        let (offset, length) = self.blocks_mut()?;
948        let orig_offset = *offset;
949        if let Some(mapping) = mapping {
950            let delta = *offset - mapping.source_block_offset;
951            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
952            *offset = mapping.target_block_offset + delta;
953            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
954            max = match max {
955                None => Some(mapping_max),
956                Some(m) => Some(std::cmp::min(m, mapping_max)),
957            };
958        };
959        if let Some(max) = max {
960            if *length as u64 > max {
961                let rem = (*length as u64 - max) as u32;
962                *length = max as u32;
963                return Some(match self {
964                    Operation::Read {
965                        device_block_offset: _,
966                        block_count: _,
967                        vmo_offset,
968                        _unused,
969                    } => Operation::Read {
970                        device_block_offset: orig_offset + max,
971                        block_count: rem,
972                        vmo_offset: *vmo_offset + max * block_size as u64,
973                        _unused: *_unused,
974                    },
975                    Operation::Write {
976                        device_block_offset: _,
977                        block_count: _,
978                        vmo_offset,
979                        options,
980                    } => {
981                        // Only send the barrier flag once per write request.
982                        let options = *options & !WriteOptions::PRE_BARRIER;
983                        Operation::Write {
984                            device_block_offset: orig_offset + max,
985                            block_count: rem,
986                            vmo_offset: *vmo_offset + max * block_size as u64,
987                            options,
988                        }
989                    }
990                    Operation::Trim { device_block_offset: _, block_count: _ } => {
991                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
992                    }
993                    _ => unreachable!(),
994                });
995            }
996        }
997        None
998    }
999}
1000
1001#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1002pub enum GroupOrRequest {
1003    Group(u16),
1004    Request(u32),
1005}
1006
1007impl GroupOrRequest {
1008    fn is_group(&self) -> bool {
1009        matches!(self, Self::Group(_))
1010    }
1011
1012    fn group_id(&self) -> Option<u16> {
1013        match self {
1014            Self::Group(id) => Some(*id),
1015            Self::Request(_) => None,
1016        }
1017    }
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022    use super::{
1023        BlockOffsetMapping, BlockServer, DeviceInfo, Operation, PartitionInfo, TraceFlowId,
1024        FIFO_MAX_REQUESTS,
1025    };
1026    use assert_matches::assert_matches;
1027    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
1028    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1029    use fuchsia_sync::Mutex;
1030    use futures::channel::oneshot;
1031    use futures::future::BoxFuture;
1032    use futures::FutureExt as _;
1033    use std::borrow::Cow;
1034    use std::future::poll_fn;
1035    use std::num::NonZero;
1036    use std::pin::pin;
1037    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1038    use std::sync::Arc;
1039    use std::task::{Context, Poll};
1040    use zx::{AsHandleRef as _, HandleBased as _};
1041    use {
1042        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1043        fuchsia_async as fasync,
1044    };
1045
1046    #[derive(Default)]
1047    struct MockInterface {
1048        read_hook: Option<
1049            Box<
1050                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1051                    + Send
1052                    + Sync,
1053            >,
1054        >,
1055        write_hook:
1056            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1057        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1058    }
1059
1060    impl super::async_interface::Interface for MockInterface {
1061        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1062            Ok(())
1063        }
1064
1065        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1066            Ok(Cow::Owned(test_device_info()))
1067        }
1068
1069        async fn read(
1070            &self,
1071            device_block_offset: u64,
1072            block_count: u32,
1073            vmo: &Arc<zx::Vmo>,
1074            vmo_offset: u64,
1075            _trace_flow_id: TraceFlowId,
1076        ) -> Result<(), zx::Status> {
1077            if let Some(read_hook) = &self.read_hook {
1078                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1079            } else {
1080                unimplemented!();
1081            }
1082        }
1083
1084        async fn write(
1085            &self,
1086            device_block_offset: u64,
1087            _block_count: u32,
1088            _vmo: &Arc<zx::Vmo>,
1089            _vmo_offset: u64,
1090            _opts: WriteOptions,
1091            _trace_flow_id: TraceFlowId,
1092        ) -> Result<(), zx::Status> {
1093            if let Some(write_hook) = &self.write_hook {
1094                write_hook(device_block_offset).await
1095            } else {
1096                unimplemented!();
1097            }
1098        }
1099
1100        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1101            unreachable!();
1102        }
1103
1104        fn barrier(&self) -> Result<(), zx::Status> {
1105            if let Some(barrier_hook) = &self.barrier_hook {
1106                barrier_hook()
1107            } else {
1108                unimplemented!();
1109            }
1110        }
1111
1112        async fn trim(
1113            &self,
1114            _device_block_offset: u64,
1115            _block_count: u32,
1116            _trace_flow_id: TraceFlowId,
1117        ) -> Result<(), zx::Status> {
1118            unreachable!();
1119        }
1120
1121        async fn get_volume_info(
1122            &self,
1123        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1124            // Hang forever for the test_requests_dont_block_sessions test.
1125            let () = std::future::pending().await;
1126            unreachable!();
1127        }
1128    }
1129
1130    const BLOCK_SIZE: u32 = 512;
1131    const MAX_TRANSFER_BLOCKS: u32 = 10;
1132
1133    fn test_device_info() -> DeviceInfo {
1134        DeviceInfo::Partition(PartitionInfo {
1135            device_flags: fblock::Flag::READONLY,
1136            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1137            block_range: Some(0..100),
1138            type_guid: [1; 16],
1139            instance_guid: [2; 16],
1140            name: "foo".to_string(),
1141            flags: 0xabcd,
1142        })
1143    }
1144
1145    #[fuchsia::test]
1146    async fn test_split_request_only_issues_single_barrier() {
1147        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1148        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1149        let barrier_called = Arc::new(AtomicU32::new(0));
1150        let barrier_called_clone = barrier_called.clone();
1151
1152        futures::join!(
1153            async move {
1154                let block_server = BlockServer::new(
1155                    BLOCK_SIZE,
1156                    Arc::new(MockInterface {
1157                        barrier_hook: Some(Box::new(move || {
1158                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1159                            Ok(())
1160                        })),
1161                        write_hook: Some(Box::new(move |_device_block_offset| {
1162                            Box::pin(async move { Ok(()) })
1163                        })),
1164                        ..MockInterface::default()
1165                    }),
1166                );
1167                block_server.handle_requests(stream).await.unwrap();
1168            },
1169            async move {
1170                let (session_proxy, server) = fidl::endpoints::create_proxy();
1171
1172                proxy.open_session(server).unwrap();
1173
1174                let vmo_id = session_proxy
1175                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1176                    .await
1177                    .unwrap()
1178                    .unwrap();
1179                assert_ne!(vmo_id.id, 0);
1180
1181                let mut fifo =
1182                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1183                let (mut reader, mut writer) = fifo.async_io();
1184                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1185                // sub requests.
1186                writer
1187                    .write_entries(&BlockFifoRequest {
1188                        command: BlockFifoCommand {
1189                            opcode: BlockOpcode::Write.into_primitive(),
1190                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1191                            ..Default::default()
1192                        },
1193                        vmoid: vmo_id.id,
1194                        dev_offset: 0,
1195                        length: MAX_TRANSFER_BLOCKS + 1,
1196                        vmo_offset: 6,
1197                        ..Default::default()
1198                    })
1199                    .await
1200                    .unwrap();
1201
1202                let mut response = BlockFifoResponse::default();
1203                reader.read_entries(&mut response).await.unwrap();
1204                assert_eq!(response.status, zx::sys::ZX_OK);
1205
1206                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1207
1208                std::mem::drop(proxy);
1209            }
1210        );
1211    }
1212
1213    #[fuchsia::test]
1214    async fn test_barriers_not_supported() {
1215        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1216        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1217
1218        futures::join!(
1219            async move {
1220                let block_server = BlockServer::new(
1221                    BLOCK_SIZE,
1222                    Arc::new(MockInterface {
1223                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1224                        write_hook: Some(Box::new(move |_device_block_offset| {
1225                            Box::pin(async move { Ok(()) })
1226                        })),
1227                        ..MockInterface::default()
1228                    }),
1229                );
1230                block_server.handle_requests(stream).await.unwrap();
1231            },
1232            async move {
1233                let (session_proxy, server) = fidl::endpoints::create_proxy();
1234
1235                proxy.open_session(server).unwrap();
1236
1237                let vmo_id = session_proxy
1238                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1239                    .await
1240                    .unwrap()
1241                    .unwrap();
1242                assert_ne!(vmo_id.id, 0);
1243
1244                let mut fifo =
1245                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1246                let (mut reader, mut writer) = fifo.async_io();
1247
1248                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1249                // sub requests.
1250                writer
1251                    .write_entries(&BlockFifoRequest {
1252                        command: BlockFifoCommand {
1253                            opcode: BlockOpcode::Write.into_primitive(),
1254                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1255                            ..Default::default()
1256                        },
1257                        vmoid: vmo_id.id,
1258                        dev_offset: 0,
1259                        length: MAX_TRANSFER_BLOCKS + 1,
1260                        vmo_offset: 6,
1261                        ..Default::default()
1262                    })
1263                    .await
1264                    .unwrap();
1265
1266                let mut response = BlockFifoResponse::default();
1267                reader.read_entries(&mut response).await.unwrap();
1268                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1269
1270                std::mem::drop(proxy);
1271            }
1272        );
1273    }
1274
1275    #[fuchsia::test]
1276    async fn test_barriers_ordering() {
1277        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1278        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1279        let barrier_called = Arc::new(AtomicBool::new(false));
1280
1281        futures::join!(
1282            async move {
1283                let barrier_called_clone = barrier_called.clone();
1284                let block_server = BlockServer::new(
1285                    BLOCK_SIZE,
1286                    Arc::new(MockInterface {
1287                        barrier_hook: Some(Box::new(move || {
1288                            barrier_called.store(true, Ordering::Relaxed);
1289                            Ok(())
1290                        })),
1291                        write_hook: Some(Box::new(move |device_block_offset| {
1292                            let barrier_called = barrier_called_clone.clone();
1293                            Box::pin(async move {
1294                                // The sleep allows the server to reorder the fifo requests.
1295                                if device_block_offset % 2 == 0 {
1296                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1297                                        zx::MonotonicDuration::from_millis(200),
1298                                    ))
1299                                    .await;
1300                                }
1301                                assert!(barrier_called.load(Ordering::Relaxed));
1302                                Ok(())
1303                            })
1304                        })),
1305                        ..MockInterface::default()
1306                    }),
1307                );
1308                block_server.handle_requests(stream).await.unwrap();
1309            },
1310            async move {
1311                let (session_proxy, server) = fidl::endpoints::create_proxy();
1312
1313                proxy.open_session(server).unwrap();
1314
1315                let vmo_id = session_proxy
1316                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1317                    .await
1318                    .unwrap()
1319                    .unwrap();
1320                assert_ne!(vmo_id.id, 0);
1321
1322                let mut fifo =
1323                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1324                let (mut reader, mut writer) = fifo.async_io();
1325
1326                writer
1327                    .write_entries(&BlockFifoRequest {
1328                        command: BlockFifoCommand {
1329                            opcode: BlockOpcode::Write.into_primitive(),
1330                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1331                            ..Default::default()
1332                        },
1333                        vmoid: vmo_id.id,
1334                        dev_offset: 0,
1335                        length: 5,
1336                        vmo_offset: 6,
1337                        ..Default::default()
1338                    })
1339                    .await
1340                    .unwrap();
1341
1342                for i in 0..10 {
1343                    writer
1344                        .write_entries(&BlockFifoRequest {
1345                            command: BlockFifoCommand {
1346                                opcode: BlockOpcode::Write.into_primitive(),
1347                                ..Default::default()
1348                            },
1349                            vmoid: vmo_id.id,
1350                            dev_offset: i + 1,
1351                            length: 5,
1352                            vmo_offset: 6,
1353                            ..Default::default()
1354                        })
1355                        .await
1356                        .unwrap();
1357                }
1358                for _ in 0..11 {
1359                    let mut response = BlockFifoResponse::default();
1360                    reader.read_entries(&mut response).await.unwrap();
1361                    assert_eq!(response.status, zx::sys::ZX_OK);
1362                }
1363
1364                std::mem::drop(proxy);
1365            }
1366        );
1367    }
1368
1369    #[fuchsia::test]
1370    async fn test_info() {
1371        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1372
1373        futures::join!(
1374            async {
1375                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1376                block_server.handle_requests(stream).await.unwrap();
1377            },
1378            async {
1379                let expected_info = test_device_info();
1380                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1381                    info
1382                } else {
1383                    unreachable!()
1384                };
1385
1386                let block_info = proxy.get_info().await.unwrap().unwrap();
1387                assert_eq!(
1388                    block_info.block_count,
1389                    partition_info.block_range.as_ref().unwrap().end
1390                        - partition_info.block_range.as_ref().unwrap().start
1391                );
1392                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1393
1394                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1395
1396                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1397                assert_eq!(status, zx::sys::ZX_OK);
1398                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1399
1400                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1401                assert_eq!(status, zx::sys::ZX_OK);
1402                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1403
1404                let (status, name) = proxy.get_name().await.unwrap();
1405                assert_eq!(status, zx::sys::ZX_OK);
1406                assert_eq!(name.as_ref(), Some(&partition_info.name));
1407
1408                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1409                assert_eq!(metadata.name, name);
1410                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1411                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1412                assert_eq!(
1413                    metadata.start_block_offset,
1414                    Some(partition_info.block_range.as_ref().unwrap().start)
1415                );
1416                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1417                assert_eq!(metadata.flags, Some(partition_info.flags));
1418
1419                std::mem::drop(proxy);
1420            }
1421        );
1422    }
1423
1424    #[fuchsia::test]
1425    async fn test_attach_vmo() {
1426        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1427
1428        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1429        let koid = vmo.get_koid().unwrap();
1430
1431        futures::join!(
1432            async {
1433                let block_server = BlockServer::new(
1434                    BLOCK_SIZE,
1435                    Arc::new(MockInterface {
1436                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1437                            assert_eq!(vmo.get_koid().unwrap(), koid);
1438                            Box::pin(async { Ok(()) })
1439                        })),
1440                        ..MockInterface::default()
1441                    }),
1442                );
1443                block_server.handle_requests(stream).await.unwrap();
1444            },
1445            async move {
1446                let (session_proxy, server) = fidl::endpoints::create_proxy();
1447
1448                proxy.open_session(server).unwrap();
1449
1450                let vmo_id = session_proxy
1451                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1452                    .await
1453                    .unwrap()
1454                    .unwrap();
1455                assert_ne!(vmo_id.id, 0);
1456
1457                let mut fifo =
1458                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1459                let (mut reader, mut writer) = fifo.async_io();
1460
1461                // Keep attaching VMOs until we eventually hit the maximum.
1462                let mut count = 1;
1463                loop {
1464                    match session_proxy
1465                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1466                        .await
1467                        .unwrap()
1468                    {
1469                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1470                        Err(e) => {
1471                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1472                            break;
1473                        }
1474                    }
1475
1476                    // Only test every 10 to keep test time down.
1477                    if count % 10 == 0 {
1478                        writer
1479                            .write_entries(&BlockFifoRequest {
1480                                command: BlockFifoCommand {
1481                                    opcode: BlockOpcode::Read.into_primitive(),
1482                                    ..Default::default()
1483                                },
1484                                vmoid: vmo_id.id,
1485                                length: 1,
1486                                ..Default::default()
1487                            })
1488                            .await
1489                            .unwrap();
1490
1491                        let mut response = BlockFifoResponse::default();
1492                        reader.read_entries(&mut response).await.unwrap();
1493                        assert_eq!(response.status, zx::sys::ZX_OK);
1494                    }
1495
1496                    count += 1;
1497                }
1498
1499                assert_eq!(count, u16::MAX as u64);
1500
1501                // Detach the original VMO, and make sure we can then attach another one.
1502                writer
1503                    .write_entries(&BlockFifoRequest {
1504                        command: BlockFifoCommand {
1505                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1506                            ..Default::default()
1507                        },
1508                        vmoid: vmo_id.id,
1509                        ..Default::default()
1510                    })
1511                    .await
1512                    .unwrap();
1513
1514                let mut response = BlockFifoResponse::default();
1515                reader.read_entries(&mut response).await.unwrap();
1516                assert_eq!(response.status, zx::sys::ZX_OK);
1517
1518                let new_vmo_id = session_proxy
1519                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1520                    .await
1521                    .unwrap()
1522                    .unwrap();
1523                // It should reuse the same ID.
1524                assert_eq!(new_vmo_id.id, vmo_id.id);
1525
1526                std::mem::drop(proxy);
1527            }
1528        );
1529    }
1530
1531    #[fuchsia::test]
1532    async fn test_close() {
1533        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1534
1535        let mut server = std::pin::pin!(async {
1536            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1537            block_server.handle_requests(stream).await.unwrap();
1538        }
1539        .fuse());
1540
1541        let mut client = std::pin::pin!(async {
1542            let (session_proxy, server) = fidl::endpoints::create_proxy();
1543
1544            proxy.open_session(server).unwrap();
1545
1546            // Dropping the proxy should not cause the session to terminate because the session is
1547            // still live.
1548            std::mem::drop(proxy);
1549
1550            session_proxy.close().await.unwrap().unwrap();
1551
1552            // Keep the session alive.  Calling `close` should cause the server to terminate.
1553            let _: () = std::future::pending().await;
1554        }
1555        .fuse());
1556
1557        futures::select!(
1558            _ = server => {}
1559            _ = client => unreachable!(),
1560        );
1561    }
1562
1563    #[derive(Default)]
1564    struct IoMockInterface {
1565        do_checks: bool,
1566        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1567        return_errors: bool,
1568    }
1569
1570    #[derive(Debug)]
1571    enum ExpectedOp {
1572        Read(u64, u32, u64),
1573        Write(u64, u32, u64),
1574        Trim(u64, u32),
1575        Flush,
1576    }
1577
1578    impl super::async_interface::Interface for IoMockInterface {
1579        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1580            Ok(())
1581        }
1582
1583        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1584            Ok(Cow::Owned(test_device_info()))
1585        }
1586
1587        async fn read(
1588            &self,
1589            device_block_offset: u64,
1590            block_count: u32,
1591            _vmo: &Arc<zx::Vmo>,
1592            vmo_offset: u64,
1593            _trace_flow_id: TraceFlowId,
1594        ) -> Result<(), zx::Status> {
1595            if self.return_errors {
1596                Err(zx::Status::INTERNAL)
1597            } else {
1598                if self.do_checks {
1599                    assert_matches!(
1600                        self.expected_op.lock().take(),
1601                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1602                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1603                        "Read {device_block_offset} {block_count} {vmo_offset}"
1604                    );
1605                }
1606                Ok(())
1607            }
1608        }
1609
1610        async fn write(
1611            &self,
1612            device_block_offset: u64,
1613            block_count: u32,
1614            _vmo: &Arc<zx::Vmo>,
1615            vmo_offset: u64,
1616            _opts: WriteOptions,
1617            _trace_flow_id: TraceFlowId,
1618        ) -> Result<(), zx::Status> {
1619            if self.return_errors {
1620                Err(zx::Status::NOT_SUPPORTED)
1621            } else {
1622                if self.do_checks {
1623                    assert_matches!(
1624                        self.expected_op.lock().take(),
1625                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1626                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1627                        "Write {device_block_offset} {block_count} {vmo_offset}"
1628                    );
1629                }
1630                Ok(())
1631            }
1632        }
1633
1634        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1635            if self.return_errors {
1636                Err(zx::Status::NO_RESOURCES)
1637            } else {
1638                if self.do_checks {
1639                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1640                }
1641                Ok(())
1642            }
1643        }
1644
1645        fn barrier(&self) -> Result<(), zx::Status> {
1646            unreachable!()
1647        }
1648
1649        async fn trim(
1650            &self,
1651            device_block_offset: u64,
1652            block_count: u32,
1653            _trace_flow_id: TraceFlowId,
1654        ) -> Result<(), zx::Status> {
1655            if self.return_errors {
1656                Err(zx::Status::NO_MEMORY)
1657            } else {
1658                if self.do_checks {
1659                    assert_matches!(
1660                        self.expected_op.lock().take(),
1661                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1662                            block_count == b,
1663                        "Trim {device_block_offset} {block_count}"
1664                    );
1665                }
1666                Ok(())
1667            }
1668        }
1669    }
1670
1671    #[fuchsia::test]
1672    async fn test_io() {
1673        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1674
1675        let expected_op = Arc::new(Mutex::new(None));
1676        let expected_op_clone = expected_op.clone();
1677
1678        let server = async {
1679            let block_server = BlockServer::new(
1680                BLOCK_SIZE,
1681                Arc::new(IoMockInterface {
1682                    return_errors: false,
1683                    do_checks: true,
1684                    expected_op: expected_op_clone,
1685                }),
1686            );
1687            block_server.handle_requests(stream).await.unwrap();
1688        };
1689
1690        let client = async move {
1691            let (session_proxy, server) = fidl::endpoints::create_proxy();
1692
1693            proxy.open_session(server).unwrap();
1694
1695            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1696            let vmo_id = session_proxy
1697                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1698                .await
1699                .unwrap()
1700                .unwrap();
1701
1702            let mut fifo =
1703                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1704            let (mut reader, mut writer) = fifo.async_io();
1705
1706            // READ
1707            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1708            writer
1709                .write_entries(&BlockFifoRequest {
1710                    command: BlockFifoCommand {
1711                        opcode: BlockOpcode::Read.into_primitive(),
1712                        ..Default::default()
1713                    },
1714                    vmoid: vmo_id.id,
1715                    dev_offset: 1,
1716                    length: 2,
1717                    vmo_offset: 3,
1718                    ..Default::default()
1719                })
1720                .await
1721                .unwrap();
1722
1723            let mut response = BlockFifoResponse::default();
1724            reader.read_entries(&mut response).await.unwrap();
1725            assert_eq!(response.status, zx::sys::ZX_OK);
1726
1727            // WRITE
1728            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1729            writer
1730                .write_entries(&BlockFifoRequest {
1731                    command: BlockFifoCommand {
1732                        opcode: BlockOpcode::Write.into_primitive(),
1733                        ..Default::default()
1734                    },
1735                    vmoid: vmo_id.id,
1736                    dev_offset: 4,
1737                    length: 5,
1738                    vmo_offset: 6,
1739                    ..Default::default()
1740                })
1741                .await
1742                .unwrap();
1743
1744            let mut response = BlockFifoResponse::default();
1745            reader.read_entries(&mut response).await.unwrap();
1746            assert_eq!(response.status, zx::sys::ZX_OK);
1747
1748            // FLUSH
1749            *expected_op.lock() = Some(ExpectedOp::Flush);
1750            writer
1751                .write_entries(&BlockFifoRequest {
1752                    command: BlockFifoCommand {
1753                        opcode: BlockOpcode::Flush.into_primitive(),
1754                        ..Default::default()
1755                    },
1756                    ..Default::default()
1757                })
1758                .await
1759                .unwrap();
1760
1761            reader.read_entries(&mut response).await.unwrap();
1762            assert_eq!(response.status, zx::sys::ZX_OK);
1763
1764            // TRIM
1765            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1766            writer
1767                .write_entries(&BlockFifoRequest {
1768                    command: BlockFifoCommand {
1769                        opcode: BlockOpcode::Trim.into_primitive(),
1770                        ..Default::default()
1771                    },
1772                    dev_offset: 7,
1773                    length: 8,
1774                    ..Default::default()
1775                })
1776                .await
1777                .unwrap();
1778
1779            reader.read_entries(&mut response).await.unwrap();
1780            assert_eq!(response.status, zx::sys::ZX_OK);
1781
1782            std::mem::drop(proxy);
1783        };
1784
1785        futures::join!(server, client);
1786    }
1787
1788    #[fuchsia::test]
1789    async fn test_io_errors() {
1790        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1791
1792        futures::join!(
1793            async {
1794                let block_server = BlockServer::new(
1795                    BLOCK_SIZE,
1796                    Arc::new(IoMockInterface {
1797                        return_errors: true,
1798                        do_checks: false,
1799                        expected_op: Arc::new(Mutex::new(None)),
1800                    }),
1801                );
1802                block_server.handle_requests(stream).await.unwrap();
1803            },
1804            async move {
1805                let (session_proxy, server) = fidl::endpoints::create_proxy();
1806
1807                proxy.open_session(server).unwrap();
1808
1809                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1810                let vmo_id = session_proxy
1811                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1812                    .await
1813                    .unwrap()
1814                    .unwrap();
1815
1816                let mut fifo =
1817                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1818                let (mut reader, mut writer) = fifo.async_io();
1819
1820                // READ
1821                writer
1822                    .write_entries(&BlockFifoRequest {
1823                        command: BlockFifoCommand {
1824                            opcode: BlockOpcode::Read.into_primitive(),
1825                            ..Default::default()
1826                        },
1827                        vmoid: vmo_id.id,
1828                        length: 1,
1829                        reqid: 1,
1830                        ..Default::default()
1831                    })
1832                    .await
1833                    .unwrap();
1834
1835                let mut response = BlockFifoResponse::default();
1836                reader.read_entries(&mut response).await.unwrap();
1837                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1838
1839                // WRITE
1840                writer
1841                    .write_entries(&BlockFifoRequest {
1842                        command: BlockFifoCommand {
1843                            opcode: BlockOpcode::Write.into_primitive(),
1844                            ..Default::default()
1845                        },
1846                        vmoid: vmo_id.id,
1847                        length: 1,
1848                        reqid: 2,
1849                        ..Default::default()
1850                    })
1851                    .await
1852                    .unwrap();
1853
1854                reader.read_entries(&mut response).await.unwrap();
1855                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1856
1857                // FLUSH
1858                writer
1859                    .write_entries(&BlockFifoRequest {
1860                        command: BlockFifoCommand {
1861                            opcode: BlockOpcode::Flush.into_primitive(),
1862                            ..Default::default()
1863                        },
1864                        reqid: 3,
1865                        ..Default::default()
1866                    })
1867                    .await
1868                    .unwrap();
1869
1870                reader.read_entries(&mut response).await.unwrap();
1871                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1872
1873                // TRIM
1874                writer
1875                    .write_entries(&BlockFifoRequest {
1876                        command: BlockFifoCommand {
1877                            opcode: BlockOpcode::Trim.into_primitive(),
1878                            ..Default::default()
1879                        },
1880                        reqid: 4,
1881                        length: 1,
1882                        ..Default::default()
1883                    })
1884                    .await
1885                    .unwrap();
1886
1887                reader.read_entries(&mut response).await.unwrap();
1888                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1889
1890                std::mem::drop(proxy);
1891            }
1892        );
1893    }
1894
1895    #[fuchsia::test]
1896    async fn test_invalid_args() {
1897        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1898
1899        futures::join!(
1900            async {
1901                let block_server = BlockServer::new(
1902                    BLOCK_SIZE,
1903                    Arc::new(IoMockInterface {
1904                        return_errors: false,
1905                        do_checks: false,
1906                        expected_op: Arc::new(Mutex::new(None)),
1907                    }),
1908                );
1909                block_server.handle_requests(stream).await.unwrap();
1910            },
1911            async move {
1912                let (session_proxy, server) = fidl::endpoints::create_proxy();
1913
1914                proxy.open_session(server).unwrap();
1915
1916                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1917                let vmo_id = session_proxy
1918                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1919                    .await
1920                    .unwrap()
1921                    .unwrap();
1922
1923                let mut fifo =
1924                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1925
1926                async fn test(
1927                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1928                    request: BlockFifoRequest,
1929                ) -> Result<(), zx::Status> {
1930                    let (mut reader, mut writer) = fifo.async_io();
1931                    writer.write_entries(&request).await.unwrap();
1932                    let mut response = BlockFifoResponse::default();
1933                    reader.read_entries(&mut response).await.unwrap();
1934                    zx::Status::ok(response.status)
1935                }
1936
1937                // READ
1938
1939                let good_read_request = || BlockFifoRequest {
1940                    command: BlockFifoCommand {
1941                        opcode: BlockOpcode::Read.into_primitive(),
1942                        ..Default::default()
1943                    },
1944                    length: 1,
1945                    vmoid: vmo_id.id,
1946                    ..Default::default()
1947                };
1948
1949                assert_eq!(
1950                    test(
1951                        &mut fifo,
1952                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1953                    )
1954                    .await,
1955                    Err(zx::Status::IO)
1956                );
1957
1958                assert_eq!(
1959                    test(
1960                        &mut fifo,
1961                        BlockFifoRequest {
1962                            vmo_offset: 0xffff_ffff_ffff_ffff,
1963                            ..good_read_request()
1964                        }
1965                    )
1966                    .await,
1967                    Err(zx::Status::OUT_OF_RANGE)
1968                );
1969
1970                assert_eq!(
1971                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1972                    Err(zx::Status::INVALID_ARGS)
1973                );
1974
1975                // WRITE
1976
1977                let good_write_request = || BlockFifoRequest {
1978                    command: BlockFifoCommand {
1979                        opcode: BlockOpcode::Write.into_primitive(),
1980                        ..Default::default()
1981                    },
1982                    length: 1,
1983                    vmoid: vmo_id.id,
1984                    ..Default::default()
1985                };
1986
1987                assert_eq!(
1988                    test(
1989                        &mut fifo,
1990                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1991                    )
1992                    .await,
1993                    Err(zx::Status::IO)
1994                );
1995
1996                assert_eq!(
1997                    test(
1998                        &mut fifo,
1999                        BlockFifoRequest {
2000                            vmo_offset: 0xffff_ffff_ffff_ffff,
2001                            ..good_write_request()
2002                        }
2003                    )
2004                    .await,
2005                    Err(zx::Status::OUT_OF_RANGE)
2006                );
2007
2008                assert_eq!(
2009                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2010                    Err(zx::Status::INVALID_ARGS)
2011                );
2012
2013                // CLOSE VMO
2014
2015                assert_eq!(
2016                    test(
2017                        &mut fifo,
2018                        BlockFifoRequest {
2019                            command: BlockFifoCommand {
2020                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2021                                ..Default::default()
2022                            },
2023                            vmoid: vmo_id.id + 1,
2024                            ..Default::default()
2025                        }
2026                    )
2027                    .await,
2028                    Err(zx::Status::IO)
2029                );
2030
2031                std::mem::drop(proxy);
2032            }
2033        );
2034    }
2035
2036    #[fuchsia::test]
2037    async fn test_concurrent_requests() {
2038        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2039
2040        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2041        let waiting_readers_clone = waiting_readers.clone();
2042
2043        futures::join!(
2044            async move {
2045                let block_server = BlockServer::new(
2046                    BLOCK_SIZE,
2047                    Arc::new(MockInterface {
2048                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2049                            let (tx, rx) = oneshot::channel();
2050                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2051                            Box::pin(async move {
2052                                let _ = rx.await;
2053                                Ok(())
2054                            })
2055                        })),
2056                        ..MockInterface::default()
2057                    }),
2058                );
2059                block_server.handle_requests(stream).await.unwrap();
2060            },
2061            async move {
2062                let (session_proxy, server) = fidl::endpoints::create_proxy();
2063
2064                proxy.open_session(server).unwrap();
2065
2066                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2067                let vmo_id = session_proxy
2068                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2069                    .await
2070                    .unwrap()
2071                    .unwrap();
2072
2073                let mut fifo =
2074                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2075                let (mut reader, mut writer) = fifo.async_io();
2076
2077                writer
2078                    .write_entries(&BlockFifoRequest {
2079                        command: BlockFifoCommand {
2080                            opcode: BlockOpcode::Read.into_primitive(),
2081                            ..Default::default()
2082                        },
2083                        reqid: 1,
2084                        dev_offset: 1, // Intentionally use the same as `reqid`.
2085                        vmoid: vmo_id.id,
2086                        length: 1,
2087                        ..Default::default()
2088                    })
2089                    .await
2090                    .unwrap();
2091
2092                writer
2093                    .write_entries(&BlockFifoRequest {
2094                        command: BlockFifoCommand {
2095                            opcode: BlockOpcode::Read.into_primitive(),
2096                            ..Default::default()
2097                        },
2098                        reqid: 2,
2099                        dev_offset: 2,
2100                        vmoid: vmo_id.id,
2101                        length: 1,
2102                        ..Default::default()
2103                    })
2104                    .await
2105                    .unwrap();
2106
2107                // Wait till both those entries are pending.
2108                poll_fn(|cx: &mut Context<'_>| {
2109                    if waiting_readers.lock().len() == 2 {
2110                        Poll::Ready(())
2111                    } else {
2112                        // Yield to the executor.
2113                        cx.waker().wake_by_ref();
2114                        Poll::Pending
2115                    }
2116                })
2117                .await;
2118
2119                let mut response = BlockFifoResponse::default();
2120                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2121
2122                let (id, tx) = waiting_readers.lock().pop().unwrap();
2123                tx.send(()).unwrap();
2124
2125                reader.read_entries(&mut response).await.unwrap();
2126                assert_eq!(response.status, zx::sys::ZX_OK);
2127                assert_eq!(response.reqid, id);
2128
2129                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2130
2131                let (id, tx) = waiting_readers.lock().pop().unwrap();
2132                tx.send(()).unwrap();
2133
2134                reader.read_entries(&mut response).await.unwrap();
2135                assert_eq!(response.status, zx::sys::ZX_OK);
2136                assert_eq!(response.reqid, id);
2137            }
2138        );
2139    }
2140
2141    #[fuchsia::test]
2142    async fn test_groups() {
2143        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2144
2145        futures::join!(
2146            async move {
2147                let block_server = BlockServer::new(
2148                    BLOCK_SIZE,
2149                    Arc::new(MockInterface {
2150                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2151                        ..MockInterface::default()
2152                    }),
2153                );
2154                block_server.handle_requests(stream).await.unwrap();
2155            },
2156            async move {
2157                let (session_proxy, server) = fidl::endpoints::create_proxy();
2158
2159                proxy.open_session(server).unwrap();
2160
2161                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2162                let vmo_id = session_proxy
2163                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2164                    .await
2165                    .unwrap()
2166                    .unwrap();
2167
2168                let mut fifo =
2169                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2170                let (mut reader, mut writer) = fifo.async_io();
2171
2172                writer
2173                    .write_entries(&BlockFifoRequest {
2174                        command: BlockFifoCommand {
2175                            opcode: BlockOpcode::Read.into_primitive(),
2176                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2177                            ..Default::default()
2178                        },
2179                        group: 1,
2180                        vmoid: vmo_id.id,
2181                        length: 1,
2182                        ..Default::default()
2183                    })
2184                    .await
2185                    .unwrap();
2186
2187                writer
2188                    .write_entries(&BlockFifoRequest {
2189                        command: BlockFifoCommand {
2190                            opcode: BlockOpcode::Read.into_primitive(),
2191                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2192                            ..Default::default()
2193                        },
2194                        reqid: 2,
2195                        group: 1,
2196                        vmoid: vmo_id.id,
2197                        length: 1,
2198                        ..Default::default()
2199                    })
2200                    .await
2201                    .unwrap();
2202
2203                let mut response = BlockFifoResponse::default();
2204                reader.read_entries(&mut response).await.unwrap();
2205                assert_eq!(response.status, zx::sys::ZX_OK);
2206                assert_eq!(response.reqid, 2);
2207                assert_eq!(response.group, 1);
2208            }
2209        );
2210    }
2211
2212    #[fuchsia::test]
2213    async fn test_group_error() {
2214        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2215
2216        let counter = Arc::new(AtomicU64::new(0));
2217        let counter_clone = counter.clone();
2218
2219        futures::join!(
2220            async move {
2221                let block_server = BlockServer::new(
2222                    BLOCK_SIZE,
2223                    Arc::new(MockInterface {
2224                        read_hook: Some(Box::new(move |_, _, _, _| {
2225                            counter_clone.fetch_add(1, Ordering::Relaxed);
2226                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2227                        })),
2228                        ..MockInterface::default()
2229                    }),
2230                );
2231                block_server.handle_requests(stream).await.unwrap();
2232            },
2233            async move {
2234                let (session_proxy, server) = fidl::endpoints::create_proxy();
2235
2236                proxy.open_session(server).unwrap();
2237
2238                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2239                let vmo_id = session_proxy
2240                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2241                    .await
2242                    .unwrap()
2243                    .unwrap();
2244
2245                let mut fifo =
2246                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2247                let (mut reader, mut writer) = fifo.async_io();
2248
2249                writer
2250                    .write_entries(&BlockFifoRequest {
2251                        command: BlockFifoCommand {
2252                            opcode: BlockOpcode::Read.into_primitive(),
2253                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2254                            ..Default::default()
2255                        },
2256                        group: 1,
2257                        vmoid: vmo_id.id,
2258                        length: 1,
2259                        ..Default::default()
2260                    })
2261                    .await
2262                    .unwrap();
2263
2264                // Wait until processed.
2265                poll_fn(|cx: &mut Context<'_>| {
2266                    if counter.load(Ordering::Relaxed) == 1 {
2267                        Poll::Ready(())
2268                    } else {
2269                        // Yield to the executor.
2270                        cx.waker().wake_by_ref();
2271                        Poll::Pending
2272                    }
2273                })
2274                .await;
2275
2276                let mut response = BlockFifoResponse::default();
2277                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2278
2279                writer
2280                    .write_entries(&BlockFifoRequest {
2281                        command: BlockFifoCommand {
2282                            opcode: BlockOpcode::Read.into_primitive(),
2283                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2284                            ..Default::default()
2285                        },
2286                        group: 1,
2287                        vmoid: vmo_id.id,
2288                        length: 1,
2289                        ..Default::default()
2290                    })
2291                    .await
2292                    .unwrap();
2293
2294                writer
2295                    .write_entries(&BlockFifoRequest {
2296                        command: BlockFifoCommand {
2297                            opcode: BlockOpcode::Read.into_primitive(),
2298                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2299                            ..Default::default()
2300                        },
2301                        reqid: 2,
2302                        group: 1,
2303                        vmoid: vmo_id.id,
2304                        length: 1,
2305                        ..Default::default()
2306                    })
2307                    .await
2308                    .unwrap();
2309
2310                reader.read_entries(&mut response).await.unwrap();
2311                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2312                assert_eq!(response.reqid, 2);
2313                assert_eq!(response.group, 1);
2314
2315                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2316
2317                // Only the first request should have been processed.
2318                assert_eq!(counter.load(Ordering::Relaxed), 1);
2319            }
2320        );
2321    }
2322
2323    #[fuchsia::test]
2324    async fn test_group_with_two_lasts() {
2325        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2326
2327        let (tx, rx) = oneshot::channel();
2328
2329        futures::join!(
2330            async move {
2331                let rx = Mutex::new(Some(rx));
2332                let block_server = BlockServer::new(
2333                    BLOCK_SIZE,
2334                    Arc::new(MockInterface {
2335                        read_hook: Some(Box::new(move |_, _, _, _| {
2336                            let rx = rx.lock().take().unwrap();
2337                            Box::pin(async {
2338                                let _ = rx.await;
2339                                Ok(())
2340                            })
2341                        })),
2342                        ..MockInterface::default()
2343                    }),
2344                );
2345                block_server.handle_requests(stream).await.unwrap();
2346            },
2347            async move {
2348                let (session_proxy, server) = fidl::endpoints::create_proxy();
2349
2350                proxy.open_session(server).unwrap();
2351
2352                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2353                let vmo_id = session_proxy
2354                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2355                    .await
2356                    .unwrap()
2357                    .unwrap();
2358
2359                let mut fifo =
2360                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2361                let (mut reader, mut writer) = fifo.async_io();
2362
2363                writer
2364                    .write_entries(&BlockFifoRequest {
2365                        command: BlockFifoCommand {
2366                            opcode: BlockOpcode::Read.into_primitive(),
2367                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2368                            ..Default::default()
2369                        },
2370                        reqid: 1,
2371                        group: 1,
2372                        vmoid: vmo_id.id,
2373                        length: 1,
2374                        ..Default::default()
2375                    })
2376                    .await
2377                    .unwrap();
2378
2379                writer
2380                    .write_entries(&BlockFifoRequest {
2381                        command: BlockFifoCommand {
2382                            opcode: BlockOpcode::Read.into_primitive(),
2383                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2384                            ..Default::default()
2385                        },
2386                        reqid: 2,
2387                        group: 1,
2388                        vmoid: vmo_id.id,
2389                        length: 1,
2390                        ..Default::default()
2391                    })
2392                    .await
2393                    .unwrap();
2394
2395                // Send an independent request to flush through the fifo.
2396                writer
2397                    .write_entries(&BlockFifoRequest {
2398                        command: BlockFifoCommand {
2399                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2400                            ..Default::default()
2401                        },
2402                        reqid: 3,
2403                        vmoid: vmo_id.id,
2404                        ..Default::default()
2405                    })
2406                    .await
2407                    .unwrap();
2408
2409                // It should succeed.
2410                let mut response = BlockFifoResponse::default();
2411                reader.read_entries(&mut response).await.unwrap();
2412                assert_eq!(response.status, zx::sys::ZX_OK);
2413                assert_eq!(response.reqid, 3);
2414
2415                // Now release the original request.
2416                tx.send(()).unwrap();
2417
2418                // The response should be for the first message tagged as last, and it should be
2419                // an error because we sent two messages with the LAST marker.
2420                let mut response = BlockFifoResponse::default();
2421                reader.read_entries(&mut response).await.unwrap();
2422                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2423                assert_eq!(response.reqid, 1);
2424                assert_eq!(response.group, 1);
2425            }
2426        );
2427    }
2428
2429    #[fuchsia::test(allow_stalls = false)]
2430    async fn test_requests_dont_block_sessions() {
2431        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2432
2433        let (tx, rx) = oneshot::channel();
2434
2435        fasync::Task::local(async move {
2436            let rx = Mutex::new(Some(rx));
2437            let block_server = BlockServer::new(
2438                BLOCK_SIZE,
2439                Arc::new(MockInterface {
2440                    read_hook: Some(Box::new(move |_, _, _, _| {
2441                        let rx = rx.lock().take().unwrap();
2442                        Box::pin(async {
2443                            let _ = rx.await;
2444                            Ok(())
2445                        })
2446                    })),
2447                    ..MockInterface::default()
2448                }),
2449            );
2450            block_server.handle_requests(stream).await.unwrap();
2451        })
2452        .detach();
2453
2454        let mut fut = pin!(async {
2455            let (session_proxy, server) = fidl::endpoints::create_proxy();
2456
2457            proxy.open_session(server).unwrap();
2458
2459            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2460            let vmo_id = session_proxy
2461                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2462                .await
2463                .unwrap()
2464                .unwrap();
2465
2466            let mut fifo =
2467                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2468            let (mut reader, mut writer) = fifo.async_io();
2469
2470            writer
2471                .write_entries(&BlockFifoRequest {
2472                    command: BlockFifoCommand {
2473                        opcode: BlockOpcode::Read.into_primitive(),
2474                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2475                        ..Default::default()
2476                    },
2477                    reqid: 1,
2478                    group: 1,
2479                    vmoid: vmo_id.id,
2480                    length: 1,
2481                    ..Default::default()
2482                })
2483                .await
2484                .unwrap();
2485
2486            let mut response = BlockFifoResponse::default();
2487            reader.read_entries(&mut response).await.unwrap();
2488            assert_eq!(response.status, zx::sys::ZX_OK);
2489        });
2490
2491        // The response won't come back until we send on `tx`.
2492        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2493
2494        let mut fut2 = pin!(proxy.get_volume_info());
2495
2496        // get_volume_info is set up to stall forever.
2497        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2498
2499        // If we now free up the first future, it should resolve; the stalled call to
2500        // get_volume_info should not block the fifo response.
2501        let _ = tx.send(());
2502
2503        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2504    }
2505
2506    #[fuchsia::test]
2507    async fn test_request_flow_control() {
2508        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2509
2510        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2511        // server will block until that happens.
2512        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2513        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2514        let event_clone = event.clone();
2515        futures::join!(
2516            async move {
2517                let block_server = BlockServer::new(
2518                    BLOCK_SIZE,
2519                    Arc::new(MockInterface {
2520                        read_hook: Some(Box::new(move |_, _, _, _| {
2521                            let event_clone = event_clone.clone();
2522                            Box::pin(async move {
2523                                if !event_clone.1.load(Ordering::SeqCst) {
2524                                    event_clone.0.listen().await;
2525                                }
2526                                Ok(())
2527                            })
2528                        })),
2529                        ..MockInterface::default()
2530                    }),
2531                );
2532                block_server.handle_requests(stream).await.unwrap();
2533            },
2534            async move {
2535                let (session_proxy, server) = fidl::endpoints::create_proxy();
2536
2537                proxy.open_session(server).unwrap();
2538
2539                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2540                let vmo_id = session_proxy
2541                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2542                    .await
2543                    .unwrap()
2544                    .unwrap();
2545
2546                let mut fifo =
2547                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2548                let (mut reader, mut writer) = fifo.async_io();
2549
2550                for i in 0..MAX_REQUESTS {
2551                    writer
2552                        .write_entries(&BlockFifoRequest {
2553                            command: BlockFifoCommand {
2554                                opcode: BlockOpcode::Read.into_primitive(),
2555                                ..Default::default()
2556                            },
2557                            reqid: (i + 1) as u32,
2558                            dev_offset: i,
2559                            vmoid: vmo_id.id,
2560                            length: 1,
2561                            ..Default::default()
2562                        })
2563                        .await
2564                        .unwrap();
2565                }
2566                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2567                    command: BlockFifoCommand {
2568                        opcode: BlockOpcode::Read.into_primitive(),
2569                        ..Default::default()
2570                    },
2571                    reqid: u32::MAX,
2572                    dev_offset: MAX_REQUESTS,
2573                    vmoid: vmo_id.id,
2574                    length: 1,
2575                    ..Default::default()
2576                })))
2577                .is_pending());
2578                // OK, let the server start to process.
2579                event.1.store(true, Ordering::SeqCst);
2580                event.0.notify(usize::MAX);
2581                // For each entry we read, make sure we can write a new one in.
2582                let mut finished_reqids = vec![];
2583                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2584                    let mut response = BlockFifoResponse::default();
2585                    reader.read_entries(&mut response).await.unwrap();
2586                    assert_eq!(response.status, zx::sys::ZX_OK);
2587                    finished_reqids.push(response.reqid);
2588                    writer
2589                        .write_entries(&BlockFifoRequest {
2590                            command: BlockFifoCommand {
2591                                opcode: BlockOpcode::Read.into_primitive(),
2592                                ..Default::default()
2593                            },
2594                            reqid: (i + 1) as u32,
2595                            dev_offset: i,
2596                            vmoid: vmo_id.id,
2597                            length: 1,
2598                            ..Default::default()
2599                        })
2600                        .await
2601                        .unwrap();
2602                }
2603                let mut response = BlockFifoResponse::default();
2604                for _ in 0..MAX_REQUESTS {
2605                    reader.read_entries(&mut response).await.unwrap();
2606                    assert_eq!(response.status, zx::sys::ZX_OK);
2607                    finished_reqids.push(response.reqid);
2608                }
2609                // Verify that we got a response for each request.  Note that we can't assume FIFO
2610                // ordering.
2611                finished_reqids.sort();
2612                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2613                let mut i = 1;
2614                for reqid in finished_reqids {
2615                    assert_eq!(reqid, i);
2616                    i += 1;
2617                }
2618            }
2619        );
2620    }
2621
2622    #[fuchsia::test]
2623    async fn test_passthrough_io_with_fixed_map() {
2624        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2625
2626        let expected_op = Arc::new(Mutex::new(None));
2627        let expected_op_clone = expected_op.clone();
2628        futures::join!(
2629            async {
2630                let block_server = BlockServer::new(
2631                    BLOCK_SIZE,
2632                    Arc::new(IoMockInterface {
2633                        return_errors: false,
2634                        do_checks: true,
2635                        expected_op: expected_op_clone,
2636                    }),
2637                );
2638                block_server.handle_requests(stream).await.unwrap();
2639            },
2640            async move {
2641                let (session_proxy, server) = fidl::endpoints::create_proxy();
2642
2643                let mappings = [fblock::BlockOffsetMapping {
2644                    source_block_offset: 0,
2645                    target_block_offset: 10,
2646                    length: 20,
2647                }];
2648                proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2649
2650                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2651                let vmo_id = session_proxy
2652                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2653                    .await
2654                    .unwrap()
2655                    .unwrap();
2656
2657                let mut fifo =
2658                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2659                let (mut reader, mut writer) = fifo.async_io();
2660
2661                // READ
2662                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2663                writer
2664                    .write_entries(&BlockFifoRequest {
2665                        command: BlockFifoCommand {
2666                            opcode: BlockOpcode::Read.into_primitive(),
2667                            ..Default::default()
2668                        },
2669                        vmoid: vmo_id.id,
2670                        dev_offset: 1,
2671                        length: 2,
2672                        vmo_offset: 3,
2673                        ..Default::default()
2674                    })
2675                    .await
2676                    .unwrap();
2677
2678                let mut response = BlockFifoResponse::default();
2679                reader.read_entries(&mut response).await.unwrap();
2680                assert_eq!(response.status, zx::sys::ZX_OK);
2681
2682                // WRITE
2683                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2684                writer
2685                    .write_entries(&BlockFifoRequest {
2686                        command: BlockFifoCommand {
2687                            opcode: BlockOpcode::Write.into_primitive(),
2688                            ..Default::default()
2689                        },
2690                        vmoid: vmo_id.id,
2691                        dev_offset: 4,
2692                        length: 5,
2693                        vmo_offset: 6,
2694                        ..Default::default()
2695                    })
2696                    .await
2697                    .unwrap();
2698
2699                reader.read_entries(&mut response).await.unwrap();
2700                assert_eq!(response.status, zx::sys::ZX_OK);
2701
2702                // FLUSH
2703                *expected_op.lock() = Some(ExpectedOp::Flush);
2704                writer
2705                    .write_entries(&BlockFifoRequest {
2706                        command: BlockFifoCommand {
2707                            opcode: BlockOpcode::Flush.into_primitive(),
2708                            ..Default::default()
2709                        },
2710                        ..Default::default()
2711                    })
2712                    .await
2713                    .unwrap();
2714
2715                reader.read_entries(&mut response).await.unwrap();
2716                assert_eq!(response.status, zx::sys::ZX_OK);
2717
2718                // TRIM
2719                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2720                writer
2721                    .write_entries(&BlockFifoRequest {
2722                        command: BlockFifoCommand {
2723                            opcode: BlockOpcode::Trim.into_primitive(),
2724                            ..Default::default()
2725                        },
2726                        dev_offset: 7,
2727                        length: 3,
2728                        ..Default::default()
2729                    })
2730                    .await
2731                    .unwrap();
2732
2733                reader.read_entries(&mut response).await.unwrap();
2734                assert_eq!(response.status, zx::sys::ZX_OK);
2735
2736                // READ past window
2737                *expected_op.lock() = None;
2738                writer
2739                    .write_entries(&BlockFifoRequest {
2740                        command: BlockFifoCommand {
2741                            opcode: BlockOpcode::Read.into_primitive(),
2742                            ..Default::default()
2743                        },
2744                        vmoid: vmo_id.id,
2745                        dev_offset: 19,
2746                        length: 2,
2747                        vmo_offset: 3,
2748                        ..Default::default()
2749                    })
2750                    .await
2751                    .unwrap();
2752
2753                reader.read_entries(&mut response).await.unwrap();
2754                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2755
2756                std::mem::drop(proxy);
2757            }
2758        );
2759    }
2760
2761    #[fuchsia::test]
2762    fn operation_map() {
2763        fn expect_map_result(
2764            mut operation: Operation,
2765            mapping: Option<BlockOffsetMapping>,
2766            max_blocks: Option<NonZero<u32>>,
2767            expected_operations: Vec<Operation>,
2768        ) {
2769            let mut ops = vec![];
2770            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2771                ops.push(operation);
2772                operation = remainder;
2773            }
2774            ops.push(operation);
2775            assert_eq!(ops, expected_operations);
2776        }
2777
2778        // No limits
2779        expect_map_result(
2780            Operation::Read {
2781                device_block_offset: 10,
2782                block_count: 200,
2783                _unused: 0,
2784                vmo_offset: 0,
2785            },
2786            None,
2787            None,
2788            vec![Operation::Read {
2789                device_block_offset: 10,
2790                block_count: 200,
2791                _unused: 0,
2792                vmo_offset: 0,
2793            }],
2794        );
2795
2796        // Max block count
2797        expect_map_result(
2798            Operation::Read {
2799                device_block_offset: 10,
2800                block_count: 200,
2801                _unused: 0,
2802                vmo_offset: 0,
2803            },
2804            None,
2805            NonZero::new(120),
2806            vec![
2807                Operation::Read {
2808                    device_block_offset: 10,
2809                    block_count: 120,
2810                    _unused: 0,
2811                    vmo_offset: 0,
2812                },
2813                Operation::Read {
2814                    device_block_offset: 130,
2815                    block_count: 80,
2816                    _unused: 0,
2817                    vmo_offset: 120 * 512,
2818                },
2819            ],
2820        );
2821        expect_map_result(
2822            Operation::Write {
2823                device_block_offset: 10,
2824                block_count: 200,
2825                options: WriteOptions::PRE_BARRIER,
2826                vmo_offset: 0,
2827            },
2828            None,
2829            NonZero::new(120),
2830            vec![
2831                Operation::Write {
2832                    device_block_offset: 10,
2833                    block_count: 120,
2834                    options: WriteOptions::PRE_BARRIER,
2835                    vmo_offset: 0,
2836                },
2837                Operation::Write {
2838                    device_block_offset: 130,
2839                    block_count: 80,
2840                    options: WriteOptions::empty(),
2841                    vmo_offset: 120 * 512,
2842                },
2843            ],
2844        );
2845        expect_map_result(
2846            Operation::Trim { device_block_offset: 10, block_count: 200 },
2847            None,
2848            NonZero::new(120),
2849            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2850        );
2851
2852        // Remapping + Max block count
2853        expect_map_result(
2854            Operation::Read {
2855                device_block_offset: 10,
2856                block_count: 200,
2857                _unused: 0,
2858                vmo_offset: 0,
2859            },
2860            Some(BlockOffsetMapping {
2861                source_block_offset: 10,
2862                target_block_offset: 100,
2863                length: 200,
2864            }),
2865            NonZero::new(120),
2866            vec![
2867                Operation::Read {
2868                    device_block_offset: 100,
2869                    block_count: 120,
2870                    _unused: 0,
2871                    vmo_offset: 0,
2872                },
2873                Operation::Read {
2874                    device_block_offset: 220,
2875                    block_count: 80,
2876                    _unused: 0,
2877                    vmo_offset: 120 * 512,
2878                },
2879            ],
2880        );
2881        expect_map_result(
2882            Operation::Write {
2883                device_block_offset: 10,
2884                block_count: 200,
2885                options: WriteOptions::PRE_BARRIER,
2886                vmo_offset: 0,
2887            },
2888            Some(BlockOffsetMapping {
2889                source_block_offset: 10,
2890                target_block_offset: 100,
2891                length: 200,
2892            }),
2893            NonZero::new(120),
2894            vec![
2895                Operation::Write {
2896                    device_block_offset: 100,
2897                    block_count: 120,
2898                    options: WriteOptions::PRE_BARRIER,
2899                    vmo_offset: 0,
2900                },
2901                Operation::Write {
2902                    device_block_offset: 220,
2903                    block_count: 80,
2904                    options: WriteOptions::empty(),
2905                    vmo_offset: 120 * 512,
2906                },
2907            ],
2908        );
2909        expect_map_result(
2910            Operation::Trim { device_block_offset: 10, block_count: 200 },
2911            Some(BlockOffsetMapping {
2912                source_block_offset: 10,
2913                target_block_offset: 100,
2914                length: 200,
2915            }),
2916            NonZero::new(120),
2917            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2918        );
2919    }
2920}