block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::bin::Bin;
5use anyhow::{anyhow, Error};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use zx::HandleBased;
19use {
20    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
/// Crate-wide limit of 64 FIFO requests.
// NOTE(review): presumably the maximum number of requests handled per FIFO batch; the users of
// this constant are outside this file section — confirm.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Trace flow identifier attached to a request.  It is built with `NonZero::new` from the raw
/// `trace_flow_id` of a FIFO request, so a raw value of zero becomes `None`.
type TraceFlowId = Option<NonZero<u64>>;
31
/// Information about the device being served, which is either a plain block device or a
/// partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A non-partition block device.
    Block(BlockInfo),
    /// A block device that is also a partition.
    Partition(PartitionInfo),
}
37
38impl DeviceInfo {
39    pub fn block_count(&self) -> Option<u64> {
40        match self {
41            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42            Self::Partition(PartitionInfo { block_range, .. }) => {
43                block_range.as_ref().map(|range| range.end - range.start)
44            }
45        }
46    }
47
48    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49        match self {
50            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52                max_transfer_blocks.clone()
53            }
54        }
55    }
56
57    fn max_transfer_size(&self, block_size: u32) -> u32 {
58        if let Some(max_blocks) = self.max_transfer_blocks() {
59            max_blocks.get() * block_size
60        } else {
61            MAX_TRANSFER_UNBOUNDED
62        }
63    }
64}
65
/// Information associated with non-partition block devices.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// Device flags; reported as `flags` in the `GetInfo` reply.
    pub device_flags: fblock::Flag,
    /// Total number of blocks; reported as `block_count` in the `GetInfo` reply.
    pub block_count: u64,
    /// Maximum number of blocks per transfer; `None` means there is no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
73
/// Information associated with a block device that is also a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// The device flags reported by the underlying device.
    pub device_flags: fblock::Flag,
    /// Maximum number of blocks per transfer; `None` means there is no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// If `block_range` is None, the partition is a volume and may not be contiguous.
    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
    /// slices and use that (along with the slice and block sizes) to determine the block count.
    pub block_range: Option<Range<u64>>,
    /// Type GUID; returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Instance GUID; returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Partition name; returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Partition flags; returned as `flags` by `GetMetadata`.
    pub flags: u64,
}
89
/// We internally keep track of active requests, so that when the server is torn down, we can
/// deallocate all of the resources for pending requests.
struct ActiveRequest<S> {
    /// The session that issued the request; handed back together with the response.
    session: S,
    /// Identifies either the group this entry tracks or the single request.
    group_or_request: GroupOrRequest,
    /// Trace flow id used for the flow step emitted on completion, if any.
    trace_flow_id: TraceFlowId,
    /// Key obtained from the VMO bin when the entry was created for a read/write; released when
    /// the entry is removed.
    vmo_bin_key: Option<usize>,
    /// First non-OK status seen for this entry, or OK.
    status: zx::Status,
    /// Number of requests of this entry that have not completed yet.
    count: u32,
    /// Request id to place in the response.  `None` for a group until the request flagged
    /// `GROUP_LAST` has been received; no response is produced while this is `None`.
    req_id: Option<u32>,
}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105    fn default() -> Self {
106        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107    }
108}
109
110impl<S> ActiveRequests<S> {
111    fn complete_and_take_response(
112        &self,
113        request_id: RequestId,
114        status: zx::Status,
115    ) -> Option<(S, BlockFifoResponse)> {
116        self.0.lock().complete_and_take_response(request_id, status)
117    }
118}
119
/// State guarded by the mutex inside `ActiveRequests`.
struct ActiveRequestsInner<S> {
    /// All entries (single requests or groups) that are currently tracked, keyed by slab index.
    requests: Slab<ActiveRequest<S>>,
    /// Bin holding VMOs; closed VMOs are added to it and read/write entries retain a key that is
    /// released when the entry is removed.
    vmo_bin: Bin<Arc<zx::Vmo>>,
}
124
125// Keeps track of all the requests that are currently being processed
126impl<S> ActiveRequestsInner<S> {
127    /// Completes a request.
128    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129        let group = &mut self.requests[request_id.0];
130
131        group.count = group.count.checked_sub(1).unwrap();
132        if status != zx::Status::OK && group.status == zx::Status::OK {
133            group.status = status
134        }
135
136        fuchsia_trace::duration!(
137            c"storage",
138            c"block_server::finish_transaction",
139            "request_id" => request_id.0,
140            "group_completed" => group.count == 0,
141            "status" => status.into_raw());
142        if let Some(trace_flow_id) = group.trace_flow_id {
143            fuchsia_trace::flow_step!(
144                c"storage",
145                c"block_server::finish_request",
146                trace_flow_id.get().into()
147            );
148        }
149    }
150
151    /// Takes the response if all requests are finished.
152    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153        let group = &self.requests[request_id.0];
154        match group.req_id {
155            Some(reqid) if group.count == 0 => {
156                let group = self.requests.remove(request_id.0);
157                if let Some(vmo_bin_key) = group.vmo_bin_key {
158                    self.vmo_bin.release(vmo_bin_key);
159                }
160                Some((
161                    group.session,
162                    BlockFifoResponse {
163                        status: group.status.into_raw(),
164                        reqid,
165                        group: group.group_or_request.group_id().unwrap_or(0),
166                        ..Default::default()
167                    },
168                ))
169            }
170            _ => None,
171        }
172    }
173
174    /// Competes the request and returns a response if the request group is finished.
175    fn complete_and_take_response(
176        &mut self,
177        request_id: RequestId,
178        status: zx::Status,
179    ) -> Option<(S, BlockFifoResponse)> {
180        self.complete(request_id, status);
181        self.take_response(request_id)
182    }
183}
184
/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// Block size in bytes, reported via `GetInfo` and passed to every session that is opened.
    block_size: u32,
    /// The session manager that supplies device information and runs sessions.
    session_manager: Arc<SM>,
}
191
/// A single entry in `[OffsetMap]`.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    /// First block of the mapped range, in the client's (source) address space.
    source_block_offset: u64,
    /// Block on the target that `source_block_offset` maps to.
    target_block_offset: u64,
    /// Number of blocks covered by the mapping.
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the range `blocks` (start block, block count) lies entirely within
    /// `[source_block_offset, source_block_offset + length)`.
    ///
    /// The check never overflows: the start is first made relative to the mapping, and a range
    /// whose end cannot be represented is reported as out of range.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (start, count) = blocks;
        // A start before the mapping is out of range; otherwise work with the relative offset.
        let Some(relative_start) = start.checked_sub(self.source_block_offset) else {
            return false;
        };
        relative_start.checked_add(u64::from(count)).is_some_and(|end| end <= self.length)
    }
}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208    type Error = zx::Status;
209
210    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213        Ok(Self {
214            source_block_offset: wire.source_block_offset,
215            target_block_offset: wire.target_block_offset,
216            length: wire.length,
217        })
218    }
219}
220
221/// Remaps the offset of block requests based on an internal map, and truncates long requests.
222pub struct OffsetMap {
223    mapping: Option<BlockOffsetMapping>,
224    max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228    /// An OffsetMap that remaps requests.
229    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230        Self { mapping: Some(mapping), max_transfer_blocks }
231    }
232
233    /// An OffsetMap that just enforces maximum request sizes.
234    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235        Self { mapping: None, max_transfer_blocks }
236    }
237
238    pub fn is_empty(&self) -> bool {
239        self.mapping.is_none()
240    }
241
242    fn mapping(&self) -> Option<&BlockOffsetMapping> {
243        self.mapping.as_ref()
244    }
245
246    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247        self.max_transfer_blocks
248    }
249}
250
// Methods take Arc<Self> rather than &self because of
// https://github.com/rust-lang/rust/issues/42940.
/// Interface the block server uses to obtain device information, run sessions and service
/// volume operations.
pub trait SessionManager: 'static {
    /// Per-session value stored with each active request.
    type Session;

    /// Called when a client attaches `vmo` to a session, before the attach request is answered.
    /// An error aborts handling of the attach request.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Creates a new session to handle `stream`.
    /// The returned future should run until the session completes, for example when the client end
    /// closes.
    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    /// The default implementation reports `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    /// The default implementation reports `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    /// The default implementation reports `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    /// The default implementation reports `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the active requests.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
312
/// Conversion into a shared session manager; this is what `BlockServer::new` accepts.
pub trait IntoSessionManager {
    /// The concrete session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager wrapped in an `Arc`.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
318
319impl<SM: SessionManager> BlockServer<SM> {
320    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321        Self { block_size, session_manager: session_manager.into_session_manager() }
322    }
323
324    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
325    pub async fn handle_requests(
326        &self,
327        mut requests: fvolume::VolumeRequestStream,
328    ) -> Result<(), Error> {
329        let scope = fasync::Scope::new();
330        while let Some(request) = requests.try_next().await.unwrap() {
331            if let Some(session) = self.handle_request(request).await? {
332                scope.spawn(session.map(|_| ()));
333            }
334        }
335        scope.await;
336        Ok(())
337    }
338
339    /// Processes a partition request.  If a new session task is created in response to the request,
340    /// it is returned.
341    async fn handle_request(
342        &self,
343        request: fvolume::VolumeRequest,
344    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
345        match request {
346            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
347                Ok(info) => {
348                    let max_transfer_size = info.max_transfer_size(self.block_size);
349                    let (block_count, flags) = match info.as_ref() {
350                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
351                            (*block_count, *device_flags)
352                        }
353                        DeviceInfo::Partition(partition_info) => {
354                            let block_count = if let Some(range) =
355                                partition_info.block_range.as_ref()
356                            {
357                                range.end - range.start
358                            } else {
359                                let volume_info = self.session_manager.get_volume_info().await?;
360                                volume_info.0.slice_size * volume_info.1.partition_slice_count
361                                    / self.block_size as u64
362                            };
363                            (block_count, partition_info.device_flags)
364                        }
365                    };
366                    responder.send(Ok(&fblock::BlockInfo {
367                        block_count,
368                        block_size: self.block_size,
369                        max_transfer_size,
370                        flags,
371                    }))?;
372                }
373                Err(status) => responder.send(Err(status.into_raw()))?,
374            },
375            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
376                match self.device_info().await {
377                    Ok(info) => {
378                        return Ok(Some(self.session_manager.clone().open_session(
379                            session.into_stream(),
380                            OffsetMap::empty(info.max_transfer_blocks()),
381                            self.block_size,
382                        )));
383                    }
384                    Err(status) => session.close_with_epitaph(status)?,
385                }
386            }
387            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
388                session,
389                mapping,
390                control_handle: _,
391            } => match self.device_info().await {
392                Ok(info) => {
393                    let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
394                        Ok(m) => m,
395                        Err(status) => {
396                            session.close_with_epitaph(status)?;
397                            return Ok(None);
398                        }
399                    };
400                    if let Some(max) = info.block_count() {
401                        if initial_mapping.target_block_offset + initial_mapping.length > max {
402                            log::warn!(
403                                "Invalid mapping for session: {initial_mapping:?} (max {max})"
404                            );
405                            session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
406                            return Ok(None);
407                        }
408                    }
409                    return Ok(Some(self.session_manager.clone().open_session(
410                        session.into_stream(),
411                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
412                        self.block_size,
413                    )));
414                }
415                Err(status) => session.close_with_epitaph(status)?,
416            },
417            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
418                Ok(info) => {
419                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
420                        let mut guid =
421                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
422                        guid.value.copy_from_slice(&partition_info.type_guid);
423                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
424                    } else {
425                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
426                    }
427                }
428                Err(status) => {
429                    responder.send(status.into_raw(), None)?;
430                }
431            },
432            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
433                match self.device_info().await {
434                    Ok(info) => {
435                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
436                            let mut guid =
437                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
438                            guid.value.copy_from_slice(&partition_info.instance_guid);
439                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
440                        } else {
441                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
442                        }
443                    }
444                    Err(status) => {
445                        responder.send(status.into_raw(), None)?;
446                    }
447                }
448            }
449            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
450                Ok(info) => {
451                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
452                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
453                    } else {
454                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
455                    }
456                }
457                Err(status) => {
458                    responder.send(status.into_raw(), None)?;
459                }
460            },
461            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
462                Ok(info) => {
463                    if let DeviceInfo::Partition(info) = info.as_ref() {
464                        let mut type_guid =
465                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
466                        type_guid.value.copy_from_slice(&info.type_guid);
467                        let mut instance_guid =
468                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
469                        instance_guid.value.copy_from_slice(&info.instance_guid);
470                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
471                            name: Some(info.name.clone()),
472                            type_guid: Some(type_guid),
473                            instance_guid: Some(instance_guid),
474                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
475                            num_blocks: info
476                                .block_range
477                                .as_ref()
478                                .map(|range| range.end - range.start),
479                            flags: Some(info.flags),
480                            ..Default::default()
481                        }))?;
482                    }
483                }
484                Err(status) => responder.send(Err(status.into_raw()))?,
485            },
486            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
487                match self.session_manager.query_slices(&start_slices).await {
488                    Ok(mut results) => {
489                        let results_len = results.len();
490                        assert!(results_len <= 16);
491                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
492                        responder.send(
493                            zx::sys::ZX_OK,
494                            &results.try_into().unwrap(),
495                            results_len as u64,
496                        )?;
497                    }
498                    Err(s) => {
499                        responder.send(
500                            s.into_raw(),
501                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
502                            0,
503                        )?;
504                    }
505                }
506            }
507            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
508                match self.session_manager.get_volume_info().await {
509                    Ok((manager_info, volume_info)) => {
510                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
511                    }
512                    Err(s) => responder.send(s.into_raw(), None, None)?,
513                }
514            }
515            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
516                responder.send(
517                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
518                        .into_raw(),
519                )?;
520            }
521            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
522                responder.send(
523                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
524                        .into_raw(),
525                )?;
526            }
527            fvolume::VolumeRequest::Destroy { responder, .. } => {
528                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
529            }
530        }
531        Ok(None)
532    }
533
534    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
535        self.session_manager.get_info().await
536    }
537}
538
/// State and helpers for a single session.
struct SessionHelper<SM: SessionManager> {
    /// The session manager, which owns the table of active requests.
    session_manager: Arc<SM>,
    /// Offset remapping and transfer limit for this session.
    offset_map: OffsetMap,
    /// Block size in bytes; used to convert block-based VMO offsets into bytes.
    block_size: u32,
    /// One end of the session FIFO; a duplicate of this handle is given to the client by
    /// `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached to the session, keyed by the id returned from `AttachVmo`.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
}
546
547impl<SM: SessionManager> SessionHelper<SM> {
548    fn new(
549        session_manager: Arc<SM>,
550        offset_map: OffsetMap,
551        block_size: u32,
552    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
553        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
554        Ok((
555            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
556            fifo,
557        ))
558    }
559
    /// Handles a single FIDL request on the session channel.
    ///
    /// Returns `Err` when the session should stop: after a `Close` request has been answered, or
    /// when sending a reply or `on_attach_vmo` fails.
    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                // Hand the client a duplicate of the peer end of the FIFO with these rights.
                let rights = zx::Rights::TRANSFER
                    | zx::Rights::READ
                    | zx::Rights::WRITE
                    | zx::Rights::SIGNAL
                    | zx::Rights::WAIT;
                match self.peer_fifo.duplicate_handle(rights) {
                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
                    Err(s) => responder.send(Err(s.into_raw()))?,
                }
                Ok(())
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                let vmo = Arc::new(vmo);
                // Allocate an id and register the VMO while holding the lock; the lock is released
                // at the end of this block, before awaiting below.
                let vmo_id = {
                    let mut vmos = self.vmos.lock();
                    if vmos.len() == u16::MAX as usize {
                        // Every id from 1 to u16::MAX is in use.
                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
                        return Ok(());
                    } else {
                        // Ids start at 1.  Normally the next id is one past the largest id in use;
                        // if that would overflow, fall back to the smallest unused id.
                        let vmo_id = match vmos.last_entry() {
                            None => 1,
                            Some(o) => {
                                o.key().checked_add(1).unwrap_or_else(|| {
                                    let mut vmo_id = 1;
                                    // Find the first gap...
                                    for (&id, _) in &*vmos {
                                        if id > vmo_id {
                                            break;
                                        }
                                        vmo_id = id + 1;
                                    }
                                    vmo_id
                                })
                            }
                        };
                        vmos.insert(vmo_id, vmo.clone());
                        vmo_id
                    }
                };
                self.session_manager.clone().on_attach_vmo(&vmo).await?;
                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
                Ok(())
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(Ok(()))?;
                // The error tells the caller that the session is finished.
                Err(anyhow!("Closed"))
            }
        }
    }
612
    /// Decodes `request`.
    ///
    /// Validates the FIFO request, registers it in the active-requests table (either as a new
    /// entry or as part of an existing group entry) and resolves the VMO it refers to.
    ///
    /// Returns:
    /// - `Ok(decoded)` if the request should be executed.
    /// - `Err(Some(response))` if the request failed and a response must be sent now.
    /// - `Err(None)` if the request is dropped or failed but no response is due yet (e.g. the
    ///   group it belongs to has not finished).
    ///
    /// `session` is stored only when a new active-request entry is created.
    fn decode_fifo_request(
        &self,
        session: SM::Session,
        request: &BlockFifoRequest,
    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
        // Decode the opcode and validate the ranges.  A failure here is kept in `operation` so
        // that the request can still be accounted for below before the error is reported.
        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
            .ok_or(zx::Status::INVALID_ARGS)
            .and_then(|code| {
                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
                    if request.length == 0 {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    // Make sure the end offsets won't wrap.
                    if request.dev_offset.checked_add(request.length as u64).is_none()
                        || (code != BlockOpcode::Trim
                            && (request.length as u64 * self.block_size as u64)
                                .checked_add(request.vmo_offset)
                                .is_none())
                    {
                        return Err(zx::Status::OUT_OF_RANGE);
                    }
                }
                Ok(match code {
                    BlockOpcode::Read => Operation::Read {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                        _unused: 0,
                        // The request gives the VMO offset in blocks; convert it to bytes.
                        vmo_offset: request
                            .vmo_offset
                            .checked_mul(self.block_size as u64)
                            .ok_or(zx::Status::OUT_OF_RANGE)?,
                    },
                    BlockOpcode::Write => {
                        // Translate the relevant I/O flags into write options.
                        let mut options = WriteOptions::empty();
                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
                            options |= WriteOptions::FORCE_ACCESS;
                        }
                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
                            options |= WriteOptions::PRE_BARRIER;
                        }
                        Operation::Write {
                            device_block_offset: request.dev_offset,
                            block_count: request.length,
                            options: options,
                            // The request gives the VMO offset in blocks; convert it to bytes.
                            vmo_offset: request
                                .vmo_offset
                                .checked_mul(self.block_size as u64)
                                .ok_or(zx::Status::OUT_OF_RANGE)?,
                        }
                    }
                    BlockOpcode::Flush => Operation::Flush,
                    BlockOpcode::Trim => Operation::Trim {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                    },
                    BlockOpcode::CloseVmo => Operation::CloseVmo,
                })
            });

        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
            GroupOrRequest::Group(request.group)
        } else {
            GroupOrRequest::Request(request.reqid)
        };

        // The active-requests lock is held for the remainder of this function.
        let mut active_requests = self.session_manager.active_requests().0.lock();
        let mut request_id = None;

        // Multiple Block I/O request may be sent as a group.
        // Notes:
        // - the group is identified by the group id in the request
        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
        //   flag is set.
        // - when processing a request of a group fails, subsequent requests of that
        //   group will not be processed.
        //
        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
        if group_or_request.is_group() {
            // Search for an existing entry that matches this group.  NOTE: This is a potentially
            // expensive way to find a group (it's iterating over all slots in the active-requests
            // slab).  This can be optimised easily should we need to.
            for (key, group) in &mut active_requests.requests {
                if group.group_or_request == group_or_request {
                    if group.req_id.is_some() {
                        // We have already received a request tagged as last.
                        if group.status == zx::Status::OK {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                        // Ignore this request.
                        return Err(None);
                    }
                    if flags.contains(BlockIoFlag::GROUP_LAST) {
                        group.req_id = Some(request.reqid);
                        // If the group has had an error, there is no point trying to issue this
                        // request.
                        if group.status != zx::Status::OK {
                            operation = Err(group.status);
                        }
                    } else if group.status != zx::Status::OK {
                        // The group has already encountered an error, so there is no point trying
                        // to issue this request.
                        return Err(None);
                    }
                    // Account for this request in the existing group entry.
                    request_id = Some(RequestId(key));
                    group.count += 1;
                    break;
                }
            }
        }

        // Resolve the VMO.  Reads and writes look it up by id; CloseVmo removes it from the
        // session and adds it to the VMO bin.  An unknown id turns the operation into an IO error.
        let mut retain_vmo = false;
        let vmo = match &operation {
            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
                    retain_vmo = true;
                    Ok(Some(vmo))
                })
            }
            Ok(Operation::CloseVmo) => {
                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
                    active_requests.vmo_bin.add(vmo.clone());
                    Ok(Some(vmo))
                })
            }
            _ => Ok(None),
        }
        .unwrap_or_else(|e| {
            operation = Err(e);
            None
        });

        let trace_flow_id = NonZero::new(request.trace_flow_id);
        // No matching group entry was found (or this is not a group request): create a new entry
        // with a single outstanding request.
        let request_id = request_id.unwrap_or_else(|| {
            let vmo_bin_key =
                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
            RequestId(active_requests.requests.insert(ActiveRequest {
                session,
                group_or_request,
                trace_flow_id,
                vmo_bin_key,
                status: zx::Status::OK,
                count: 1,
                // The response request id is known right away unless this is a group item that is
                // not flagged as the last one.
                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
                    || flags.contains(BlockIoFlag::GROUP_LAST)
                {
                    Some(request.reqid)
                } else {
                    None
                },
            }))
        });

        Ok(DecodedRequest {
            request_id,
            trace_flow_id,
            // On failure, complete the request immediately; a response is returned only if that
            // finishes the entry.
            operation: operation.map_err(|status| {
                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
            })?,
            vmo,
        })
    }
776
777    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
778        std::mem::take(&mut *self.vmos.lock())
779    }
780
    /// Maps the request and returns the mapped request with an optional remainder.
    ///
    /// The operation is rewritten in place by `Operation::map` using this object's offset
    /// mapping, transfer limit and block size.  When the operation has to be split, the part that
    /// was cut off is returned as a second `DecodedRequest` which is a clone of the original with
    /// only the operation replaced.
    ///
    /// Returns `Err` when the request is completed here rather than being issued.  The error
    /// carries whatever response `complete_and_take_response` produced, if any:
    ///   * `BAD_STATE` if the active request has already recorded a non-OK status.
    ///   * `OUT_OF_RANGE` if a mapping exists and the blocks are not within its source range.
    fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
        // The guard is a local, so the active-requests lock is held until this function returns.
        let mut active_requests = self.session_manager.active_requests().0.lock();
        let active_request = &mut active_requests.requests[request.request_id.0];
        // A failure has already been recorded against this request, so don't issue any more work
        // for it.
        if active_request.status != zx::Status::OK {
            return Err(active_requests
                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
                .map(|(_, r)| r));
        }
        // Only operations that address blocks can be out of range; operations without blocks and
        // servers without a mapping fall through to the catch-all arm.
        let mapping = self.offset_map.mapping();
        match (mapping, request.operation.blocks()) {
            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
                return Err(active_requests
                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
                    .map(|(_, r)| r));
            }
            _ => {}
        }
        // Rewrites `request.operation` in place; `remainder` is whatever did not fit.
        let remainder = request.operation.map(
            self.offset_map.mapping(),
            self.offset_map.max_transfer_blocks(),
            self.block_size,
        );
        // The remainder is one more piece of work charged to the same active request.
        // NOTE(review): presumably `count` is the number of outstanding sub-requests that must
        // complete before a response is sent -- confirm against `complete_and_take_response`.
        if remainder.is_some() {
            active_request.count += 1;
        }
        // Trace events are only emitted when a context for the "storage" category is acquired.
        static CACHE: AtomicU64 = AtomicU64::new(0);
        if let Some(context) =
            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
        {
            use fuchsia_trace::ArgValue;
            let trace_args = [
                ArgValue::of("request_id", request.request_id.0),
                ArgValue::of("opcode", request.operation.trace_label()),
            ];
            // `_scope` is kept alive until the end of this block so that the flow step below is
            // recorded while the duration is still open.
            let _scope = fuchsia_trace::duration(
                c"storage",
                c"block_server::start_transaction",
                &trace_args,
            );
            if let Some(trace_flow_id) = active_request.trace_flow_id {
                fuchsia_trace::flow_step(
                    &context,
                    c"block_server::start_transaction",
                    trace_flow_id.get().into(),
                    &[],
                );
            }
        }
        // The remainder shares the request id, trace flow id and VMO of the original request.
        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
        Ok((request, remainder))
    }
836
837    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
838        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
839    }
840}
841
/// Identifies an active request.  The wrapped value is the key under which the request is stored
/// in the active-requests table, which is how it is looked up again later.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
845
/// A FIFO request after decoding: the operation to perform plus the bookkeeping needed to issue it
/// and to complete it later.
#[derive(Clone, Debug)]
struct DecodedRequest {
    /// Key of the active request this (sub-)request is accounted against.
    request_id: RequestId,
    /// Trace flow id taken from the FIFO request; `None` when the request carried zero.
    trace_flow_id: TraceFlowId,
    /// The operation to perform.
    operation: Operation,
    /// The VMO the operation refers to.  Set for reads, writes and VMO closes whose VMO id was
    /// found, and `None` otherwise.
    vmo: Option<Arc<zx::Vmo>>,
}
853
// Alias for the protocol crate's write options, which is the type of `Operation::Write::options`.
// NOTE(review): the `cbindgen:no-export` annotation below presumably stops cbindgen from emitting
// its own definition for this alias -- confirm against the cbindgen configuration.
/// cbindgen:no-export
pub type WriteOptions = block_protocol::WriteOptions;
856
/// A decoded block operation.
///
/// The layout is `repr(C)`; do not reorder variants or fields (see the note below).
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
    // code can read `device_block_offset` from the read variant and then assume it's valid for the
    // write variant).
    /// A read of `block_count` blocks starting at `device_block_offset`.
    Read {
        /// Device offset, in blocks.
        device_block_offset: u64,
        /// Number of blocks.
        block_count: u32,
        /// Sits in the same position as `options` in `Write`.
        /// NOTE(review): presumably present only to keep `vmo_offset` lined up between the two
        /// variants -- confirm with the C++ consumers.
        _unused: u32,
        /// Offset into the VMO, in bytes (`Operation::map` advances it by blocks * block size).
        vmo_offset: u64,
    },
    /// A write of `block_count` blocks starting at `device_block_offset`.
    Write {
        /// Device offset, in blocks.
        device_block_offset: u64,
        /// Number of blocks.
        block_count: u32,
        /// Write options.  When a write is split, `PRE_BARRIER` is cleared on the remainder so
        /// that it is only sent once.
        options: WriteOptions,
        /// Offset into the VMO, in bytes (`Operation::map` advances it by blocks * block size).
        vmo_offset: u64,
    },
    /// A flush; it addresses no blocks.
    Flush,
    /// A trim of `block_count` blocks starting at `device_block_offset`.
    Trim {
        /// Device offset, in blocks.
        device_block_offset: u64,
        /// Number of blocks.
        block_count: u32,
    },
    /// This will never be seen by the C interface.
    CloseVmo,
}
884
885impl Operation {
886    fn trace_label(&self) -> &'static str {
887        match self {
888            Operation::Read { .. } => "read",
889            Operation::Write { .. } => "write",
890            Operation::Flush { .. } => "flush",
891            Operation::Trim { .. } => "trim",
892            Operation::CloseVmo { .. } => "close_vmo",
893        }
894    }
895
896    /// Returns (offset, length).
897    fn blocks(&self) -> Option<(u64, u32)> {
898        match self {
899            Operation::Read { device_block_offset, block_count, .. }
900            | Operation::Write { device_block_offset, block_count, .. }
901            | Operation::Trim { device_block_offset, block_count, .. } => {
902                Some((*device_block_offset, *block_count))
903            }
904            _ => None,
905        }
906    }
907
908    /// Returns mutable references to (offset, length).
909    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
910        match self {
911            Operation::Read { device_block_offset, block_count, .. }
912            | Operation::Write { device_block_offset, block_count, .. }
913            | Operation::Trim { device_block_offset, block_count, .. } => {
914                Some((device_block_offset, block_count))
915            }
916            _ => None,
917        }
918    }
919
920    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
921    /// start of the operation.
922    fn map(
923        &mut self,
924        mapping: Option<&BlockOffsetMapping>,
925        max_blocks: Option<NonZero<u32>>,
926        block_size: u32,
927    ) -> Option<Self> {
928        let mut max = match self {
929            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
930            _ => None,
931        };
932        let (offset, length) = self.blocks_mut()?;
933        let orig_offset = *offset;
934        if let Some(mapping) = mapping {
935            let delta = *offset - mapping.source_block_offset;
936            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
937            *offset = mapping.target_block_offset + delta;
938            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
939            max = match max {
940                None => Some(mapping_max),
941                Some(m) => Some(std::cmp::min(m, mapping_max)),
942            };
943        };
944        if let Some(max) = max {
945            if *length as u64 > max {
946                let rem = (*length as u64 - max) as u32;
947                *length = max as u32;
948                return Some(match self {
949                    Operation::Read {
950                        device_block_offset: _,
951                        block_count: _,
952                        vmo_offset,
953                        _unused,
954                    } => Operation::Read {
955                        device_block_offset: orig_offset + max,
956                        block_count: rem,
957                        vmo_offset: *vmo_offset + max * block_size as u64,
958                        _unused: *_unused,
959                    },
960                    Operation::Write {
961                        device_block_offset: _,
962                        block_count: _,
963                        vmo_offset,
964                        options,
965                    } => {
966                        // Only send the barrier flag once per write request.
967                        let options = *options & !WriteOptions::PRE_BARRIER;
968                        Operation::Write {
969                            device_block_offset: orig_offset + max,
970                            block_count: rem,
971                            vmo_offset: *vmo_offset + max * block_size as u64,
972                            options,
973                        }
974                    }
975                    Operation::Trim { device_block_offset: _, block_count: _ } => {
976                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
977                    }
978                    _ => unreachable!(),
979                });
980            }
981        }
982        None
983    }
984}
985
/// Identifies what a response is for: either a group of requests or a single request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    /// A group, carrying its 16-bit group id.
    Group(u16),
    /// A single request, carrying its 32-bit id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        self.group_id().is_some()
    }

    /// Returns the group id, or `None` when this is a single request.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self {
            Some(id)
        } else {
            None
        }
    }
}
1004
1005#[cfg(test)]
1006mod tests {
1007    use super::{
1008        BlockOffsetMapping, BlockServer, DeviceInfo, Operation, PartitionInfo, TraceFlowId,
1009        FIFO_MAX_REQUESTS,
1010    };
1011    use assert_matches::assert_matches;
1012    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
1013    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1014    use fuchsia_sync::Mutex;
1015    use futures::channel::oneshot;
1016    use futures::future::BoxFuture;
1017    use futures::FutureExt as _;
1018    use std::borrow::Cow;
1019    use std::future::poll_fn;
1020    use std::num::NonZero;
1021    use std::pin::pin;
1022    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1023    use std::sync::Arc;
1024    use std::task::{Context, Poll};
1025    use zx::{AsHandleRef as _, HandleBased as _};
1026    use {
1027        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1028        fuchsia_async as fasync,
1029    };
1030
    /// Test double for the async interface whose read, write and barrier behaviour is supplied by
    /// optional hooks.  Calling one of those operations without its hook installed panics via
    /// `unimplemented!()`.
    #[derive(Default)]
    struct MockInterface {
        /// Called for reads with (device_block_offset, block_count, vmo, vmo_offset).
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
        /// Called for writes with the device block offset only; the other write arguments are
        /// not forwarded.
        write_hook:
            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
        /// Called for barriers; its result is returned to the server unchanged.
        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
    }
1044
1045    impl super::async_interface::Interface for MockInterface {
1046        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1047            Ok(())
1048        }
1049
1050        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1051            Ok(Cow::Owned(test_device_info()))
1052        }
1053
1054        async fn read(
1055            &self,
1056            device_block_offset: u64,
1057            block_count: u32,
1058            vmo: &Arc<zx::Vmo>,
1059            vmo_offset: u64,
1060            _trace_flow_id: TraceFlowId,
1061        ) -> Result<(), zx::Status> {
1062            if let Some(read_hook) = &self.read_hook {
1063                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1064            } else {
1065                unimplemented!();
1066            }
1067        }
1068
1069        async fn write(
1070            &self,
1071            device_block_offset: u64,
1072            _block_count: u32,
1073            _vmo: &Arc<zx::Vmo>,
1074            _vmo_offset: u64,
1075            _opts: WriteOptions,
1076            _trace_flow_id: TraceFlowId,
1077        ) -> Result<(), zx::Status> {
1078            if let Some(write_hook) = &self.write_hook {
1079                write_hook(device_block_offset).await
1080            } else {
1081                unimplemented!();
1082            }
1083        }
1084
1085        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1086            unreachable!();
1087        }
1088
1089        fn barrier(&self) -> Result<(), zx::Status> {
1090            if let Some(barrier_hook) = &self.barrier_hook {
1091                barrier_hook()
1092            } else {
1093                unimplemented!();
1094            }
1095        }
1096
1097        async fn trim(
1098            &self,
1099            _device_block_offset: u64,
1100            _block_count: u32,
1101            _trace_flow_id: TraceFlowId,
1102        ) -> Result<(), zx::Status> {
1103            unreachable!();
1104        }
1105
1106        async fn get_volume_info(
1107            &self,
1108        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1109            // Hang forever for the test_requests_dont_block_sessions test.
1110            let () = std::future::pending().await;
1111            unreachable!();
1112        }
1113    }
1114
    // Block size handed to `BlockServer::new` by the tests.
    const BLOCK_SIZE: u32 = 512;
    // Transfer limit, in blocks, advertised by `test_device_info`.  Tests send
    // `MAX_TRANSFER_BLOCKS + 1` blocks when they need a request to be split.
    const MAX_TRANSFER_BLOCKS: u32 = 10;
1117
1118    fn test_device_info() -> DeviceInfo {
1119        DeviceInfo::Partition(PartitionInfo {
1120            device_flags: fblock::Flag::READONLY,
1121            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1122            block_range: Some(0..100),
1123            type_guid: [1; 16],
1124            instance_guid: [2; 16],
1125            name: "foo".to_string(),
1126            flags: 0xabcd,
1127        })
1128    }
1129
1130    #[fuchsia::test]
1131    async fn test_split_request_only_issues_single_barrier() {
1132        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1133        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1134        let barrier_called = Arc::new(AtomicU32::new(0));
1135        let barrier_called_clone = barrier_called.clone();
1136
1137        futures::join!(
1138            async move {
1139                let block_server = BlockServer::new(
1140                    BLOCK_SIZE,
1141                    Arc::new(MockInterface {
1142                        barrier_hook: Some(Box::new(move || {
1143                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1144                            Ok(())
1145                        })),
1146                        write_hook: Some(Box::new(move |_device_block_offset| {
1147                            Box::pin(async move { Ok(()) })
1148                        })),
1149                        ..MockInterface::default()
1150                    }),
1151                );
1152                block_server.handle_requests(stream).await.unwrap();
1153            },
1154            async move {
1155                let (session_proxy, server) = fidl::endpoints::create_proxy();
1156
1157                proxy.open_session(server).unwrap();
1158
1159                let vmo_id = session_proxy
1160                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1161                    .await
1162                    .unwrap()
1163                    .unwrap();
1164                assert_ne!(vmo_id.id, 0);
1165
1166                let mut fifo =
1167                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1168                let (mut reader, mut writer) = fifo.async_io();
1169                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1170                // sub requests.
1171                writer
1172                    .write_entries(&BlockFifoRequest {
1173                        command: BlockFifoCommand {
1174                            opcode: BlockOpcode::Write.into_primitive(),
1175                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1176                            ..Default::default()
1177                        },
1178                        vmoid: vmo_id.id,
1179                        dev_offset: 0,
1180                        length: MAX_TRANSFER_BLOCKS + 1,
1181                        vmo_offset: 6,
1182                        ..Default::default()
1183                    })
1184                    .await
1185                    .unwrap();
1186
1187                let mut response = BlockFifoResponse::default();
1188                reader.read_entries(&mut response).await.unwrap();
1189                assert_eq!(response.status, zx::sys::ZX_OK);
1190
1191                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1192
1193                std::mem::drop(proxy);
1194            }
1195        );
1196    }
1197
1198    #[fuchsia::test]
1199    async fn test_barriers_not_supported() {
1200        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1201        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1202
1203        futures::join!(
1204            async move {
1205                let block_server = BlockServer::new(
1206                    BLOCK_SIZE,
1207                    Arc::new(MockInterface {
1208                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1209                        write_hook: Some(Box::new(move |_device_block_offset| {
1210                            Box::pin(async move { Ok(()) })
1211                        })),
1212                        ..MockInterface::default()
1213                    }),
1214                );
1215                block_server.handle_requests(stream).await.unwrap();
1216            },
1217            async move {
1218                let (session_proxy, server) = fidl::endpoints::create_proxy();
1219
1220                proxy.open_session(server).unwrap();
1221
1222                let vmo_id = session_proxy
1223                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1224                    .await
1225                    .unwrap()
1226                    .unwrap();
1227                assert_ne!(vmo_id.id, 0);
1228
1229                let mut fifo =
1230                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1231                let (mut reader, mut writer) = fifo.async_io();
1232
1233                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1234                // sub requests.
1235                writer
1236                    .write_entries(&BlockFifoRequest {
1237                        command: BlockFifoCommand {
1238                            opcode: BlockOpcode::Write.into_primitive(),
1239                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1240                            ..Default::default()
1241                        },
1242                        vmoid: vmo_id.id,
1243                        dev_offset: 0,
1244                        length: MAX_TRANSFER_BLOCKS + 1,
1245                        vmo_offset: 6,
1246                        ..Default::default()
1247                    })
1248                    .await
1249                    .unwrap();
1250
1251                let mut response = BlockFifoResponse::default();
1252                reader.read_entries(&mut response).await.unwrap();
1253                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1254
1255                std::mem::drop(proxy);
1256            }
1257        );
1258    }
1259
1260    #[fuchsia::test]
1261    async fn test_barriers_ordering() {
1262        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1263        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1264        let barrier_called = Arc::new(AtomicBool::new(false));
1265
1266        futures::join!(
1267            async move {
1268                let barrier_called_clone = barrier_called.clone();
1269                let block_server = BlockServer::new(
1270                    BLOCK_SIZE,
1271                    Arc::new(MockInterface {
1272                        barrier_hook: Some(Box::new(move || {
1273                            barrier_called.store(true, Ordering::Relaxed);
1274                            Ok(())
1275                        })),
1276                        write_hook: Some(Box::new(move |device_block_offset| {
1277                            let barrier_called = barrier_called_clone.clone();
1278                            Box::pin(async move {
1279                                // The sleep allows the server to reorder the fifo requests.
1280                                if device_block_offset % 2 == 0 {
1281                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1282                                        zx::MonotonicDuration::from_millis(200),
1283                                    ))
1284                                    .await;
1285                                }
1286                                assert!(barrier_called.load(Ordering::Relaxed));
1287                                Ok(())
1288                            })
1289                        })),
1290                        ..MockInterface::default()
1291                    }),
1292                );
1293                block_server.handle_requests(stream).await.unwrap();
1294            },
1295            async move {
1296                let (session_proxy, server) = fidl::endpoints::create_proxy();
1297
1298                proxy.open_session(server).unwrap();
1299
1300                let vmo_id = session_proxy
1301                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1302                    .await
1303                    .unwrap()
1304                    .unwrap();
1305                assert_ne!(vmo_id.id, 0);
1306
1307                let mut fifo =
1308                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1309                let (mut reader, mut writer) = fifo.async_io();
1310
1311                writer
1312                    .write_entries(&BlockFifoRequest {
1313                        command: BlockFifoCommand {
1314                            opcode: BlockOpcode::Write.into_primitive(),
1315                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1316                            ..Default::default()
1317                        },
1318                        vmoid: vmo_id.id,
1319                        dev_offset: 0,
1320                        length: 5,
1321                        vmo_offset: 6,
1322                        ..Default::default()
1323                    })
1324                    .await
1325                    .unwrap();
1326
1327                for i in 0..10 {
1328                    writer
1329                        .write_entries(&BlockFifoRequest {
1330                            command: BlockFifoCommand {
1331                                opcode: BlockOpcode::Write.into_primitive(),
1332                                ..Default::default()
1333                            },
1334                            vmoid: vmo_id.id,
1335                            dev_offset: i + 1,
1336                            length: 5,
1337                            vmo_offset: 6,
1338                            ..Default::default()
1339                        })
1340                        .await
1341                        .unwrap();
1342                }
1343                for _ in 0..11 {
1344                    let mut response = BlockFifoResponse::default();
1345                    reader.read_entries(&mut response).await.unwrap();
1346                    assert_eq!(response.status, zx::sys::ZX_OK);
1347                }
1348
1349                std::mem::drop(proxy);
1350            }
1351        );
1352    }
1353
1354    #[fuchsia::test]
1355    async fn test_info() {
1356        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1357
1358        futures::join!(
1359            async {
1360                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1361                block_server.handle_requests(stream).await.unwrap();
1362            },
1363            async {
1364                let expected_info = test_device_info();
1365                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1366                    info
1367                } else {
1368                    unreachable!()
1369                };
1370
1371                let block_info = proxy.get_info().await.unwrap().unwrap();
1372                assert_eq!(
1373                    block_info.block_count,
1374                    partition_info.block_range.as_ref().unwrap().end
1375                        - partition_info.block_range.as_ref().unwrap().start
1376                );
1377                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1378
1379                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1380
1381                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1382                assert_eq!(status, zx::sys::ZX_OK);
1383                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1384
1385                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1386                assert_eq!(status, zx::sys::ZX_OK);
1387                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1388
1389                let (status, name) = proxy.get_name().await.unwrap();
1390                assert_eq!(status, zx::sys::ZX_OK);
1391                assert_eq!(name.as_ref(), Some(&partition_info.name));
1392
1393                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1394                assert_eq!(metadata.name, name);
1395                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1396                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1397                assert_eq!(
1398                    metadata.start_block_offset,
1399                    Some(partition_info.block_range.as_ref().unwrap().start)
1400                );
1401                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1402                assert_eq!(metadata.flags, Some(partition_info.flags));
1403
1404                std::mem::drop(proxy);
1405            }
1406        );
1407    }
1408
    /// Attaches VMOs until the server reports `ZX_ERR_NO_RESOURCES`, checks how many attachments
    /// succeeded, then closes the first VMO id and verifies that the id is handed out again.
    #[fuchsia::test]
    async fn test_attach_vmo() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        let koid = vmo.get_koid().unwrap();

        futures::join!(
            async {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Every attached handle is a duplicate of `vmo`, so each read must see
                        // the same koid.
                        read_hook: Some(Box::new(move |_, _, vmo, _| {
                            assert_eq!(vmo.get_koid().unwrap(), koid);
                            Box::pin(async { Ok(()) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // Keep attaching VMOs until we eventually hit the maximum.
                // `count` starts at 1 to account for the attachment made above.
                let mut count = 1;
                loop {
                    match session_proxy
                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                        .await
                        .unwrap()
                    {
                        // This `vmo_id` binding only exists inside the match arm.
                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
                        Err(e) => {
                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
                            break;
                        }
                    }

                    // Only test every 10 to keep test time down.
                    if count % 10 == 0 {
                        // NOTE: `vmo_id` here is the id from the first attachment above, not the
                        // one attached in this iteration.
                        writer
                            .write_entries(&BlockFifoRequest {
                                command: BlockFifoCommand {
                                    opcode: BlockOpcode::Read.into_primitive(),
                                    ..Default::default()
                                },
                                vmoid: vmo_id.id,
                                length: 1,
                                ..Default::default()
                            })
                            .await
                            .unwrap();

                        let mut response = BlockFifoResponse::default();
                        reader.read_entries(&mut response).await.unwrap();
                        assert_eq!(response.status, zx::sys::ZX_OK);
                    }

                    count += 1;
                }

                // Exactly `u16::MAX` attachments succeeded before the server ran out of ids.
                assert_eq!(count, u16::MAX as u64);

                // Detach the original VMO, and make sure we can then attach another one.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::CloseVmo.into_primitive(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                let mut response = BlockFifoResponse::default();
                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                let new_vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                // It should reuse the same ID.
                assert_eq!(new_vmo_id.id, vmo_id.id);

                std::mem::drop(proxy);
            }
        );
    }
1515
1516    #[fuchsia::test]
1517    async fn test_close() {
1518        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1519
1520        let mut server = std::pin::pin!(async {
1521            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1522            block_server.handle_requests(stream).await.unwrap();
1523        }
1524        .fuse());
1525
1526        let mut client = std::pin::pin!(async {
1527            let (session_proxy, server) = fidl::endpoints::create_proxy();
1528
1529            proxy.open_session(server).unwrap();
1530
1531            // Dropping the proxy should not cause the session to terminate because the session is
1532            // still live.
1533            std::mem::drop(proxy);
1534
1535            session_proxy.close().await.unwrap().unwrap();
1536
1537            // Keep the session alive.  Calling `close` should cause the server to terminate.
1538            let _: () = std::future::pending().await;
1539        }
1540        .fuse());
1541
1542        futures::select!(
1543            _ = server => {}
1544            _ = client => unreachable!(),
1545        );
1546    }
1547
1548    #[derive(Default)]
1549    struct IoMockInterface {
1550        do_checks: bool,
1551        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1552        return_errors: bool,
1553    }
1554
1555    #[derive(Debug)]
1556    enum ExpectedOp {
1557        Read(u64, u32, u64),
1558        Write(u64, u32, u64),
1559        Trim(u64, u32),
1560        Flush,
1561    }
1562
1563    impl super::async_interface::Interface for IoMockInterface {
1564        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1565            Ok(())
1566        }
1567
1568        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1569            Ok(Cow::Owned(test_device_info()))
1570        }
1571
1572        async fn read(
1573            &self,
1574            device_block_offset: u64,
1575            block_count: u32,
1576            _vmo: &Arc<zx::Vmo>,
1577            vmo_offset: u64,
1578            _trace_flow_id: TraceFlowId,
1579        ) -> Result<(), zx::Status> {
1580            if self.return_errors {
1581                Err(zx::Status::INTERNAL)
1582            } else {
1583                if self.do_checks {
1584                    assert_matches!(
1585                        self.expected_op.lock().take(),
1586                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1587                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1588                        "Read {device_block_offset} {block_count} {vmo_offset}"
1589                    );
1590                }
1591                Ok(())
1592            }
1593        }
1594
1595        async fn write(
1596            &self,
1597            device_block_offset: u64,
1598            block_count: u32,
1599            _vmo: &Arc<zx::Vmo>,
1600            vmo_offset: u64,
1601            _opts: WriteOptions,
1602            _trace_flow_id: TraceFlowId,
1603        ) -> Result<(), zx::Status> {
1604            if self.return_errors {
1605                Err(zx::Status::NOT_SUPPORTED)
1606            } else {
1607                if self.do_checks {
1608                    assert_matches!(
1609                        self.expected_op.lock().take(),
1610                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1611                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1612                        "Write {device_block_offset} {block_count} {vmo_offset}"
1613                    );
1614                }
1615                Ok(())
1616            }
1617        }
1618
1619        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1620            if self.return_errors {
1621                Err(zx::Status::NO_RESOURCES)
1622            } else {
1623                if self.do_checks {
1624                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1625                }
1626                Ok(())
1627            }
1628        }
1629
1630        fn barrier(&self) -> Result<(), zx::Status> {
1631            unreachable!()
1632        }
1633
1634        async fn trim(
1635            &self,
1636            device_block_offset: u64,
1637            block_count: u32,
1638            _trace_flow_id: TraceFlowId,
1639        ) -> Result<(), zx::Status> {
1640            if self.return_errors {
1641                Err(zx::Status::NO_MEMORY)
1642            } else {
1643                if self.do_checks {
1644                    assert_matches!(
1645                        self.expected_op.lock().take(),
1646                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1647                            block_count == b,
1648                        "Trim {device_block_offset} {block_count}"
1649                    );
1650                }
1651                Ok(())
1652            }
1653        }
1654    }
1655
1656    #[fuchsia::test]
1657    async fn test_io() {
1658        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1659
1660        let expected_op = Arc::new(Mutex::new(None));
1661        let expected_op_clone = expected_op.clone();
1662
1663        let server = async {
1664            let block_server = BlockServer::new(
1665                BLOCK_SIZE,
1666                Arc::new(IoMockInterface {
1667                    return_errors: false,
1668                    do_checks: true,
1669                    expected_op: expected_op_clone,
1670                }),
1671            );
1672            block_server.handle_requests(stream).await.unwrap();
1673        };
1674
1675        let client = async move {
1676            let (session_proxy, server) = fidl::endpoints::create_proxy();
1677
1678            proxy.open_session(server).unwrap();
1679
1680            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1681            let vmo_id = session_proxy
1682                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1683                .await
1684                .unwrap()
1685                .unwrap();
1686
1687            let mut fifo =
1688                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1689            let (mut reader, mut writer) = fifo.async_io();
1690
1691            // READ
1692            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1693            writer
1694                .write_entries(&BlockFifoRequest {
1695                    command: BlockFifoCommand {
1696                        opcode: BlockOpcode::Read.into_primitive(),
1697                        ..Default::default()
1698                    },
1699                    vmoid: vmo_id.id,
1700                    dev_offset: 1,
1701                    length: 2,
1702                    vmo_offset: 3,
1703                    ..Default::default()
1704                })
1705                .await
1706                .unwrap();
1707
1708            let mut response = BlockFifoResponse::default();
1709            reader.read_entries(&mut response).await.unwrap();
1710            assert_eq!(response.status, zx::sys::ZX_OK);
1711
1712            // WRITE
1713            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1714            writer
1715                .write_entries(&BlockFifoRequest {
1716                    command: BlockFifoCommand {
1717                        opcode: BlockOpcode::Write.into_primitive(),
1718                        ..Default::default()
1719                    },
1720                    vmoid: vmo_id.id,
1721                    dev_offset: 4,
1722                    length: 5,
1723                    vmo_offset: 6,
1724                    ..Default::default()
1725                })
1726                .await
1727                .unwrap();
1728
1729            let mut response = BlockFifoResponse::default();
1730            reader.read_entries(&mut response).await.unwrap();
1731            assert_eq!(response.status, zx::sys::ZX_OK);
1732
1733            // FLUSH
1734            *expected_op.lock() = Some(ExpectedOp::Flush);
1735            writer
1736                .write_entries(&BlockFifoRequest {
1737                    command: BlockFifoCommand {
1738                        opcode: BlockOpcode::Flush.into_primitive(),
1739                        ..Default::default()
1740                    },
1741                    ..Default::default()
1742                })
1743                .await
1744                .unwrap();
1745
1746            reader.read_entries(&mut response).await.unwrap();
1747            assert_eq!(response.status, zx::sys::ZX_OK);
1748
1749            // TRIM
1750            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1751            writer
1752                .write_entries(&BlockFifoRequest {
1753                    command: BlockFifoCommand {
1754                        opcode: BlockOpcode::Trim.into_primitive(),
1755                        ..Default::default()
1756                    },
1757                    dev_offset: 7,
1758                    length: 8,
1759                    ..Default::default()
1760                })
1761                .await
1762                .unwrap();
1763
1764            reader.read_entries(&mut response).await.unwrap();
1765            assert_eq!(response.status, zx::sys::ZX_OK);
1766
1767            std::mem::drop(proxy);
1768        };
1769
1770        futures::join!(server, client);
1771    }
1772
1773    #[fuchsia::test]
1774    async fn test_io_errors() {
1775        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1776
1777        futures::join!(
1778            async {
1779                let block_server = BlockServer::new(
1780                    BLOCK_SIZE,
1781                    Arc::new(IoMockInterface {
1782                        return_errors: true,
1783                        do_checks: false,
1784                        expected_op: Arc::new(Mutex::new(None)),
1785                    }),
1786                );
1787                block_server.handle_requests(stream).await.unwrap();
1788            },
1789            async move {
1790                let (session_proxy, server) = fidl::endpoints::create_proxy();
1791
1792                proxy.open_session(server).unwrap();
1793
1794                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1795                let vmo_id = session_proxy
1796                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1797                    .await
1798                    .unwrap()
1799                    .unwrap();
1800
1801                let mut fifo =
1802                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1803                let (mut reader, mut writer) = fifo.async_io();
1804
1805                // READ
1806                writer
1807                    .write_entries(&BlockFifoRequest {
1808                        command: BlockFifoCommand {
1809                            opcode: BlockOpcode::Read.into_primitive(),
1810                            ..Default::default()
1811                        },
1812                        vmoid: vmo_id.id,
1813                        length: 1,
1814                        reqid: 1,
1815                        ..Default::default()
1816                    })
1817                    .await
1818                    .unwrap();
1819
1820                let mut response = BlockFifoResponse::default();
1821                reader.read_entries(&mut response).await.unwrap();
1822                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1823
1824                // WRITE
1825                writer
1826                    .write_entries(&BlockFifoRequest {
1827                        command: BlockFifoCommand {
1828                            opcode: BlockOpcode::Write.into_primitive(),
1829                            ..Default::default()
1830                        },
1831                        vmoid: vmo_id.id,
1832                        length: 1,
1833                        reqid: 2,
1834                        ..Default::default()
1835                    })
1836                    .await
1837                    .unwrap();
1838
1839                reader.read_entries(&mut response).await.unwrap();
1840                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1841
1842                // FLUSH
1843                writer
1844                    .write_entries(&BlockFifoRequest {
1845                        command: BlockFifoCommand {
1846                            opcode: BlockOpcode::Flush.into_primitive(),
1847                            ..Default::default()
1848                        },
1849                        reqid: 3,
1850                        ..Default::default()
1851                    })
1852                    .await
1853                    .unwrap();
1854
1855                reader.read_entries(&mut response).await.unwrap();
1856                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1857
1858                // TRIM
1859                writer
1860                    .write_entries(&BlockFifoRequest {
1861                        command: BlockFifoCommand {
1862                            opcode: BlockOpcode::Trim.into_primitive(),
1863                            ..Default::default()
1864                        },
1865                        reqid: 4,
1866                        length: 1,
1867                        ..Default::default()
1868                    })
1869                    .await
1870                    .unwrap();
1871
1872                reader.read_entries(&mut response).await.unwrap();
1873                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1874
1875                std::mem::drop(proxy);
1876            }
1877        );
1878    }
1879
1880    #[fuchsia::test]
1881    async fn test_invalid_args() {
1882        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1883
1884        futures::join!(
1885            async {
1886                let block_server = BlockServer::new(
1887                    BLOCK_SIZE,
1888                    Arc::new(IoMockInterface {
1889                        return_errors: false,
1890                        do_checks: false,
1891                        expected_op: Arc::new(Mutex::new(None)),
1892                    }),
1893                );
1894                block_server.handle_requests(stream).await.unwrap();
1895            },
1896            async move {
1897                let (session_proxy, server) = fidl::endpoints::create_proxy();
1898
1899                proxy.open_session(server).unwrap();
1900
1901                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1902                let vmo_id = session_proxy
1903                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1904                    .await
1905                    .unwrap()
1906                    .unwrap();
1907
1908                let mut fifo =
1909                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1910
1911                async fn test(
1912                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1913                    request: BlockFifoRequest,
1914                ) -> Result<(), zx::Status> {
1915                    let (mut reader, mut writer) = fifo.async_io();
1916                    writer.write_entries(&request).await.unwrap();
1917                    let mut response = BlockFifoResponse::default();
1918                    reader.read_entries(&mut response).await.unwrap();
1919                    zx::Status::ok(response.status)
1920                }
1921
1922                // READ
1923
1924                let good_read_request = || BlockFifoRequest {
1925                    command: BlockFifoCommand {
1926                        opcode: BlockOpcode::Read.into_primitive(),
1927                        ..Default::default()
1928                    },
1929                    length: 1,
1930                    vmoid: vmo_id.id,
1931                    ..Default::default()
1932                };
1933
1934                assert_eq!(
1935                    test(
1936                        &mut fifo,
1937                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1938                    )
1939                    .await,
1940                    Err(zx::Status::IO)
1941                );
1942
1943                assert_eq!(
1944                    test(
1945                        &mut fifo,
1946                        BlockFifoRequest {
1947                            vmo_offset: 0xffff_ffff_ffff_ffff,
1948                            ..good_read_request()
1949                        }
1950                    )
1951                    .await,
1952                    Err(zx::Status::OUT_OF_RANGE)
1953                );
1954
1955                assert_eq!(
1956                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1957                    Err(zx::Status::INVALID_ARGS)
1958                );
1959
1960                // WRITE
1961
1962                let good_write_request = || BlockFifoRequest {
1963                    command: BlockFifoCommand {
1964                        opcode: BlockOpcode::Write.into_primitive(),
1965                        ..Default::default()
1966                    },
1967                    length: 1,
1968                    vmoid: vmo_id.id,
1969                    ..Default::default()
1970                };
1971
1972                assert_eq!(
1973                    test(
1974                        &mut fifo,
1975                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1976                    )
1977                    .await,
1978                    Err(zx::Status::IO)
1979                );
1980
1981                assert_eq!(
1982                    test(
1983                        &mut fifo,
1984                        BlockFifoRequest {
1985                            vmo_offset: 0xffff_ffff_ffff_ffff,
1986                            ..good_write_request()
1987                        }
1988                    )
1989                    .await,
1990                    Err(zx::Status::OUT_OF_RANGE)
1991                );
1992
1993                assert_eq!(
1994                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
1995                    Err(zx::Status::INVALID_ARGS)
1996                );
1997
1998                // CLOSE VMO
1999
2000                assert_eq!(
2001                    test(
2002                        &mut fifo,
2003                        BlockFifoRequest {
2004                            command: BlockFifoCommand {
2005                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2006                                ..Default::default()
2007                            },
2008                            vmoid: vmo_id.id + 1,
2009                            ..Default::default()
2010                        }
2011                    )
2012                    .await,
2013                    Err(zx::Status::IO)
2014                );
2015
2016                std::mem::drop(proxy);
2017            }
2018        );
2019    }
2020
2021    #[fuchsia::test]
2022    async fn test_concurrent_requests() {
2023        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2024
2025        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2026        let waiting_readers_clone = waiting_readers.clone();
2027
2028        futures::join!(
2029            async move {
2030                let block_server = BlockServer::new(
2031                    BLOCK_SIZE,
2032                    Arc::new(MockInterface {
2033                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2034                            let (tx, rx) = oneshot::channel();
2035                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2036                            Box::pin(async move {
2037                                let _ = rx.await;
2038                                Ok(())
2039                            })
2040                        })),
2041                        ..MockInterface::default()
2042                    }),
2043                );
2044                block_server.handle_requests(stream).await.unwrap();
2045            },
2046            async move {
2047                let (session_proxy, server) = fidl::endpoints::create_proxy();
2048
2049                proxy.open_session(server).unwrap();
2050
2051                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2052                let vmo_id = session_proxy
2053                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2054                    .await
2055                    .unwrap()
2056                    .unwrap();
2057
2058                let mut fifo =
2059                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2060                let (mut reader, mut writer) = fifo.async_io();
2061
2062                writer
2063                    .write_entries(&BlockFifoRequest {
2064                        command: BlockFifoCommand {
2065                            opcode: BlockOpcode::Read.into_primitive(),
2066                            ..Default::default()
2067                        },
2068                        reqid: 1,
2069                        dev_offset: 1, // Intentionally use the same as `reqid`.
2070                        vmoid: vmo_id.id,
2071                        length: 1,
2072                        ..Default::default()
2073                    })
2074                    .await
2075                    .unwrap();
2076
2077                writer
2078                    .write_entries(&BlockFifoRequest {
2079                        command: BlockFifoCommand {
2080                            opcode: BlockOpcode::Read.into_primitive(),
2081                            ..Default::default()
2082                        },
2083                        reqid: 2,
2084                        dev_offset: 2,
2085                        vmoid: vmo_id.id,
2086                        length: 1,
2087                        ..Default::default()
2088                    })
2089                    .await
2090                    .unwrap();
2091
2092                // Wait till both those entries are pending.
2093                poll_fn(|cx: &mut Context<'_>| {
2094                    if waiting_readers.lock().len() == 2 {
2095                        Poll::Ready(())
2096                    } else {
2097                        // Yield to the executor.
2098                        cx.waker().wake_by_ref();
2099                        Poll::Pending
2100                    }
2101                })
2102                .await;
2103
2104                let mut response = BlockFifoResponse::default();
2105                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2106
2107                let (id, tx) = waiting_readers.lock().pop().unwrap();
2108                tx.send(()).unwrap();
2109
2110                reader.read_entries(&mut response).await.unwrap();
2111                assert_eq!(response.status, zx::sys::ZX_OK);
2112                assert_eq!(response.reqid, id);
2113
2114                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2115
2116                let (id, tx) = waiting_readers.lock().pop().unwrap();
2117                tx.send(()).unwrap();
2118
2119                reader.read_entries(&mut response).await.unwrap();
2120                assert_eq!(response.status, zx::sys::ZX_OK);
2121                assert_eq!(response.reqid, id);
2122            }
2123        );
2124    }
2125
2126    #[fuchsia::test]
2127    async fn test_groups() {
2128        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2129
2130        futures::join!(
2131            async move {
2132                let block_server = BlockServer::new(
2133                    BLOCK_SIZE,
2134                    Arc::new(MockInterface {
2135                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2136                        ..MockInterface::default()
2137                    }),
2138                );
2139                block_server.handle_requests(stream).await.unwrap();
2140            },
2141            async move {
2142                let (session_proxy, server) = fidl::endpoints::create_proxy();
2143
2144                proxy.open_session(server).unwrap();
2145
2146                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2147                let vmo_id = session_proxy
2148                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2149                    .await
2150                    .unwrap()
2151                    .unwrap();
2152
2153                let mut fifo =
2154                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2155                let (mut reader, mut writer) = fifo.async_io();
2156
2157                writer
2158                    .write_entries(&BlockFifoRequest {
2159                        command: BlockFifoCommand {
2160                            opcode: BlockOpcode::Read.into_primitive(),
2161                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2162                            ..Default::default()
2163                        },
2164                        group: 1,
2165                        vmoid: vmo_id.id,
2166                        length: 1,
2167                        ..Default::default()
2168                    })
2169                    .await
2170                    .unwrap();
2171
2172                writer
2173                    .write_entries(&BlockFifoRequest {
2174                        command: BlockFifoCommand {
2175                            opcode: BlockOpcode::Read.into_primitive(),
2176                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2177                            ..Default::default()
2178                        },
2179                        reqid: 2,
2180                        group: 1,
2181                        vmoid: vmo_id.id,
2182                        length: 1,
2183                        ..Default::default()
2184                    })
2185                    .await
2186                    .unwrap();
2187
2188                let mut response = BlockFifoResponse::default();
2189                reader.read_entries(&mut response).await.unwrap();
2190                assert_eq!(response.status, zx::sys::ZX_OK);
2191                assert_eq!(response.reqid, 2);
2192                assert_eq!(response.group, 1);
2193            }
2194        );
2195    }
2196
2197    #[fuchsia::test]
2198    async fn test_group_error() {
2199        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2200
2201        let counter = Arc::new(AtomicU64::new(0));
2202        let counter_clone = counter.clone();
2203
2204        futures::join!(
2205            async move {
2206                let block_server = BlockServer::new(
2207                    BLOCK_SIZE,
2208                    Arc::new(MockInterface {
2209                        read_hook: Some(Box::new(move |_, _, _, _| {
2210                            counter_clone.fetch_add(1, Ordering::Relaxed);
2211                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2212                        })),
2213                        ..MockInterface::default()
2214                    }),
2215                );
2216                block_server.handle_requests(stream).await.unwrap();
2217            },
2218            async move {
2219                let (session_proxy, server) = fidl::endpoints::create_proxy();
2220
2221                proxy.open_session(server).unwrap();
2222
2223                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2224                let vmo_id = session_proxy
2225                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2226                    .await
2227                    .unwrap()
2228                    .unwrap();
2229
2230                let mut fifo =
2231                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2232                let (mut reader, mut writer) = fifo.async_io();
2233
2234                writer
2235                    .write_entries(&BlockFifoRequest {
2236                        command: BlockFifoCommand {
2237                            opcode: BlockOpcode::Read.into_primitive(),
2238                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2239                            ..Default::default()
2240                        },
2241                        group: 1,
2242                        vmoid: vmo_id.id,
2243                        length: 1,
2244                        ..Default::default()
2245                    })
2246                    .await
2247                    .unwrap();
2248
2249                // Wait until processed.
2250                poll_fn(|cx: &mut Context<'_>| {
2251                    if counter.load(Ordering::Relaxed) == 1 {
2252                        Poll::Ready(())
2253                    } else {
2254                        // Yield to the executor.
2255                        cx.waker().wake_by_ref();
2256                        Poll::Pending
2257                    }
2258                })
2259                .await;
2260
2261                let mut response = BlockFifoResponse::default();
2262                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2263
2264                writer
2265                    .write_entries(&BlockFifoRequest {
2266                        command: BlockFifoCommand {
2267                            opcode: BlockOpcode::Read.into_primitive(),
2268                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2269                            ..Default::default()
2270                        },
2271                        group: 1,
2272                        vmoid: vmo_id.id,
2273                        length: 1,
2274                        ..Default::default()
2275                    })
2276                    .await
2277                    .unwrap();
2278
2279                writer
2280                    .write_entries(&BlockFifoRequest {
2281                        command: BlockFifoCommand {
2282                            opcode: BlockOpcode::Read.into_primitive(),
2283                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2284                            ..Default::default()
2285                        },
2286                        reqid: 2,
2287                        group: 1,
2288                        vmoid: vmo_id.id,
2289                        length: 1,
2290                        ..Default::default()
2291                    })
2292                    .await
2293                    .unwrap();
2294
2295                reader.read_entries(&mut response).await.unwrap();
2296                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2297                assert_eq!(response.reqid, 2);
2298                assert_eq!(response.group, 1);
2299
2300                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2301
2302                // Only the first request should have been processed.
2303                assert_eq!(counter.load(Ordering::Relaxed), 1);
2304            }
2305        );
2306    }
2307
2308    #[fuchsia::test]
2309    async fn test_group_with_two_lasts() {
2310        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2311
2312        let (tx, rx) = oneshot::channel();
2313
2314        futures::join!(
2315            async move {
2316                let rx = Mutex::new(Some(rx));
2317                let block_server = BlockServer::new(
2318                    BLOCK_SIZE,
2319                    Arc::new(MockInterface {
2320                        read_hook: Some(Box::new(move |_, _, _, _| {
2321                            let rx = rx.lock().take().unwrap();
2322                            Box::pin(async {
2323                                let _ = rx.await;
2324                                Ok(())
2325                            })
2326                        })),
2327                        ..MockInterface::default()
2328                    }),
2329                );
2330                block_server.handle_requests(stream).await.unwrap();
2331            },
2332            async move {
2333                let (session_proxy, server) = fidl::endpoints::create_proxy();
2334
2335                proxy.open_session(server).unwrap();
2336
2337                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2338                let vmo_id = session_proxy
2339                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2340                    .await
2341                    .unwrap()
2342                    .unwrap();
2343
2344                let mut fifo =
2345                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2346                let (mut reader, mut writer) = fifo.async_io();
2347
2348                writer
2349                    .write_entries(&BlockFifoRequest {
2350                        command: BlockFifoCommand {
2351                            opcode: BlockOpcode::Read.into_primitive(),
2352                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2353                            ..Default::default()
2354                        },
2355                        reqid: 1,
2356                        group: 1,
2357                        vmoid: vmo_id.id,
2358                        length: 1,
2359                        ..Default::default()
2360                    })
2361                    .await
2362                    .unwrap();
2363
2364                writer
2365                    .write_entries(&BlockFifoRequest {
2366                        command: BlockFifoCommand {
2367                            opcode: BlockOpcode::Read.into_primitive(),
2368                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2369                            ..Default::default()
2370                        },
2371                        reqid: 2,
2372                        group: 1,
2373                        vmoid: vmo_id.id,
2374                        length: 1,
2375                        ..Default::default()
2376                    })
2377                    .await
2378                    .unwrap();
2379
2380                // Send an independent request to flush through the fifo.
2381                writer
2382                    .write_entries(&BlockFifoRequest {
2383                        command: BlockFifoCommand {
2384                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2385                            ..Default::default()
2386                        },
2387                        reqid: 3,
2388                        vmoid: vmo_id.id,
2389                        ..Default::default()
2390                    })
2391                    .await
2392                    .unwrap();
2393
2394                // It should succeed.
2395                let mut response = BlockFifoResponse::default();
2396                reader.read_entries(&mut response).await.unwrap();
2397                assert_eq!(response.status, zx::sys::ZX_OK);
2398                assert_eq!(response.reqid, 3);
2399
2400                // Now release the original request.
2401                tx.send(()).unwrap();
2402
2403                // The response should be for the first message tagged as last, and it should be
2404                // an error because we sent two messages with the LAST marker.
2405                let mut response = BlockFifoResponse::default();
2406                reader.read_entries(&mut response).await.unwrap();
2407                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2408                assert_eq!(response.reqid, 1);
2409                assert_eq!(response.group, 1);
2410            }
2411        );
2412    }
2413
2414    #[fuchsia::test(allow_stalls = false)]
2415    async fn test_requests_dont_block_sessions() {
2416        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2417
2418        let (tx, rx) = oneshot::channel();
2419
2420        fasync::Task::local(async move {
2421            let rx = Mutex::new(Some(rx));
2422            let block_server = BlockServer::new(
2423                BLOCK_SIZE,
2424                Arc::new(MockInterface {
2425                    read_hook: Some(Box::new(move |_, _, _, _| {
2426                        let rx = rx.lock().take().unwrap();
2427                        Box::pin(async {
2428                            let _ = rx.await;
2429                            Ok(())
2430                        })
2431                    })),
2432                    ..MockInterface::default()
2433                }),
2434            );
2435            block_server.handle_requests(stream).await.unwrap();
2436        })
2437        .detach();
2438
2439        let mut fut = pin!(async {
2440            let (session_proxy, server) = fidl::endpoints::create_proxy();
2441
2442            proxy.open_session(server).unwrap();
2443
2444            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2445            let vmo_id = session_proxy
2446                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2447                .await
2448                .unwrap()
2449                .unwrap();
2450
2451            let mut fifo =
2452                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2453            let (mut reader, mut writer) = fifo.async_io();
2454
2455            writer
2456                .write_entries(&BlockFifoRequest {
2457                    command: BlockFifoCommand {
2458                        opcode: BlockOpcode::Read.into_primitive(),
2459                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2460                        ..Default::default()
2461                    },
2462                    reqid: 1,
2463                    group: 1,
2464                    vmoid: vmo_id.id,
2465                    length: 1,
2466                    ..Default::default()
2467                })
2468                .await
2469                .unwrap();
2470
2471            let mut response = BlockFifoResponse::default();
2472            reader.read_entries(&mut response).await.unwrap();
2473            assert_eq!(response.status, zx::sys::ZX_OK);
2474        });
2475
2476        // The response won't come back until we send on `tx`.
2477        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2478
2479        let mut fut2 = pin!(proxy.get_volume_info());
2480
2481        // get_volume_info is set up to stall forever.
2482        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2483
2484        // If we now free up the first future, it should resolve; the stalled call to
2485        // get_volume_info should not block the fifo response.
2486        let _ = tx.send(());
2487
2488        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2489    }
2490
2491    #[fuchsia::test]
2492    async fn test_request_flow_control() {
2493        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2494
2495        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2496        // server will block until that happens.
2497        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2498        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2499        let event_clone = event.clone();
2500        futures::join!(
2501            async move {
2502                let block_server = BlockServer::new(
2503                    BLOCK_SIZE,
2504                    Arc::new(MockInterface {
2505                        read_hook: Some(Box::new(move |_, _, _, _| {
2506                            let event_clone = event_clone.clone();
2507                            Box::pin(async move {
2508                                if !event_clone.1.load(Ordering::SeqCst) {
2509                                    event_clone.0.listen().await;
2510                                }
2511                                Ok(())
2512                            })
2513                        })),
2514                        ..MockInterface::default()
2515                    }),
2516                );
2517                block_server.handle_requests(stream).await.unwrap();
2518            },
2519            async move {
2520                let (session_proxy, server) = fidl::endpoints::create_proxy();
2521
2522                proxy.open_session(server).unwrap();
2523
2524                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2525                let vmo_id = session_proxy
2526                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2527                    .await
2528                    .unwrap()
2529                    .unwrap();
2530
2531                let mut fifo =
2532                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2533                let (mut reader, mut writer) = fifo.async_io();
2534
2535                for i in 0..MAX_REQUESTS {
2536                    writer
2537                        .write_entries(&BlockFifoRequest {
2538                            command: BlockFifoCommand {
2539                                opcode: BlockOpcode::Read.into_primitive(),
2540                                ..Default::default()
2541                            },
2542                            reqid: (i + 1) as u32,
2543                            dev_offset: i,
2544                            vmoid: vmo_id.id,
2545                            length: 1,
2546                            ..Default::default()
2547                        })
2548                        .await
2549                        .unwrap();
2550                }
2551                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2552                    command: BlockFifoCommand {
2553                        opcode: BlockOpcode::Read.into_primitive(),
2554                        ..Default::default()
2555                    },
2556                    reqid: u32::MAX,
2557                    dev_offset: MAX_REQUESTS,
2558                    vmoid: vmo_id.id,
2559                    length: 1,
2560                    ..Default::default()
2561                })))
2562                .is_pending());
2563                // OK, let the server start to process.
2564                event.1.store(true, Ordering::SeqCst);
2565                event.0.notify(usize::MAX);
2566                // For each entry we read, make sure we can write a new one in.
2567                let mut finished_reqids = vec![];
2568                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2569                    let mut response = BlockFifoResponse::default();
2570                    reader.read_entries(&mut response).await.unwrap();
2571                    assert_eq!(response.status, zx::sys::ZX_OK);
2572                    finished_reqids.push(response.reqid);
2573                    writer
2574                        .write_entries(&BlockFifoRequest {
2575                            command: BlockFifoCommand {
2576                                opcode: BlockOpcode::Read.into_primitive(),
2577                                ..Default::default()
2578                            },
2579                            reqid: (i + 1) as u32,
2580                            dev_offset: i,
2581                            vmoid: vmo_id.id,
2582                            length: 1,
2583                            ..Default::default()
2584                        })
2585                        .await
2586                        .unwrap();
2587                }
2588                let mut response = BlockFifoResponse::default();
2589                for _ in 0..MAX_REQUESTS {
2590                    reader.read_entries(&mut response).await.unwrap();
2591                    assert_eq!(response.status, zx::sys::ZX_OK);
2592                    finished_reqids.push(response.reqid);
2593                }
2594                // Verify that we got a response for each request.  Note that we can't assume FIFO
2595                // ordering.
2596                finished_reqids.sort();
2597                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2598                let mut i = 1;
2599                for reqid in finished_reqids {
2600                    assert_eq!(reqid, i);
2601                    i += 1;
2602                }
2603            }
2604        );
2605    }
2606
2607    #[fuchsia::test]
2608    async fn test_passthrough_io_with_fixed_map() {
2609        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2610
2611        let expected_op = Arc::new(Mutex::new(None));
2612        let expected_op_clone = expected_op.clone();
2613        futures::join!(
2614            async {
2615                let block_server = BlockServer::new(
2616                    BLOCK_SIZE,
2617                    Arc::new(IoMockInterface {
2618                        return_errors: false,
2619                        do_checks: true,
2620                        expected_op: expected_op_clone,
2621                    }),
2622                );
2623                block_server.handle_requests(stream).await.unwrap();
2624            },
2625            async move {
2626                let (session_proxy, server) = fidl::endpoints::create_proxy();
2627
2628                let mapping = fblock::BlockOffsetMapping {
2629                    source_block_offset: 0,
2630                    target_block_offset: 10,
2631                    length: 20,
2632                };
2633                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2634
2635                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2636                let vmo_id = session_proxy
2637                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2638                    .await
2639                    .unwrap()
2640                    .unwrap();
2641
2642                let mut fifo =
2643                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2644                let (mut reader, mut writer) = fifo.async_io();
2645
2646                // READ
2647                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2648                writer
2649                    .write_entries(&BlockFifoRequest {
2650                        command: BlockFifoCommand {
2651                            opcode: BlockOpcode::Read.into_primitive(),
2652                            ..Default::default()
2653                        },
2654                        vmoid: vmo_id.id,
2655                        dev_offset: 1,
2656                        length: 2,
2657                        vmo_offset: 3,
2658                        ..Default::default()
2659                    })
2660                    .await
2661                    .unwrap();
2662
2663                let mut response = BlockFifoResponse::default();
2664                reader.read_entries(&mut response).await.unwrap();
2665                assert_eq!(response.status, zx::sys::ZX_OK);
2666
2667                // WRITE
2668                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2669                writer
2670                    .write_entries(&BlockFifoRequest {
2671                        command: BlockFifoCommand {
2672                            opcode: BlockOpcode::Write.into_primitive(),
2673                            ..Default::default()
2674                        },
2675                        vmoid: vmo_id.id,
2676                        dev_offset: 4,
2677                        length: 5,
2678                        vmo_offset: 6,
2679                        ..Default::default()
2680                    })
2681                    .await
2682                    .unwrap();
2683
2684                reader.read_entries(&mut response).await.unwrap();
2685                assert_eq!(response.status, zx::sys::ZX_OK);
2686
2687                // FLUSH
2688                *expected_op.lock() = Some(ExpectedOp::Flush);
2689                writer
2690                    .write_entries(&BlockFifoRequest {
2691                        command: BlockFifoCommand {
2692                            opcode: BlockOpcode::Flush.into_primitive(),
2693                            ..Default::default()
2694                        },
2695                        ..Default::default()
2696                    })
2697                    .await
2698                    .unwrap();
2699
2700                reader.read_entries(&mut response).await.unwrap();
2701                assert_eq!(response.status, zx::sys::ZX_OK);
2702
2703                // TRIM
2704                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2705                writer
2706                    .write_entries(&BlockFifoRequest {
2707                        command: BlockFifoCommand {
2708                            opcode: BlockOpcode::Trim.into_primitive(),
2709                            ..Default::default()
2710                        },
2711                        dev_offset: 7,
2712                        length: 3,
2713                        ..Default::default()
2714                    })
2715                    .await
2716                    .unwrap();
2717
2718                reader.read_entries(&mut response).await.unwrap();
2719                assert_eq!(response.status, zx::sys::ZX_OK);
2720
2721                // READ past window
2722                *expected_op.lock() = None;
2723                writer
2724                    .write_entries(&BlockFifoRequest {
2725                        command: BlockFifoCommand {
2726                            opcode: BlockOpcode::Read.into_primitive(),
2727                            ..Default::default()
2728                        },
2729                        vmoid: vmo_id.id,
2730                        dev_offset: 19,
2731                        length: 2,
2732                        vmo_offset: 3,
2733                        ..Default::default()
2734                    })
2735                    .await
2736                    .unwrap();
2737
2738                reader.read_entries(&mut response).await.unwrap();
2739                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2740
2741                std::mem::drop(proxy);
2742            }
2743        );
2744    }
2745
2746    #[fuchsia::test]
2747    fn operation_map() {
2748        fn expect_map_result(
2749            mut operation: Operation,
2750            mapping: Option<BlockOffsetMapping>,
2751            max_blocks: Option<NonZero<u32>>,
2752            expected_operations: Vec<Operation>,
2753        ) {
2754            let mut ops = vec![];
2755            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2756                ops.push(operation);
2757                operation = remainder;
2758            }
2759            ops.push(operation);
2760            assert_eq!(ops, expected_operations);
2761        }
2762
2763        // No limits
2764        expect_map_result(
2765            Operation::Read {
2766                device_block_offset: 10,
2767                block_count: 200,
2768                _unused: 0,
2769                vmo_offset: 0,
2770            },
2771            None,
2772            None,
2773            vec![Operation::Read {
2774                device_block_offset: 10,
2775                block_count: 200,
2776                _unused: 0,
2777                vmo_offset: 0,
2778            }],
2779        );
2780
2781        // Max block count
2782        expect_map_result(
2783            Operation::Read {
2784                device_block_offset: 10,
2785                block_count: 200,
2786                _unused: 0,
2787                vmo_offset: 0,
2788            },
2789            None,
2790            NonZero::new(120),
2791            vec![
2792                Operation::Read {
2793                    device_block_offset: 10,
2794                    block_count: 120,
2795                    _unused: 0,
2796                    vmo_offset: 0,
2797                },
2798                Operation::Read {
2799                    device_block_offset: 130,
2800                    block_count: 80,
2801                    _unused: 0,
2802                    vmo_offset: 120 * 512,
2803                },
2804            ],
2805        );
2806        expect_map_result(
2807            Operation::Write {
2808                device_block_offset: 10,
2809                block_count: 200,
2810                options: WriteOptions::PRE_BARRIER,
2811                vmo_offset: 0,
2812            },
2813            None,
2814            NonZero::new(120),
2815            vec![
2816                Operation::Write {
2817                    device_block_offset: 10,
2818                    block_count: 120,
2819                    options: WriteOptions::PRE_BARRIER,
2820                    vmo_offset: 0,
2821                },
2822                Operation::Write {
2823                    device_block_offset: 130,
2824                    block_count: 80,
2825                    options: WriteOptions::empty(),
2826                    vmo_offset: 120 * 512,
2827                },
2828            ],
2829        );
2830        expect_map_result(
2831            Operation::Trim { device_block_offset: 10, block_count: 200 },
2832            None,
2833            NonZero::new(120),
2834            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2835        );
2836
2837        // Remapping + Max block count
2838        expect_map_result(
2839            Operation::Read {
2840                device_block_offset: 10,
2841                block_count: 200,
2842                _unused: 0,
2843                vmo_offset: 0,
2844            },
2845            Some(BlockOffsetMapping {
2846                source_block_offset: 10,
2847                target_block_offset: 100,
2848                length: 200,
2849            }),
2850            NonZero::new(120),
2851            vec![
2852                Operation::Read {
2853                    device_block_offset: 100,
2854                    block_count: 120,
2855                    _unused: 0,
2856                    vmo_offset: 0,
2857                },
2858                Operation::Read {
2859                    device_block_offset: 220,
2860                    block_count: 80,
2861                    _unused: 0,
2862                    vmo_offset: 120 * 512,
2863                },
2864            ],
2865        );
2866        expect_map_result(
2867            Operation::Write {
2868                device_block_offset: 10,
2869                block_count: 200,
2870                options: WriteOptions::PRE_BARRIER,
2871                vmo_offset: 0,
2872            },
2873            Some(BlockOffsetMapping {
2874                source_block_offset: 10,
2875                target_block_offset: 100,
2876                length: 200,
2877            }),
2878            NonZero::new(120),
2879            vec![
2880                Operation::Write {
2881                    device_block_offset: 100,
2882                    block_count: 120,
2883                    options: WriteOptions::PRE_BARRIER,
2884                    vmo_offset: 0,
2885                },
2886                Operation::Write {
2887                    device_block_offset: 220,
2888                    block_count: 80,
2889                    options: WriteOptions::empty(),
2890                    vmo_offset: 120 * 512,
2891                },
2892            ],
2893        );
2894        expect_map_result(
2895            Operation::Trim { device_block_offset: 10, block_count: 200 },
2896            Some(BlockOffsetMapping {
2897                source_block_offset: 10,
2898                target_block_offset: 100,
2899                length: 200,
2900            }),
2901            NonZero::new(120),
2902            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2903        );
2904    }
2905}