block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::bin::Bin;
5use anyhow::{anyhow, Error};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use zx::HandleBased;
19use {
20    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    pub fn block_count(&self) -> Option<u64> {
40        match self {
41            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42            Self::Partition(PartitionInfo { block_range, .. }) => {
43                block_range.as_ref().map(|range| range.end - range.start)
44            }
45        }
46    }
47
48    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49        match self {
50            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52                max_transfer_blocks.clone()
53            }
54        }
55    }
56
57    fn max_transfer_size(&self, block_size: u32) -> u32 {
58        if let Some(max_blocks) = self.max_transfer_blocks() {
59            max_blocks.get() * block_size
60        } else {
61            MAX_TRANSFER_UNBOUNDED
62        }
63    }
64}
65
66/// Information associated with non-partition block devices.
67#[derive(Clone, Default)]
68pub struct BlockInfo {
69    pub device_flags: fblock::Flag,
70    pub block_count: u64,
71    pub max_transfer_blocks: Option<NonZero<u32>>,
72}
73
74/// Information associated with a block device that is also a partition.
75#[derive(Clone, Default)]
76pub struct PartitionInfo {
77    /// The device flags reported by the underlying device.
78    pub device_flags: fblock::Flag,
79    pub max_transfer_blocks: Option<NonZero<u32>>,
80    /// If `block_range` is None, the partition is a volume and may not be contiguous.
81    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
82    /// slices and use that (along with the slice and block sizes) to determine the block count.
83    pub block_range: Option<Range<u64>>,
84    pub type_guid: [u8; 16],
85    pub instance_guid: [u8; 16],
86    pub name: String,
87    pub flags: u64,
88}
89
90/// We internally keep track of active requests, so that when the server is torn down, we can
91/// deallocate all of the resources for pending requests.
92struct ActiveRequest<S> {
93    session: S,
94    group_or_request: GroupOrRequest,
95    trace_flow_id: TraceFlowId,
96    vmo_bin_key: Option<usize>,
97    status: zx::Status,
98    count: u32,
99    req_id: Option<u32>,
100}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105    fn default() -> Self {
106        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107    }
108}
109
110impl<S> ActiveRequests<S> {
111    fn complete_and_take_response(
112        &self,
113        request_id: RequestId,
114        status: zx::Status,
115    ) -> Option<(S, BlockFifoResponse)> {
116        self.0.lock().complete_and_take_response(request_id, status)
117    }
118}
119
120struct ActiveRequestsInner<S> {
121    requests: Slab<ActiveRequest<S>>,
122    vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125// Keeps track of all the requests that are currently being processed
126impl<S> ActiveRequestsInner<S> {
127    /// Completes a request.
128    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129        let group = &mut self.requests[request_id.0];
130
131        group.count = group.count.checked_sub(1).unwrap();
132        if status != zx::Status::OK && group.status == zx::Status::OK {
133            group.status = status
134        }
135
136        fuchsia_trace::duration!(
137            c"storage",
138            c"block_server::finish_transaction",
139            "request_id" => request_id.0,
140            "group_completed" => group.count == 0,
141            "status" => status.into_raw());
142        if let Some(trace_flow_id) = group.trace_flow_id {
143            fuchsia_trace::flow_step!(
144                c"storage",
145                c"block_server::finish_request",
146                trace_flow_id.get().into()
147            );
148        }
149    }
150
151    /// Takes the response if all requests are finished.
152    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153        let group = &self.requests[request_id.0];
154        match group.req_id {
155            Some(reqid) if group.count == 0 => {
156                let group = self.requests.remove(request_id.0);
157                if let Some(vmo_bin_key) = group.vmo_bin_key {
158                    self.vmo_bin.release(vmo_bin_key);
159                }
160                Some((
161                    group.session,
162                    BlockFifoResponse {
163                        status: group.status.into_raw(),
164                        reqid,
165                        group: group.group_or_request.group_id().unwrap_or(0),
166                        ..Default::default()
167                    },
168                ))
169            }
170            _ => None,
171        }
172    }
173
174    /// Competes the request and returns a response if the request group is finished.
175    fn complete_and_take_response(
176        &mut self,
177        request_id: RequestId,
178        status: zx::Status,
179    ) -> Option<(S, BlockFifoResponse)> {
180        self.complete(request_id, status);
181        self.take_response(request_id)
182    }
183}
184
185/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
186/// cbindgen:no-export
187pub struct BlockServer<SM> {
188    block_size: u32,
189    session_manager: Arc<SM>,
190}
191
192/// A single entry in `[OffsetMap]`.
193#[derive(Debug)]
194pub struct BlockOffsetMapping {
195    source_block_offset: u64,
196    target_block_offset: u64,
197    length: u64,
198}
199
200impl BlockOffsetMapping {
201    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
202        blocks.0 >= self.source_block_offset
203            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
204    }
205}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208    type Error = zx::Status;
209
210    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213        Ok(Self {
214            source_block_offset: wire.source_block_offset,
215            target_block_offset: wire.target_block_offset,
216            length: wire.length,
217        })
218    }
219}
220
221/// Remaps the offset of block requests based on an internal map, and truncates long requests.
222pub struct OffsetMap {
223    mapping: Option<BlockOffsetMapping>,
224    max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228    /// An OffsetMap that remaps requests.
229    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230        Self { mapping: Some(mapping), max_transfer_blocks }
231    }
232
233    /// An OffsetMap that just enforces maximum request sizes.
234    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235        Self { mapping: None, max_transfer_blocks }
236    }
237
238    pub fn is_empty(&self) -> bool {
239        self.mapping.is_none()
240    }
241
242    fn mapping(&self) -> Option<&BlockOffsetMapping> {
243        self.mapping.as_ref()
244    }
245
246    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247        self.max_transfer_blocks
248    }
249}
250
251// Methods take Arc<Self> rather than &self because of
252// https://github.com/rust-lang/rust/issues/42940.
253pub trait SessionManager: 'static {
254    type Session;
255
256    fn on_attach_vmo(
257        self: Arc<Self>,
258        vmo: &Arc<zx::Vmo>,
259    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
260
261    /// Creates a new session to handle `stream`.
262    /// The returned future should run until the session completes, for example when the client end
263    /// closes.
264    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
265    fn open_session(
266        self: Arc<Self>,
267        stream: fblock::SessionRequestStream,
268        offset_map: OffsetMap,
269        block_size: u32,
270    ) -> impl Future<Output = Result<(), Error>> + Send;
271
272    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
273    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
274
275    /// Called to handle the GetVolumeInfo FIDL call.
276    fn get_volume_info(
277        &self,
278    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
279    {
280        async { Err(zx::Status::NOT_SUPPORTED) }
281    }
282
283    /// Called to handle the QuerySlices FIDL call.
284    fn query_slices(
285        &self,
286        _start_slices: &[u64],
287    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
288        async { Err(zx::Status::NOT_SUPPORTED) }
289    }
290
291    /// Called to handle the Shrink FIDL call.
292    fn extend(
293        &self,
294        _start_slice: u64,
295        _slice_count: u64,
296    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
297        async { Err(zx::Status::NOT_SUPPORTED) }
298    }
299
300    /// Called to handle the Shrink FIDL call.
301    fn shrink(
302        &self,
303        _start_slice: u64,
304        _slice_count: u64,
305    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
306        async { Err(zx::Status::NOT_SUPPORTED) }
307    }
308
309    /// Returns the active requests.
310    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
311}
312
313pub trait IntoSessionManager {
314    type SM: SessionManager;
315
316    fn into_session_manager(self) -> Arc<Self::SM>;
317}
318
319impl<SM: SessionManager> BlockServer<SM> {
320    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321        Self { block_size, session_manager: session_manager.into_session_manager() }
322    }
323
324    pub fn session_manager(&self) -> &SM {
325        self.session_manager.as_ref()
326    }
327
328    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
329    pub async fn handle_requests(
330        &self,
331        mut requests: fvolume::VolumeRequestStream,
332    ) -> Result<(), Error> {
333        let scope = fasync::Scope::new();
334        while let Some(request) = requests.try_next().await.unwrap() {
335            if let Some(session) = self.handle_request(request).await? {
336                scope.spawn(session.map(|_| ()));
337            }
338        }
339        scope.await;
340        Ok(())
341    }
342
343    /// Processes a partition request.  If a new session task is created in response to the request,
344    /// it is returned.
345    async fn handle_request(
346        &self,
347        request: fvolume::VolumeRequest,
348    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
349        match request {
350            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
351                Ok(info) => {
352                    let max_transfer_size = info.max_transfer_size(self.block_size);
353                    let (block_count, flags) = match info.as_ref() {
354                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
355                            (*block_count, *device_flags)
356                        }
357                        DeviceInfo::Partition(partition_info) => {
358                            let block_count = if let Some(range) =
359                                partition_info.block_range.as_ref()
360                            {
361                                range.end - range.start
362                            } else {
363                                let volume_info = self.session_manager.get_volume_info().await?;
364                                volume_info.0.slice_size * volume_info.1.partition_slice_count
365                                    / self.block_size as u64
366                            };
367                            (block_count, partition_info.device_flags)
368                        }
369                    };
370                    responder.send(Ok(&fblock::BlockInfo {
371                        block_count,
372                        block_size: self.block_size,
373                        max_transfer_size,
374                        flags,
375                    }))?;
376                }
377                Err(status) => responder.send(Err(status.into_raw()))?,
378            },
379            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
380                match self.device_info().await {
381                    Ok(info) => {
382                        return Ok(Some(self.session_manager.clone().open_session(
383                            session.into_stream(),
384                            OffsetMap::empty(info.max_transfer_blocks()),
385                            self.block_size,
386                        )));
387                    }
388                    Err(status) => session.close_with_epitaph(status)?,
389                }
390            }
391            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
392                session,
393                mapping,
394                control_handle: _,
395            } => match self.device_info().await {
396                Ok(info) => {
397                    let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
398                        Ok(m) => m,
399                        Err(status) => {
400                            session.close_with_epitaph(status)?;
401                            return Ok(None);
402                        }
403                    };
404                    if let Some(max) = info.block_count() {
405                        if initial_mapping.target_block_offset + initial_mapping.length > max {
406                            log::warn!(
407                                "Invalid mapping for session: {initial_mapping:?} (max {max})"
408                            );
409                            session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
410                            return Ok(None);
411                        }
412                    }
413                    return Ok(Some(self.session_manager.clone().open_session(
414                        session.into_stream(),
415                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
416                        self.block_size,
417                    )));
418                }
419                Err(status) => session.close_with_epitaph(status)?,
420            },
421            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
422                Ok(info) => {
423                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
424                        let mut guid =
425                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
426                        guid.value.copy_from_slice(&partition_info.type_guid);
427                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
428                    } else {
429                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
430                    }
431                }
432                Err(status) => {
433                    responder.send(status.into_raw(), None)?;
434                }
435            },
436            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
437                match self.device_info().await {
438                    Ok(info) => {
439                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
440                            let mut guid =
441                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
442                            guid.value.copy_from_slice(&partition_info.instance_guid);
443                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
444                        } else {
445                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
446                        }
447                    }
448                    Err(status) => {
449                        responder.send(status.into_raw(), None)?;
450                    }
451                }
452            }
453            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
454                Ok(info) => {
455                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
456                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
457                    } else {
458                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
459                    }
460                }
461                Err(status) => {
462                    responder.send(status.into_raw(), None)?;
463                }
464            },
465            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
466                Ok(info) => {
467                    if let DeviceInfo::Partition(info) = info.as_ref() {
468                        let mut type_guid =
469                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
470                        type_guid.value.copy_from_slice(&info.type_guid);
471                        let mut instance_guid =
472                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
473                        instance_guid.value.copy_from_slice(&info.instance_guid);
474                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
475                            name: Some(info.name.clone()),
476                            type_guid: Some(type_guid),
477                            instance_guid: Some(instance_guid),
478                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
479                            num_blocks: info
480                                .block_range
481                                .as_ref()
482                                .map(|range| range.end - range.start),
483                            flags: Some(info.flags),
484                            ..Default::default()
485                        }))?;
486                    }
487                }
488                Err(status) => responder.send(Err(status.into_raw()))?,
489            },
490            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
491                match self.session_manager.query_slices(&start_slices).await {
492                    Ok(mut results) => {
493                        let results_len = results.len();
494                        assert!(results_len <= 16);
495                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
496                        responder.send(
497                            zx::sys::ZX_OK,
498                            &results.try_into().unwrap(),
499                            results_len as u64,
500                        )?;
501                    }
502                    Err(s) => {
503                        responder.send(
504                            s.into_raw(),
505                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
506                            0,
507                        )?;
508                    }
509                }
510            }
511            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
512                match self.session_manager.get_volume_info().await {
513                    Ok((manager_info, volume_info)) => {
514                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
515                    }
516                    Err(s) => responder.send(s.into_raw(), None, None)?,
517                }
518            }
519            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
520                responder.send(
521                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
522                        .into_raw(),
523                )?;
524            }
525            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
526                responder.send(
527                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
528                        .into_raw(),
529                )?;
530            }
531            fvolume::VolumeRequest::Destroy { responder, .. } => {
532                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
533            }
534        }
535        Ok(None)
536    }
537
538    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
539        self.session_manager.get_info().await
540    }
541}
542
543struct SessionHelper<SM: SessionManager> {
544    session_manager: Arc<SM>,
545    offset_map: OffsetMap,
546    block_size: u32,
547    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
548    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
549}
550
551impl<SM: SessionManager> SessionHelper<SM> {
552    fn new(
553        session_manager: Arc<SM>,
554        offset_map: OffsetMap,
555        block_size: u32,
556    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
557        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
558        Ok((
559            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
560            fifo,
561        ))
562    }
563
564    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
565        match request {
566            fblock::SessionRequest::GetFifo { responder } => {
567                let rights = zx::Rights::TRANSFER
568                    | zx::Rights::READ
569                    | zx::Rights::WRITE
570                    | zx::Rights::SIGNAL
571                    | zx::Rights::WAIT;
572                match self.peer_fifo.duplicate_handle(rights) {
573                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
574                    Err(s) => responder.send(Err(s.into_raw()))?,
575                }
576                Ok(())
577            }
578            fblock::SessionRequest::AttachVmo { vmo, responder } => {
579                let vmo = Arc::new(vmo);
580                let vmo_id = {
581                    let mut vmos = self.vmos.lock();
582                    if vmos.len() == u16::MAX as usize {
583                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
584                        return Ok(());
585                    } else {
586                        let vmo_id = match vmos.last_entry() {
587                            None => 1,
588                            Some(o) => {
589                                o.key().checked_add(1).unwrap_or_else(|| {
590                                    let mut vmo_id = 1;
591                                    // Find the first gap...
592                                    for (&id, _) in &*vmos {
593                                        if id > vmo_id {
594                                            break;
595                                        }
596                                        vmo_id = id + 1;
597                                    }
598                                    vmo_id
599                                })
600                            }
601                        };
602                        vmos.insert(vmo_id, vmo.clone());
603                        vmo_id
604                    }
605                };
606                self.session_manager.clone().on_attach_vmo(&vmo).await?;
607                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
608                Ok(())
609            }
610            fblock::SessionRequest::Close { responder } => {
611                responder.send(Ok(()))?;
612                Err(anyhow!("Closed"))
613            }
614        }
615    }
616
617    /// Decodes `request`.
618    fn decode_fifo_request(
619        &self,
620        session: SM::Session,
621        request: &BlockFifoRequest,
622    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
623        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
624        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
625            .ok_or(zx::Status::INVALID_ARGS)
626            .and_then(|code| {
627                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
628                    if request.length == 0 {
629                        return Err(zx::Status::INVALID_ARGS);
630                    }
631                    // Make sure the end offsets won't wrap.
632                    if request.dev_offset.checked_add(request.length as u64).is_none()
633                        || (code != BlockOpcode::Trim
634                            && (request.length as u64 * self.block_size as u64)
635                                .checked_add(request.vmo_offset)
636                                .is_none())
637                    {
638                        return Err(zx::Status::OUT_OF_RANGE);
639                    }
640                }
641                Ok(match code {
642                    BlockOpcode::Read => Operation::Read {
643                        device_block_offset: request.dev_offset,
644                        block_count: request.length,
645                        _unused: 0,
646                        vmo_offset: request
647                            .vmo_offset
648                            .checked_mul(self.block_size as u64)
649                            .ok_or(zx::Status::OUT_OF_RANGE)?,
650                    },
651                    BlockOpcode::Write => {
652                        let mut options = WriteOptions::empty();
653                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
654                            options |= WriteOptions::FORCE_ACCESS;
655                        }
656                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
657                            options |= WriteOptions::PRE_BARRIER;
658                        }
659                        Operation::Write {
660                            device_block_offset: request.dev_offset,
661                            block_count: request.length,
662                            options: options,
663                            vmo_offset: request
664                                .vmo_offset
665                                .checked_mul(self.block_size as u64)
666                                .ok_or(zx::Status::OUT_OF_RANGE)?,
667                        }
668                    }
669                    BlockOpcode::Flush => Operation::Flush,
670                    BlockOpcode::Trim => Operation::Trim {
671                        device_block_offset: request.dev_offset,
672                        block_count: request.length,
673                    },
674                    BlockOpcode::CloseVmo => Operation::CloseVmo,
675                })
676            });
677
678        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
679            GroupOrRequest::Group(request.group)
680        } else {
681            GroupOrRequest::Request(request.reqid)
682        };
683
684        let mut active_requests = self.session_manager.active_requests().0.lock();
685        let mut request_id = None;
686
687        // Multiple Block I/O request may be sent as a group.
688        // Notes:
689        // - the group is identified by the group id in the request
690        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
691        //   flag is set.
692        // - when processing a request of a group fails, subsequent requests of that
693        //   group will not be processed.
694        //
695        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
696        if group_or_request.is_group() {
697            // Search for an existing entry that matches this group.  NOTE: This is a potentially
698            // expensive way to find a group (it's iterating over all slots in the active-requests
699            // slab).  This can be optimised easily should we need to.
700            for (key, group) in &mut active_requests.requests {
701                if group.group_or_request == group_or_request {
702                    if group.req_id.is_some() {
703                        // We have already received a request tagged as last.
704                        if group.status == zx::Status::OK {
705                            group.status = zx::Status::INVALID_ARGS;
706                        }
707                        // Ignore this request.
708                        return Err(None);
709                    }
710                    if flags.contains(BlockIoFlag::GROUP_LAST) {
711                        group.req_id = Some(request.reqid);
712                        // If the group has had an error, there is no point trying to issue this
713                        // request.
714                        if group.status != zx::Status::OK {
715                            operation = Err(group.status);
716                        }
717                    } else if group.status != zx::Status::OK {
718                        // The group has already encountered an error, so there is no point trying
719                        // to issue this request.
720                        return Err(None);
721                    }
722                    request_id = Some(RequestId(key));
723                    group.count += 1;
724                    break;
725                }
726            }
727        }
728
729        let mut retain_vmo = false;
730        let vmo = match &operation {
731            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
732                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
733                    retain_vmo = true;
734                    Ok(Some(vmo))
735                })
736            }
737            Ok(Operation::CloseVmo) => {
738                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
739                    active_requests.vmo_bin.add(vmo.clone());
740                    Ok(Some(vmo))
741                })
742            }
743            _ => Ok(None),
744        }
745        .unwrap_or_else(|e| {
746            operation = Err(e);
747            None
748        });
749
750        let trace_flow_id = NonZero::new(request.trace_flow_id);
751        let request_id = request_id.unwrap_or_else(|| {
752            let vmo_bin_key =
753                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
754            RequestId(active_requests.requests.insert(ActiveRequest {
755                session,
756                group_or_request,
757                trace_flow_id,
758                vmo_bin_key,
759                status: zx::Status::OK,
760                count: 1,
761                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
762                    || flags.contains(BlockIoFlag::GROUP_LAST)
763                {
764                    Some(request.reqid)
765                } else {
766                    None
767                },
768            }))
769        });
770
771        Ok(DecodedRequest {
772            request_id,
773            trace_flow_id,
774            operation: operation.map_err(|status| {
775                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
776            })?,
777            vmo,
778        })
779    }
780
781    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
782        std::mem::take(&mut *self.vmos.lock())
783    }
784
785    /// Maps the request and returns the mapped request with an optional remainder.
786    fn map_request(
787        &self,
788        mut request: DecodedRequest,
789    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
790        let mut active_requests = self.session_manager.active_requests().0.lock();
791        let active_request = &mut active_requests.requests[request.request_id.0];
792        if active_request.status != zx::Status::OK {
793            return Err(active_requests
794                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
795                .map(|(_, r)| r));
796        }
797        let mapping = self.offset_map.mapping();
798        match (mapping, request.operation.blocks()) {
799            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
800                return Err(active_requests
801                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
802                    .map(|(_, r)| r));
803            }
804            _ => {}
805        }
806        let remainder = request.operation.map(
807            self.offset_map.mapping(),
808            self.offset_map.max_transfer_blocks(),
809            self.block_size,
810        );
811        if remainder.is_some() {
812            active_request.count += 1;
813        }
814        static CACHE: AtomicU64 = AtomicU64::new(0);
815        if let Some(context) =
816            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
817        {
818            use fuchsia_trace::ArgValue;
819            let trace_args = [
820                ArgValue::of("request_id", request.request_id.0),
821                ArgValue::of("opcode", request.operation.trace_label()),
822            ];
823            let _scope = fuchsia_trace::duration(
824                c"storage",
825                c"block_server::start_transaction",
826                &trace_args,
827            );
828            if let Some(trace_flow_id) = active_request.trace_flow_id {
829                fuchsia_trace::flow_step(
830                    &context,
831                    c"block_server::start_transaction",
832                    trace_flow_id.get().into(),
833                    &[],
834                );
835            }
836        }
837        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
838        Ok((request, remainder))
839    }
840
841    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
842        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
843    }
844}
845
846#[repr(transparent)]
847#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
848pub struct RequestId(usize);
849
850#[derive(Clone, Debug)]
851struct DecodedRequest {
852    request_id: RequestId,
853    trace_flow_id: TraceFlowId,
854    operation: Operation,
855    vmo: Option<Arc<zx::Vmo>>,
856}
857
858/// cbindgen:no-export
859pub type WriteOptions = block_protocol::WriteOptions;
860
861#[repr(C)]
862#[derive(Clone, Debug, PartialEq, Eq)]
863pub enum Operation {
864    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
865    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
866    // code can read `device_block_offset` from the read variant and then assume it's valid for the
867    // write variant).
868    Read {
869        device_block_offset: u64,
870        block_count: u32,
871        _unused: u32,
872        vmo_offset: u64,
873    },
874    Write {
875        device_block_offset: u64,
876        block_count: u32,
877        options: WriteOptions,
878        vmo_offset: u64,
879    },
880    Flush,
881    Trim {
882        device_block_offset: u64,
883        block_count: u32,
884    },
885    /// This will never be seen by the C interface.
886    CloseVmo,
887}
888
889impl Operation {
890    fn trace_label(&self) -> &'static str {
891        match self {
892            Operation::Read { .. } => "read",
893            Operation::Write { .. } => "write",
894            Operation::Flush { .. } => "flush",
895            Operation::Trim { .. } => "trim",
896            Operation::CloseVmo { .. } => "close_vmo",
897        }
898    }
899
900    /// Returns (offset, length).
901    fn blocks(&self) -> Option<(u64, u32)> {
902        match self {
903            Operation::Read { device_block_offset, block_count, .. }
904            | Operation::Write { device_block_offset, block_count, .. }
905            | Operation::Trim { device_block_offset, block_count, .. } => {
906                Some((*device_block_offset, *block_count))
907            }
908            _ => None,
909        }
910    }
911
912    /// Returns mutable references to (offset, length).
913    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
914        match self {
915            Operation::Read { device_block_offset, block_count, .. }
916            | Operation::Write { device_block_offset, block_count, .. }
917            | Operation::Trim { device_block_offset, block_count, .. } => {
918                Some((device_block_offset, block_count))
919            }
920            _ => None,
921        }
922    }
923
924    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
925    /// start of the operation.
926    fn map(
927        &mut self,
928        mapping: Option<&BlockOffsetMapping>,
929        max_blocks: Option<NonZero<u32>>,
930        block_size: u32,
931    ) -> Option<Self> {
932        let mut max = match self {
933            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
934            _ => None,
935        };
936        let (offset, length) = self.blocks_mut()?;
937        let orig_offset = *offset;
938        if let Some(mapping) = mapping {
939            let delta = *offset - mapping.source_block_offset;
940            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
941            *offset = mapping.target_block_offset + delta;
942            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
943            max = match max {
944                None => Some(mapping_max),
945                Some(m) => Some(std::cmp::min(m, mapping_max)),
946            };
947        };
948        if let Some(max) = max {
949            if *length as u64 > max {
950                let rem = (*length as u64 - max) as u32;
951                *length = max as u32;
952                return Some(match self {
953                    Operation::Read {
954                        device_block_offset: _,
955                        block_count: _,
956                        vmo_offset,
957                        _unused,
958                    } => Operation::Read {
959                        device_block_offset: orig_offset + max,
960                        block_count: rem,
961                        vmo_offset: *vmo_offset + max * block_size as u64,
962                        _unused: *_unused,
963                    },
964                    Operation::Write {
965                        device_block_offset: _,
966                        block_count: _,
967                        vmo_offset,
968                        options,
969                    } => {
970                        // Only send the barrier flag once per write request.
971                        let options = *options & !WriteOptions::PRE_BARRIER;
972                        Operation::Write {
973                            device_block_offset: orig_offset + max,
974                            block_count: rem,
975                            vmo_offset: *vmo_offset + max * block_size as u64,
976                            options,
977                        }
978                    }
979                    Operation::Trim { device_block_offset: _, block_count: _ } => {
980                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
981                    }
982                    _ => unreachable!(),
983                });
984            }
985        }
986        None
987    }
988}
989
990#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
991pub enum GroupOrRequest {
992    Group(u16),
993    Request(u32),
994}
995
996impl GroupOrRequest {
997    fn is_group(&self) -> bool {
998        matches!(self, Self::Group(_))
999    }
1000
1001    fn group_id(&self) -> Option<u16> {
1002        match self {
1003            Self::Group(id) => Some(*id),
1004            Self::Request(_) => None,
1005        }
1006    }
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use super::{
1012        BlockOffsetMapping, BlockServer, DeviceInfo, Operation, PartitionInfo, TraceFlowId,
1013        FIFO_MAX_REQUESTS,
1014    };
1015    use assert_matches::assert_matches;
1016    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
1017    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1018    use fuchsia_sync::Mutex;
1019    use futures::channel::oneshot;
1020    use futures::future::BoxFuture;
1021    use futures::FutureExt as _;
1022    use std::borrow::Cow;
1023    use std::future::poll_fn;
1024    use std::num::NonZero;
1025    use std::pin::pin;
1026    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1027    use std::sync::Arc;
1028    use std::task::{Context, Poll};
1029    use zx::{AsHandleRef as _, HandleBased as _};
1030    use {
1031        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1032        fuchsia_async as fasync,
1033    };
1034
1035    #[derive(Default)]
1036    struct MockInterface {
1037        read_hook: Option<
1038            Box<
1039                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1040                    + Send
1041                    + Sync,
1042            >,
1043        >,
1044        write_hook:
1045            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1046        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1047    }
1048
1049    impl super::async_interface::Interface for MockInterface {
1050        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1051            Ok(())
1052        }
1053
1054        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1055            Ok(Cow::Owned(test_device_info()))
1056        }
1057
1058        async fn read(
1059            &self,
1060            device_block_offset: u64,
1061            block_count: u32,
1062            vmo: &Arc<zx::Vmo>,
1063            vmo_offset: u64,
1064            _trace_flow_id: TraceFlowId,
1065        ) -> Result<(), zx::Status> {
1066            if let Some(read_hook) = &self.read_hook {
1067                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1068            } else {
1069                unimplemented!();
1070            }
1071        }
1072
1073        async fn write(
1074            &self,
1075            device_block_offset: u64,
1076            _block_count: u32,
1077            _vmo: &Arc<zx::Vmo>,
1078            _vmo_offset: u64,
1079            _opts: WriteOptions,
1080            _trace_flow_id: TraceFlowId,
1081        ) -> Result<(), zx::Status> {
1082            if let Some(write_hook) = &self.write_hook {
1083                write_hook(device_block_offset).await
1084            } else {
1085                unimplemented!();
1086            }
1087        }
1088
1089        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1090            unreachable!();
1091        }
1092
1093        fn barrier(&self) -> Result<(), zx::Status> {
1094            if let Some(barrier_hook) = &self.barrier_hook {
1095                barrier_hook()
1096            } else {
1097                unimplemented!();
1098            }
1099        }
1100
1101        async fn trim(
1102            &self,
1103            _device_block_offset: u64,
1104            _block_count: u32,
1105            _trace_flow_id: TraceFlowId,
1106        ) -> Result<(), zx::Status> {
1107            unreachable!();
1108        }
1109
1110        async fn get_volume_info(
1111            &self,
1112        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1113            // Hang forever for the test_requests_dont_block_sessions test.
1114            let () = std::future::pending().await;
1115            unreachable!();
1116        }
1117    }
1118
1119    const BLOCK_SIZE: u32 = 512;
1120    const MAX_TRANSFER_BLOCKS: u32 = 10;
1121
1122    fn test_device_info() -> DeviceInfo {
1123        DeviceInfo::Partition(PartitionInfo {
1124            device_flags: fblock::Flag::READONLY,
1125            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1126            block_range: Some(0..100),
1127            type_guid: [1; 16],
1128            instance_guid: [2; 16],
1129            name: "foo".to_string(),
1130            flags: 0xabcd,
1131        })
1132    }
1133
1134    #[fuchsia::test]
1135    async fn test_split_request_only_issues_single_barrier() {
1136        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1137        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1138        let barrier_called = Arc::new(AtomicU32::new(0));
1139        let barrier_called_clone = barrier_called.clone();
1140
1141        futures::join!(
1142            async move {
1143                let block_server = BlockServer::new(
1144                    BLOCK_SIZE,
1145                    Arc::new(MockInterface {
1146                        barrier_hook: Some(Box::new(move || {
1147                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1148                            Ok(())
1149                        })),
1150                        write_hook: Some(Box::new(move |_device_block_offset| {
1151                            Box::pin(async move { Ok(()) })
1152                        })),
1153                        ..MockInterface::default()
1154                    }),
1155                );
1156                block_server.handle_requests(stream).await.unwrap();
1157            },
1158            async move {
1159                let (session_proxy, server) = fidl::endpoints::create_proxy();
1160
1161                proxy.open_session(server).unwrap();
1162
1163                let vmo_id = session_proxy
1164                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1165                    .await
1166                    .unwrap()
1167                    .unwrap();
1168                assert_ne!(vmo_id.id, 0);
1169
1170                let mut fifo =
1171                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1172                let (mut reader, mut writer) = fifo.async_io();
1173                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1174                // sub requests.
1175                writer
1176                    .write_entries(&BlockFifoRequest {
1177                        command: BlockFifoCommand {
1178                            opcode: BlockOpcode::Write.into_primitive(),
1179                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1180                            ..Default::default()
1181                        },
1182                        vmoid: vmo_id.id,
1183                        dev_offset: 0,
1184                        length: MAX_TRANSFER_BLOCKS + 1,
1185                        vmo_offset: 6,
1186                        ..Default::default()
1187                    })
1188                    .await
1189                    .unwrap();
1190
1191                let mut response = BlockFifoResponse::default();
1192                reader.read_entries(&mut response).await.unwrap();
1193                assert_eq!(response.status, zx::sys::ZX_OK);
1194
1195                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1196
1197                std::mem::drop(proxy);
1198            }
1199        );
1200    }
1201
1202    #[fuchsia::test]
1203    async fn test_barriers_not_supported() {
1204        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1205        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1206
1207        futures::join!(
1208            async move {
1209                let block_server = BlockServer::new(
1210                    BLOCK_SIZE,
1211                    Arc::new(MockInterface {
1212                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1213                        write_hook: Some(Box::new(move |_device_block_offset| {
1214                            Box::pin(async move { Ok(()) })
1215                        })),
1216                        ..MockInterface::default()
1217                    }),
1218                );
1219                block_server.handle_requests(stream).await.unwrap();
1220            },
1221            async move {
1222                let (session_proxy, server) = fidl::endpoints::create_proxy();
1223
1224                proxy.open_session(server).unwrap();
1225
1226                let vmo_id = session_proxy
1227                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1228                    .await
1229                    .unwrap()
1230                    .unwrap();
1231                assert_ne!(vmo_id.id, 0);
1232
1233                let mut fifo =
1234                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1235                let (mut reader, mut writer) = fifo.async_io();
1236
1237                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1238                // sub requests.
1239                writer
1240                    .write_entries(&BlockFifoRequest {
1241                        command: BlockFifoCommand {
1242                            opcode: BlockOpcode::Write.into_primitive(),
1243                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1244                            ..Default::default()
1245                        },
1246                        vmoid: vmo_id.id,
1247                        dev_offset: 0,
1248                        length: MAX_TRANSFER_BLOCKS + 1,
1249                        vmo_offset: 6,
1250                        ..Default::default()
1251                    })
1252                    .await
1253                    .unwrap();
1254
1255                let mut response = BlockFifoResponse::default();
1256                reader.read_entries(&mut response).await.unwrap();
1257                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1258
1259                std::mem::drop(proxy);
1260            }
1261        );
1262    }
1263
1264    #[fuchsia::test]
1265    async fn test_barriers_ordering() {
1266        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1267        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1268        let barrier_called = Arc::new(AtomicBool::new(false));
1269
1270        futures::join!(
1271            async move {
1272                let barrier_called_clone = barrier_called.clone();
1273                let block_server = BlockServer::new(
1274                    BLOCK_SIZE,
1275                    Arc::new(MockInterface {
1276                        barrier_hook: Some(Box::new(move || {
1277                            barrier_called.store(true, Ordering::Relaxed);
1278                            Ok(())
1279                        })),
1280                        write_hook: Some(Box::new(move |device_block_offset| {
1281                            let barrier_called = barrier_called_clone.clone();
1282                            Box::pin(async move {
1283                                // The sleep allows the server to reorder the fifo requests.
1284                                if device_block_offset % 2 == 0 {
1285                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1286                                        zx::MonotonicDuration::from_millis(200),
1287                                    ))
1288                                    .await;
1289                                }
1290                                assert!(barrier_called.load(Ordering::Relaxed));
1291                                Ok(())
1292                            })
1293                        })),
1294                        ..MockInterface::default()
1295                    }),
1296                );
1297                block_server.handle_requests(stream).await.unwrap();
1298            },
1299            async move {
1300                let (session_proxy, server) = fidl::endpoints::create_proxy();
1301
1302                proxy.open_session(server).unwrap();
1303
1304                let vmo_id = session_proxy
1305                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1306                    .await
1307                    .unwrap()
1308                    .unwrap();
1309                assert_ne!(vmo_id.id, 0);
1310
1311                let mut fifo =
1312                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1313                let (mut reader, mut writer) = fifo.async_io();
1314
1315                writer
1316                    .write_entries(&BlockFifoRequest {
1317                        command: BlockFifoCommand {
1318                            opcode: BlockOpcode::Write.into_primitive(),
1319                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1320                            ..Default::default()
1321                        },
1322                        vmoid: vmo_id.id,
1323                        dev_offset: 0,
1324                        length: 5,
1325                        vmo_offset: 6,
1326                        ..Default::default()
1327                    })
1328                    .await
1329                    .unwrap();
1330
1331                for i in 0..10 {
1332                    writer
1333                        .write_entries(&BlockFifoRequest {
1334                            command: BlockFifoCommand {
1335                                opcode: BlockOpcode::Write.into_primitive(),
1336                                ..Default::default()
1337                            },
1338                            vmoid: vmo_id.id,
1339                            dev_offset: i + 1,
1340                            length: 5,
1341                            vmo_offset: 6,
1342                            ..Default::default()
1343                        })
1344                        .await
1345                        .unwrap();
1346                }
1347                for _ in 0..11 {
1348                    let mut response = BlockFifoResponse::default();
1349                    reader.read_entries(&mut response).await.unwrap();
1350                    assert_eq!(response.status, zx::sys::ZX_OK);
1351                }
1352
1353                std::mem::drop(proxy);
1354            }
1355        );
1356    }
1357
1358    #[fuchsia::test]
1359    async fn test_info() {
1360        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1361
1362        futures::join!(
1363            async {
1364                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1365                block_server.handle_requests(stream).await.unwrap();
1366            },
1367            async {
1368                let expected_info = test_device_info();
1369                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1370                    info
1371                } else {
1372                    unreachable!()
1373                };
1374
1375                let block_info = proxy.get_info().await.unwrap().unwrap();
1376                assert_eq!(
1377                    block_info.block_count,
1378                    partition_info.block_range.as_ref().unwrap().end
1379                        - partition_info.block_range.as_ref().unwrap().start
1380                );
1381                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1382
1383                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1384
1385                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1386                assert_eq!(status, zx::sys::ZX_OK);
1387                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1388
1389                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1390                assert_eq!(status, zx::sys::ZX_OK);
1391                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1392
1393                let (status, name) = proxy.get_name().await.unwrap();
1394                assert_eq!(status, zx::sys::ZX_OK);
1395                assert_eq!(name.as_ref(), Some(&partition_info.name));
1396
1397                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1398                assert_eq!(metadata.name, name);
1399                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1400                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1401                assert_eq!(
1402                    metadata.start_block_offset,
1403                    Some(partition_info.block_range.as_ref().unwrap().start)
1404                );
1405                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1406                assert_eq!(metadata.flags, Some(partition_info.flags));
1407
1408                std::mem::drop(proxy);
1409            }
1410        );
1411    }
1412
1413    #[fuchsia::test]
1414    async fn test_attach_vmo() {
1415        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1416
1417        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1418        let koid = vmo.get_koid().unwrap();
1419
1420        futures::join!(
1421            async {
1422                let block_server = BlockServer::new(
1423                    BLOCK_SIZE,
1424                    Arc::new(MockInterface {
1425                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1426                            assert_eq!(vmo.get_koid().unwrap(), koid);
1427                            Box::pin(async { Ok(()) })
1428                        })),
1429                        ..MockInterface::default()
1430                    }),
1431                );
1432                block_server.handle_requests(stream).await.unwrap();
1433            },
1434            async move {
1435                let (session_proxy, server) = fidl::endpoints::create_proxy();
1436
1437                proxy.open_session(server).unwrap();
1438
1439                let vmo_id = session_proxy
1440                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1441                    .await
1442                    .unwrap()
1443                    .unwrap();
1444                assert_ne!(vmo_id.id, 0);
1445
1446                let mut fifo =
1447                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1448                let (mut reader, mut writer) = fifo.async_io();
1449
1450                // Keep attaching VMOs until we eventually hit the maximum.
1451                let mut count = 1;
1452                loop {
1453                    match session_proxy
1454                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1455                        .await
1456                        .unwrap()
1457                    {
1458                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1459                        Err(e) => {
1460                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1461                            break;
1462                        }
1463                    }
1464
1465                    // Only test every 10 to keep test time down.
1466                    if count % 10 == 0 {
1467                        writer
1468                            .write_entries(&BlockFifoRequest {
1469                                command: BlockFifoCommand {
1470                                    opcode: BlockOpcode::Read.into_primitive(),
1471                                    ..Default::default()
1472                                },
1473                                vmoid: vmo_id.id,
1474                                length: 1,
1475                                ..Default::default()
1476                            })
1477                            .await
1478                            .unwrap();
1479
1480                        let mut response = BlockFifoResponse::default();
1481                        reader.read_entries(&mut response).await.unwrap();
1482                        assert_eq!(response.status, zx::sys::ZX_OK);
1483                    }
1484
1485                    count += 1;
1486                }
1487
1488                assert_eq!(count, u16::MAX as u64);
1489
1490                // Detach the original VMO, and make sure we can then attach another one.
1491                writer
1492                    .write_entries(&BlockFifoRequest {
1493                        command: BlockFifoCommand {
1494                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1495                            ..Default::default()
1496                        },
1497                        vmoid: vmo_id.id,
1498                        ..Default::default()
1499                    })
1500                    .await
1501                    .unwrap();
1502
1503                let mut response = BlockFifoResponse::default();
1504                reader.read_entries(&mut response).await.unwrap();
1505                assert_eq!(response.status, zx::sys::ZX_OK);
1506
1507                let new_vmo_id = session_proxy
1508                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1509                    .await
1510                    .unwrap()
1511                    .unwrap();
1512                // It should reuse the same ID.
1513                assert_eq!(new_vmo_id.id, vmo_id.id);
1514
1515                std::mem::drop(proxy);
1516            }
1517        );
1518    }
1519
1520    #[fuchsia::test]
1521    async fn test_close() {
1522        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1523
1524        let mut server = std::pin::pin!(async {
1525            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1526            block_server.handle_requests(stream).await.unwrap();
1527        }
1528        .fuse());
1529
1530        let mut client = std::pin::pin!(async {
1531            let (session_proxy, server) = fidl::endpoints::create_proxy();
1532
1533            proxy.open_session(server).unwrap();
1534
1535            // Dropping the proxy should not cause the session to terminate because the session is
1536            // still live.
1537            std::mem::drop(proxy);
1538
1539            session_proxy.close().await.unwrap().unwrap();
1540
1541            // Keep the session alive.  Calling `close` should cause the server to terminate.
1542            let _: () = std::future::pending().await;
1543        }
1544        .fuse());
1545
1546        futures::select!(
1547            _ = server => {}
1548            _ = client => unreachable!(),
1549        );
1550    }
1551
1552    #[derive(Default)]
1553    struct IoMockInterface {
1554        do_checks: bool,
1555        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1556        return_errors: bool,
1557    }
1558
1559    #[derive(Debug)]
1560    enum ExpectedOp {
1561        Read(u64, u32, u64),
1562        Write(u64, u32, u64),
1563        Trim(u64, u32),
1564        Flush,
1565    }
1566
1567    impl super::async_interface::Interface for IoMockInterface {
1568        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1569            Ok(())
1570        }
1571
1572        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1573            Ok(Cow::Owned(test_device_info()))
1574        }
1575
1576        async fn read(
1577            &self,
1578            device_block_offset: u64,
1579            block_count: u32,
1580            _vmo: &Arc<zx::Vmo>,
1581            vmo_offset: u64,
1582            _trace_flow_id: TraceFlowId,
1583        ) -> Result<(), zx::Status> {
1584            if self.return_errors {
1585                Err(zx::Status::INTERNAL)
1586            } else {
1587                if self.do_checks {
1588                    assert_matches!(
1589                        self.expected_op.lock().take(),
1590                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1591                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1592                        "Read {device_block_offset} {block_count} {vmo_offset}"
1593                    );
1594                }
1595                Ok(())
1596            }
1597        }
1598
1599        async fn write(
1600            &self,
1601            device_block_offset: u64,
1602            block_count: u32,
1603            _vmo: &Arc<zx::Vmo>,
1604            vmo_offset: u64,
1605            _opts: WriteOptions,
1606            _trace_flow_id: TraceFlowId,
1607        ) -> Result<(), zx::Status> {
1608            if self.return_errors {
1609                Err(zx::Status::NOT_SUPPORTED)
1610            } else {
1611                if self.do_checks {
1612                    assert_matches!(
1613                        self.expected_op.lock().take(),
1614                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1615                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1616                        "Write {device_block_offset} {block_count} {vmo_offset}"
1617                    );
1618                }
1619                Ok(())
1620            }
1621        }
1622
1623        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1624            if self.return_errors {
1625                Err(zx::Status::NO_RESOURCES)
1626            } else {
1627                if self.do_checks {
1628                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1629                }
1630                Ok(())
1631            }
1632        }
1633
1634        fn barrier(&self) -> Result<(), zx::Status> {
1635            unreachable!()
1636        }
1637
1638        async fn trim(
1639            &self,
1640            device_block_offset: u64,
1641            block_count: u32,
1642            _trace_flow_id: TraceFlowId,
1643        ) -> Result<(), zx::Status> {
1644            if self.return_errors {
1645                Err(zx::Status::NO_MEMORY)
1646            } else {
1647                if self.do_checks {
1648                    assert_matches!(
1649                        self.expected_op.lock().take(),
1650                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1651                            block_count == b,
1652                        "Trim {device_block_offset} {block_count}"
1653                    );
1654                }
1655                Ok(())
1656            }
1657        }
1658    }
1659
1660    #[fuchsia::test]
1661    async fn test_io() {
1662        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1663
1664        let expected_op = Arc::new(Mutex::new(None));
1665        let expected_op_clone = expected_op.clone();
1666
1667        let server = async {
1668            let block_server = BlockServer::new(
1669                BLOCK_SIZE,
1670                Arc::new(IoMockInterface {
1671                    return_errors: false,
1672                    do_checks: true,
1673                    expected_op: expected_op_clone,
1674                }),
1675            );
1676            block_server.handle_requests(stream).await.unwrap();
1677        };
1678
1679        let client = async move {
1680            let (session_proxy, server) = fidl::endpoints::create_proxy();
1681
1682            proxy.open_session(server).unwrap();
1683
1684            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1685            let vmo_id = session_proxy
1686                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1687                .await
1688                .unwrap()
1689                .unwrap();
1690
1691            let mut fifo =
1692                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1693            let (mut reader, mut writer) = fifo.async_io();
1694
1695            // READ
1696            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1697            writer
1698                .write_entries(&BlockFifoRequest {
1699                    command: BlockFifoCommand {
1700                        opcode: BlockOpcode::Read.into_primitive(),
1701                        ..Default::default()
1702                    },
1703                    vmoid: vmo_id.id,
1704                    dev_offset: 1,
1705                    length: 2,
1706                    vmo_offset: 3,
1707                    ..Default::default()
1708                })
1709                .await
1710                .unwrap();
1711
1712            let mut response = BlockFifoResponse::default();
1713            reader.read_entries(&mut response).await.unwrap();
1714            assert_eq!(response.status, zx::sys::ZX_OK);
1715
1716            // WRITE
1717            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1718            writer
1719                .write_entries(&BlockFifoRequest {
1720                    command: BlockFifoCommand {
1721                        opcode: BlockOpcode::Write.into_primitive(),
1722                        ..Default::default()
1723                    },
1724                    vmoid: vmo_id.id,
1725                    dev_offset: 4,
1726                    length: 5,
1727                    vmo_offset: 6,
1728                    ..Default::default()
1729                })
1730                .await
1731                .unwrap();
1732
1733            let mut response = BlockFifoResponse::default();
1734            reader.read_entries(&mut response).await.unwrap();
1735            assert_eq!(response.status, zx::sys::ZX_OK);
1736
1737            // FLUSH
1738            *expected_op.lock() = Some(ExpectedOp::Flush);
1739            writer
1740                .write_entries(&BlockFifoRequest {
1741                    command: BlockFifoCommand {
1742                        opcode: BlockOpcode::Flush.into_primitive(),
1743                        ..Default::default()
1744                    },
1745                    ..Default::default()
1746                })
1747                .await
1748                .unwrap();
1749
1750            reader.read_entries(&mut response).await.unwrap();
1751            assert_eq!(response.status, zx::sys::ZX_OK);
1752
1753            // TRIM
1754            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1755            writer
1756                .write_entries(&BlockFifoRequest {
1757                    command: BlockFifoCommand {
1758                        opcode: BlockOpcode::Trim.into_primitive(),
1759                        ..Default::default()
1760                    },
1761                    dev_offset: 7,
1762                    length: 8,
1763                    ..Default::default()
1764                })
1765                .await
1766                .unwrap();
1767
1768            reader.read_entries(&mut response).await.unwrap();
1769            assert_eq!(response.status, zx::sys::ZX_OK);
1770
1771            std::mem::drop(proxy);
1772        };
1773
1774        futures::join!(server, client);
1775    }
1776
1777    #[fuchsia::test]
1778    async fn test_io_errors() {
1779        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1780
1781        futures::join!(
1782            async {
1783                let block_server = BlockServer::new(
1784                    BLOCK_SIZE,
1785                    Arc::new(IoMockInterface {
1786                        return_errors: true,
1787                        do_checks: false,
1788                        expected_op: Arc::new(Mutex::new(None)),
1789                    }),
1790                );
1791                block_server.handle_requests(stream).await.unwrap();
1792            },
1793            async move {
1794                let (session_proxy, server) = fidl::endpoints::create_proxy();
1795
1796                proxy.open_session(server).unwrap();
1797
1798                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1799                let vmo_id = session_proxy
1800                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1801                    .await
1802                    .unwrap()
1803                    .unwrap();
1804
1805                let mut fifo =
1806                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1807                let (mut reader, mut writer) = fifo.async_io();
1808
1809                // READ
1810                writer
1811                    .write_entries(&BlockFifoRequest {
1812                        command: BlockFifoCommand {
1813                            opcode: BlockOpcode::Read.into_primitive(),
1814                            ..Default::default()
1815                        },
1816                        vmoid: vmo_id.id,
1817                        length: 1,
1818                        reqid: 1,
1819                        ..Default::default()
1820                    })
1821                    .await
1822                    .unwrap();
1823
1824                let mut response = BlockFifoResponse::default();
1825                reader.read_entries(&mut response).await.unwrap();
1826                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1827
1828                // WRITE
1829                writer
1830                    .write_entries(&BlockFifoRequest {
1831                        command: BlockFifoCommand {
1832                            opcode: BlockOpcode::Write.into_primitive(),
1833                            ..Default::default()
1834                        },
1835                        vmoid: vmo_id.id,
1836                        length: 1,
1837                        reqid: 2,
1838                        ..Default::default()
1839                    })
1840                    .await
1841                    .unwrap();
1842
1843                reader.read_entries(&mut response).await.unwrap();
1844                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1845
1846                // FLUSH
1847                writer
1848                    .write_entries(&BlockFifoRequest {
1849                        command: BlockFifoCommand {
1850                            opcode: BlockOpcode::Flush.into_primitive(),
1851                            ..Default::default()
1852                        },
1853                        reqid: 3,
1854                        ..Default::default()
1855                    })
1856                    .await
1857                    .unwrap();
1858
1859                reader.read_entries(&mut response).await.unwrap();
1860                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1861
1862                // TRIM
1863                writer
1864                    .write_entries(&BlockFifoRequest {
1865                        command: BlockFifoCommand {
1866                            opcode: BlockOpcode::Trim.into_primitive(),
1867                            ..Default::default()
1868                        },
1869                        reqid: 4,
1870                        length: 1,
1871                        ..Default::default()
1872                    })
1873                    .await
1874                    .unwrap();
1875
1876                reader.read_entries(&mut response).await.unwrap();
1877                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1878
1879                std::mem::drop(proxy);
1880            }
1881        );
1882    }
1883
1884    #[fuchsia::test]
1885    async fn test_invalid_args() {
1886        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1887
1888        futures::join!(
1889            async {
1890                let block_server = BlockServer::new(
1891                    BLOCK_SIZE,
1892                    Arc::new(IoMockInterface {
1893                        return_errors: false,
1894                        do_checks: false,
1895                        expected_op: Arc::new(Mutex::new(None)),
1896                    }),
1897                );
1898                block_server.handle_requests(stream).await.unwrap();
1899            },
1900            async move {
1901                let (session_proxy, server) = fidl::endpoints::create_proxy();
1902
1903                proxy.open_session(server).unwrap();
1904
1905                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1906                let vmo_id = session_proxy
1907                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1908                    .await
1909                    .unwrap()
1910                    .unwrap();
1911
1912                let mut fifo =
1913                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1914
1915                async fn test(
1916                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1917                    request: BlockFifoRequest,
1918                ) -> Result<(), zx::Status> {
1919                    let (mut reader, mut writer) = fifo.async_io();
1920                    writer.write_entries(&request).await.unwrap();
1921                    let mut response = BlockFifoResponse::default();
1922                    reader.read_entries(&mut response).await.unwrap();
1923                    zx::Status::ok(response.status)
1924                }
1925
1926                // READ
1927
1928                let good_read_request = || BlockFifoRequest {
1929                    command: BlockFifoCommand {
1930                        opcode: BlockOpcode::Read.into_primitive(),
1931                        ..Default::default()
1932                    },
1933                    length: 1,
1934                    vmoid: vmo_id.id,
1935                    ..Default::default()
1936                };
1937
1938                assert_eq!(
1939                    test(
1940                        &mut fifo,
1941                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1942                    )
1943                    .await,
1944                    Err(zx::Status::IO)
1945                );
1946
1947                assert_eq!(
1948                    test(
1949                        &mut fifo,
1950                        BlockFifoRequest {
1951                            vmo_offset: 0xffff_ffff_ffff_ffff,
1952                            ..good_read_request()
1953                        }
1954                    )
1955                    .await,
1956                    Err(zx::Status::OUT_OF_RANGE)
1957                );
1958
1959                assert_eq!(
1960                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1961                    Err(zx::Status::INVALID_ARGS)
1962                );
1963
1964                // WRITE
1965
1966                let good_write_request = || BlockFifoRequest {
1967                    command: BlockFifoCommand {
1968                        opcode: BlockOpcode::Write.into_primitive(),
1969                        ..Default::default()
1970                    },
1971                    length: 1,
1972                    vmoid: vmo_id.id,
1973                    ..Default::default()
1974                };
1975
1976                assert_eq!(
1977                    test(
1978                        &mut fifo,
1979                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1980                    )
1981                    .await,
1982                    Err(zx::Status::IO)
1983                );
1984
1985                assert_eq!(
1986                    test(
1987                        &mut fifo,
1988                        BlockFifoRequest {
1989                            vmo_offset: 0xffff_ffff_ffff_ffff,
1990                            ..good_write_request()
1991                        }
1992                    )
1993                    .await,
1994                    Err(zx::Status::OUT_OF_RANGE)
1995                );
1996
1997                assert_eq!(
1998                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
1999                    Err(zx::Status::INVALID_ARGS)
2000                );
2001
2002                // CLOSE VMO
2003
2004                assert_eq!(
2005                    test(
2006                        &mut fifo,
2007                        BlockFifoRequest {
2008                            command: BlockFifoCommand {
2009                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2010                                ..Default::default()
2011                            },
2012                            vmoid: vmo_id.id + 1,
2013                            ..Default::default()
2014                        }
2015                    )
2016                    .await,
2017                    Err(zx::Status::IO)
2018                );
2019
2020                std::mem::drop(proxy);
2021            }
2022        );
2023    }
2024
2025    #[fuchsia::test]
2026    async fn test_concurrent_requests() {
2027        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2028
2029        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2030        let waiting_readers_clone = waiting_readers.clone();
2031
2032        futures::join!(
2033            async move {
2034                let block_server = BlockServer::new(
2035                    BLOCK_SIZE,
2036                    Arc::new(MockInterface {
2037                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2038                            let (tx, rx) = oneshot::channel();
2039                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2040                            Box::pin(async move {
2041                                let _ = rx.await;
2042                                Ok(())
2043                            })
2044                        })),
2045                        ..MockInterface::default()
2046                    }),
2047                );
2048                block_server.handle_requests(stream).await.unwrap();
2049            },
2050            async move {
2051                let (session_proxy, server) = fidl::endpoints::create_proxy();
2052
2053                proxy.open_session(server).unwrap();
2054
2055                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2056                let vmo_id = session_proxy
2057                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2058                    .await
2059                    .unwrap()
2060                    .unwrap();
2061
2062                let mut fifo =
2063                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2064                let (mut reader, mut writer) = fifo.async_io();
2065
2066                writer
2067                    .write_entries(&BlockFifoRequest {
2068                        command: BlockFifoCommand {
2069                            opcode: BlockOpcode::Read.into_primitive(),
2070                            ..Default::default()
2071                        },
2072                        reqid: 1,
2073                        dev_offset: 1, // Intentionally use the same as `reqid`.
2074                        vmoid: vmo_id.id,
2075                        length: 1,
2076                        ..Default::default()
2077                    })
2078                    .await
2079                    .unwrap();
2080
2081                writer
2082                    .write_entries(&BlockFifoRequest {
2083                        command: BlockFifoCommand {
2084                            opcode: BlockOpcode::Read.into_primitive(),
2085                            ..Default::default()
2086                        },
2087                        reqid: 2,
2088                        dev_offset: 2,
2089                        vmoid: vmo_id.id,
2090                        length: 1,
2091                        ..Default::default()
2092                    })
2093                    .await
2094                    .unwrap();
2095
2096                // Wait till both those entries are pending.
2097                poll_fn(|cx: &mut Context<'_>| {
2098                    if waiting_readers.lock().len() == 2 {
2099                        Poll::Ready(())
2100                    } else {
2101                        // Yield to the executor.
2102                        cx.waker().wake_by_ref();
2103                        Poll::Pending
2104                    }
2105                })
2106                .await;
2107
2108                let mut response = BlockFifoResponse::default();
2109                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2110
2111                let (id, tx) = waiting_readers.lock().pop().unwrap();
2112                tx.send(()).unwrap();
2113
2114                reader.read_entries(&mut response).await.unwrap();
2115                assert_eq!(response.status, zx::sys::ZX_OK);
2116                assert_eq!(response.reqid, id);
2117
2118                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2119
2120                let (id, tx) = waiting_readers.lock().pop().unwrap();
2121                tx.send(()).unwrap();
2122
2123                reader.read_entries(&mut response).await.unwrap();
2124                assert_eq!(response.status, zx::sys::ZX_OK);
2125                assert_eq!(response.reqid, id);
2126            }
2127        );
2128    }
2129
2130    #[fuchsia::test]
2131    async fn test_groups() {
2132        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2133
2134        futures::join!(
2135            async move {
2136                let block_server = BlockServer::new(
2137                    BLOCK_SIZE,
2138                    Arc::new(MockInterface {
2139                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2140                        ..MockInterface::default()
2141                    }),
2142                );
2143                block_server.handle_requests(stream).await.unwrap();
2144            },
2145            async move {
2146                let (session_proxy, server) = fidl::endpoints::create_proxy();
2147
2148                proxy.open_session(server).unwrap();
2149
2150                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2151                let vmo_id = session_proxy
2152                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2153                    .await
2154                    .unwrap()
2155                    .unwrap();
2156
2157                let mut fifo =
2158                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2159                let (mut reader, mut writer) = fifo.async_io();
2160
2161                writer
2162                    .write_entries(&BlockFifoRequest {
2163                        command: BlockFifoCommand {
2164                            opcode: BlockOpcode::Read.into_primitive(),
2165                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2166                            ..Default::default()
2167                        },
2168                        group: 1,
2169                        vmoid: vmo_id.id,
2170                        length: 1,
2171                        ..Default::default()
2172                    })
2173                    .await
2174                    .unwrap();
2175
2176                writer
2177                    .write_entries(&BlockFifoRequest {
2178                        command: BlockFifoCommand {
2179                            opcode: BlockOpcode::Read.into_primitive(),
2180                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2181                            ..Default::default()
2182                        },
2183                        reqid: 2,
2184                        group: 1,
2185                        vmoid: vmo_id.id,
2186                        length: 1,
2187                        ..Default::default()
2188                    })
2189                    .await
2190                    .unwrap();
2191
2192                let mut response = BlockFifoResponse::default();
2193                reader.read_entries(&mut response).await.unwrap();
2194                assert_eq!(response.status, zx::sys::ZX_OK);
2195                assert_eq!(response.reqid, 2);
2196                assert_eq!(response.group, 1);
2197            }
2198        );
2199    }
2200
2201    #[fuchsia::test]
2202    async fn test_group_error() {
2203        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2204
2205        let counter = Arc::new(AtomicU64::new(0));
2206        let counter_clone = counter.clone();
2207
2208        futures::join!(
2209            async move {
2210                let block_server = BlockServer::new(
2211                    BLOCK_SIZE,
2212                    Arc::new(MockInterface {
2213                        read_hook: Some(Box::new(move |_, _, _, _| {
2214                            counter_clone.fetch_add(1, Ordering::Relaxed);
2215                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2216                        })),
2217                        ..MockInterface::default()
2218                    }),
2219                );
2220                block_server.handle_requests(stream).await.unwrap();
2221            },
2222            async move {
2223                let (session_proxy, server) = fidl::endpoints::create_proxy();
2224
2225                proxy.open_session(server).unwrap();
2226
2227                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2228                let vmo_id = session_proxy
2229                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2230                    .await
2231                    .unwrap()
2232                    .unwrap();
2233
2234                let mut fifo =
2235                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2236                let (mut reader, mut writer) = fifo.async_io();
2237
2238                writer
2239                    .write_entries(&BlockFifoRequest {
2240                        command: BlockFifoCommand {
2241                            opcode: BlockOpcode::Read.into_primitive(),
2242                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2243                            ..Default::default()
2244                        },
2245                        group: 1,
2246                        vmoid: vmo_id.id,
2247                        length: 1,
2248                        ..Default::default()
2249                    })
2250                    .await
2251                    .unwrap();
2252
2253                // Wait until processed.
2254                poll_fn(|cx: &mut Context<'_>| {
2255                    if counter.load(Ordering::Relaxed) == 1 {
2256                        Poll::Ready(())
2257                    } else {
2258                        // Yield to the executor.
2259                        cx.waker().wake_by_ref();
2260                        Poll::Pending
2261                    }
2262                })
2263                .await;
2264
2265                let mut response = BlockFifoResponse::default();
2266                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2267
2268                writer
2269                    .write_entries(&BlockFifoRequest {
2270                        command: BlockFifoCommand {
2271                            opcode: BlockOpcode::Read.into_primitive(),
2272                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2273                            ..Default::default()
2274                        },
2275                        group: 1,
2276                        vmoid: vmo_id.id,
2277                        length: 1,
2278                        ..Default::default()
2279                    })
2280                    .await
2281                    .unwrap();
2282
2283                writer
2284                    .write_entries(&BlockFifoRequest {
2285                        command: BlockFifoCommand {
2286                            opcode: BlockOpcode::Read.into_primitive(),
2287                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2288                            ..Default::default()
2289                        },
2290                        reqid: 2,
2291                        group: 1,
2292                        vmoid: vmo_id.id,
2293                        length: 1,
2294                        ..Default::default()
2295                    })
2296                    .await
2297                    .unwrap();
2298
2299                reader.read_entries(&mut response).await.unwrap();
2300                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2301                assert_eq!(response.reqid, 2);
2302                assert_eq!(response.group, 1);
2303
2304                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2305
2306                // Only the first request should have been processed.
2307                assert_eq!(counter.load(Ordering::Relaxed), 1);
2308            }
2309        );
2310    }
2311
2312    #[fuchsia::test]
2313    async fn test_group_with_two_lasts() {
2314        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2315
2316        let (tx, rx) = oneshot::channel();
2317
2318        futures::join!(
2319            async move {
2320                let rx = Mutex::new(Some(rx));
2321                let block_server = BlockServer::new(
2322                    BLOCK_SIZE,
2323                    Arc::new(MockInterface {
2324                        read_hook: Some(Box::new(move |_, _, _, _| {
2325                            let rx = rx.lock().take().unwrap();
2326                            Box::pin(async {
2327                                let _ = rx.await;
2328                                Ok(())
2329                            })
2330                        })),
2331                        ..MockInterface::default()
2332                    }),
2333                );
2334                block_server.handle_requests(stream).await.unwrap();
2335            },
2336            async move {
2337                let (session_proxy, server) = fidl::endpoints::create_proxy();
2338
2339                proxy.open_session(server).unwrap();
2340
2341                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2342                let vmo_id = session_proxy
2343                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2344                    .await
2345                    .unwrap()
2346                    .unwrap();
2347
2348                let mut fifo =
2349                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2350                let (mut reader, mut writer) = fifo.async_io();
2351
2352                writer
2353                    .write_entries(&BlockFifoRequest {
2354                        command: BlockFifoCommand {
2355                            opcode: BlockOpcode::Read.into_primitive(),
2356                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2357                            ..Default::default()
2358                        },
2359                        reqid: 1,
2360                        group: 1,
2361                        vmoid: vmo_id.id,
2362                        length: 1,
2363                        ..Default::default()
2364                    })
2365                    .await
2366                    .unwrap();
2367
2368                writer
2369                    .write_entries(&BlockFifoRequest {
2370                        command: BlockFifoCommand {
2371                            opcode: BlockOpcode::Read.into_primitive(),
2372                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2373                            ..Default::default()
2374                        },
2375                        reqid: 2,
2376                        group: 1,
2377                        vmoid: vmo_id.id,
2378                        length: 1,
2379                        ..Default::default()
2380                    })
2381                    .await
2382                    .unwrap();
2383
2384                // Send an independent request to flush through the fifo.
2385                writer
2386                    .write_entries(&BlockFifoRequest {
2387                        command: BlockFifoCommand {
2388                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2389                            ..Default::default()
2390                        },
2391                        reqid: 3,
2392                        vmoid: vmo_id.id,
2393                        ..Default::default()
2394                    })
2395                    .await
2396                    .unwrap();
2397
2398                // It should succeed.
2399                let mut response = BlockFifoResponse::default();
2400                reader.read_entries(&mut response).await.unwrap();
2401                assert_eq!(response.status, zx::sys::ZX_OK);
2402                assert_eq!(response.reqid, 3);
2403
2404                // Now release the original request.
2405                tx.send(()).unwrap();
2406
2407                // The response should be for the first message tagged as last, and it should be
2408                // an error because we sent two messages with the LAST marker.
2409                let mut response = BlockFifoResponse::default();
2410                reader.read_entries(&mut response).await.unwrap();
2411                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2412                assert_eq!(response.reqid, 1);
2413                assert_eq!(response.group, 1);
2414            }
2415        );
2416    }
2417
2418    #[fuchsia::test(allow_stalls = false)]
2419    async fn test_requests_dont_block_sessions() {
2420        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2421
2422        let (tx, rx) = oneshot::channel();
2423
2424        fasync::Task::local(async move {
2425            let rx = Mutex::new(Some(rx));
2426            let block_server = BlockServer::new(
2427                BLOCK_SIZE,
2428                Arc::new(MockInterface {
2429                    read_hook: Some(Box::new(move |_, _, _, _| {
2430                        let rx = rx.lock().take().unwrap();
2431                        Box::pin(async {
2432                            let _ = rx.await;
2433                            Ok(())
2434                        })
2435                    })),
2436                    ..MockInterface::default()
2437                }),
2438            );
2439            block_server.handle_requests(stream).await.unwrap();
2440        })
2441        .detach();
2442
2443        let mut fut = pin!(async {
2444            let (session_proxy, server) = fidl::endpoints::create_proxy();
2445
2446            proxy.open_session(server).unwrap();
2447
2448            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2449            let vmo_id = session_proxy
2450                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2451                .await
2452                .unwrap()
2453                .unwrap();
2454
2455            let mut fifo =
2456                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2457            let (mut reader, mut writer) = fifo.async_io();
2458
2459            writer
2460                .write_entries(&BlockFifoRequest {
2461                    command: BlockFifoCommand {
2462                        opcode: BlockOpcode::Read.into_primitive(),
2463                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2464                        ..Default::default()
2465                    },
2466                    reqid: 1,
2467                    group: 1,
2468                    vmoid: vmo_id.id,
2469                    length: 1,
2470                    ..Default::default()
2471                })
2472                .await
2473                .unwrap();
2474
2475            let mut response = BlockFifoResponse::default();
2476            reader.read_entries(&mut response).await.unwrap();
2477            assert_eq!(response.status, zx::sys::ZX_OK);
2478        });
2479
2480        // The response won't come back until we send on `tx`.
2481        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2482
2483        let mut fut2 = pin!(proxy.get_volume_info());
2484
2485        // get_volume_info is set up to stall forever.
2486        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2487
2488        // If we now free up the first future, it should resolve; the stalled call to
2489        // get_volume_info should not block the fifo response.
2490        let _ = tx.send(());
2491
2492        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2493    }
2494
2495    #[fuchsia::test]
2496    async fn test_request_flow_control() {
2497        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2498
2499        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2500        // server will block until that happens.
2501        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2502        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2503        let event_clone = event.clone();
2504        futures::join!(
2505            async move {
2506                let block_server = BlockServer::new(
2507                    BLOCK_SIZE,
2508                    Arc::new(MockInterface {
2509                        read_hook: Some(Box::new(move |_, _, _, _| {
2510                            let event_clone = event_clone.clone();
2511                            Box::pin(async move {
2512                                if !event_clone.1.load(Ordering::SeqCst) {
2513                                    event_clone.0.listen().await;
2514                                }
2515                                Ok(())
2516                            })
2517                        })),
2518                        ..MockInterface::default()
2519                    }),
2520                );
2521                block_server.handle_requests(stream).await.unwrap();
2522            },
2523            async move {
2524                let (session_proxy, server) = fidl::endpoints::create_proxy();
2525
2526                proxy.open_session(server).unwrap();
2527
2528                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2529                let vmo_id = session_proxy
2530                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2531                    .await
2532                    .unwrap()
2533                    .unwrap();
2534
2535                let mut fifo =
2536                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2537                let (mut reader, mut writer) = fifo.async_io();
2538
2539                for i in 0..MAX_REQUESTS {
2540                    writer
2541                        .write_entries(&BlockFifoRequest {
2542                            command: BlockFifoCommand {
2543                                opcode: BlockOpcode::Read.into_primitive(),
2544                                ..Default::default()
2545                            },
2546                            reqid: (i + 1) as u32,
2547                            dev_offset: i,
2548                            vmoid: vmo_id.id,
2549                            length: 1,
2550                            ..Default::default()
2551                        })
2552                        .await
2553                        .unwrap();
2554                }
2555                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2556                    command: BlockFifoCommand {
2557                        opcode: BlockOpcode::Read.into_primitive(),
2558                        ..Default::default()
2559                    },
2560                    reqid: u32::MAX,
2561                    dev_offset: MAX_REQUESTS,
2562                    vmoid: vmo_id.id,
2563                    length: 1,
2564                    ..Default::default()
2565                })))
2566                .is_pending());
2567                // OK, let the server start to process.
2568                event.1.store(true, Ordering::SeqCst);
2569                event.0.notify(usize::MAX);
2570                // For each entry we read, make sure we can write a new one in.
2571                let mut finished_reqids = vec![];
2572                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2573                    let mut response = BlockFifoResponse::default();
2574                    reader.read_entries(&mut response).await.unwrap();
2575                    assert_eq!(response.status, zx::sys::ZX_OK);
2576                    finished_reqids.push(response.reqid);
2577                    writer
2578                        .write_entries(&BlockFifoRequest {
2579                            command: BlockFifoCommand {
2580                                opcode: BlockOpcode::Read.into_primitive(),
2581                                ..Default::default()
2582                            },
2583                            reqid: (i + 1) as u32,
2584                            dev_offset: i,
2585                            vmoid: vmo_id.id,
2586                            length: 1,
2587                            ..Default::default()
2588                        })
2589                        .await
2590                        .unwrap();
2591                }
2592                let mut response = BlockFifoResponse::default();
2593                for _ in 0..MAX_REQUESTS {
2594                    reader.read_entries(&mut response).await.unwrap();
2595                    assert_eq!(response.status, zx::sys::ZX_OK);
2596                    finished_reqids.push(response.reqid);
2597                }
2598                // Verify that we got a response for each request.  Note that we can't assume FIFO
2599                // ordering.
2600                finished_reqids.sort();
2601                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2602                let mut i = 1;
2603                for reqid in finished_reqids {
2604                    assert_eq!(reqid, i);
2605                    i += 1;
2606                }
2607            }
2608        );
2609    }
2610
2611    #[fuchsia::test]
2612    async fn test_passthrough_io_with_fixed_map() {
2613        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2614
2615        let expected_op = Arc::new(Mutex::new(None));
2616        let expected_op_clone = expected_op.clone();
2617        futures::join!(
2618            async {
2619                let block_server = BlockServer::new(
2620                    BLOCK_SIZE,
2621                    Arc::new(IoMockInterface {
2622                        return_errors: false,
2623                        do_checks: true,
2624                        expected_op: expected_op_clone,
2625                    }),
2626                );
2627                block_server.handle_requests(stream).await.unwrap();
2628            },
2629            async move {
2630                let (session_proxy, server) = fidl::endpoints::create_proxy();
2631
2632                let mapping = fblock::BlockOffsetMapping {
2633                    source_block_offset: 0,
2634                    target_block_offset: 10,
2635                    length: 20,
2636                };
2637                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2638
2639                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2640                let vmo_id = session_proxy
2641                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2642                    .await
2643                    .unwrap()
2644                    .unwrap();
2645
2646                let mut fifo =
2647                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2648                let (mut reader, mut writer) = fifo.async_io();
2649
2650                // READ
2651                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2652                writer
2653                    .write_entries(&BlockFifoRequest {
2654                        command: BlockFifoCommand {
2655                            opcode: BlockOpcode::Read.into_primitive(),
2656                            ..Default::default()
2657                        },
2658                        vmoid: vmo_id.id,
2659                        dev_offset: 1,
2660                        length: 2,
2661                        vmo_offset: 3,
2662                        ..Default::default()
2663                    })
2664                    .await
2665                    .unwrap();
2666
2667                let mut response = BlockFifoResponse::default();
2668                reader.read_entries(&mut response).await.unwrap();
2669                assert_eq!(response.status, zx::sys::ZX_OK);
2670
2671                // WRITE
2672                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2673                writer
2674                    .write_entries(&BlockFifoRequest {
2675                        command: BlockFifoCommand {
2676                            opcode: BlockOpcode::Write.into_primitive(),
2677                            ..Default::default()
2678                        },
2679                        vmoid: vmo_id.id,
2680                        dev_offset: 4,
2681                        length: 5,
2682                        vmo_offset: 6,
2683                        ..Default::default()
2684                    })
2685                    .await
2686                    .unwrap();
2687
2688                reader.read_entries(&mut response).await.unwrap();
2689                assert_eq!(response.status, zx::sys::ZX_OK);
2690
2691                // FLUSH
2692                *expected_op.lock() = Some(ExpectedOp::Flush);
2693                writer
2694                    .write_entries(&BlockFifoRequest {
2695                        command: BlockFifoCommand {
2696                            opcode: BlockOpcode::Flush.into_primitive(),
2697                            ..Default::default()
2698                        },
2699                        ..Default::default()
2700                    })
2701                    .await
2702                    .unwrap();
2703
2704                reader.read_entries(&mut response).await.unwrap();
2705                assert_eq!(response.status, zx::sys::ZX_OK);
2706
2707                // TRIM
2708                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2709                writer
2710                    .write_entries(&BlockFifoRequest {
2711                        command: BlockFifoCommand {
2712                            opcode: BlockOpcode::Trim.into_primitive(),
2713                            ..Default::default()
2714                        },
2715                        dev_offset: 7,
2716                        length: 3,
2717                        ..Default::default()
2718                    })
2719                    .await
2720                    .unwrap();
2721
2722                reader.read_entries(&mut response).await.unwrap();
2723                assert_eq!(response.status, zx::sys::ZX_OK);
2724
2725                // READ past window
2726                *expected_op.lock() = None;
2727                writer
2728                    .write_entries(&BlockFifoRequest {
2729                        command: BlockFifoCommand {
2730                            opcode: BlockOpcode::Read.into_primitive(),
2731                            ..Default::default()
2732                        },
2733                        vmoid: vmo_id.id,
2734                        dev_offset: 19,
2735                        length: 2,
2736                        vmo_offset: 3,
2737                        ..Default::default()
2738                    })
2739                    .await
2740                    .unwrap();
2741
2742                reader.read_entries(&mut response).await.unwrap();
2743                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2744
2745                std::mem::drop(proxy);
2746            }
2747        );
2748    }
2749
2750    #[fuchsia::test]
2751    fn operation_map() {
2752        fn expect_map_result(
2753            mut operation: Operation,
2754            mapping: Option<BlockOffsetMapping>,
2755            max_blocks: Option<NonZero<u32>>,
2756            expected_operations: Vec<Operation>,
2757        ) {
2758            let mut ops = vec![];
2759            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2760                ops.push(operation);
2761                operation = remainder;
2762            }
2763            ops.push(operation);
2764            assert_eq!(ops, expected_operations);
2765        }
2766
2767        // No limits
2768        expect_map_result(
2769            Operation::Read {
2770                device_block_offset: 10,
2771                block_count: 200,
2772                _unused: 0,
2773                vmo_offset: 0,
2774            },
2775            None,
2776            None,
2777            vec![Operation::Read {
2778                device_block_offset: 10,
2779                block_count: 200,
2780                _unused: 0,
2781                vmo_offset: 0,
2782            }],
2783        );
2784
2785        // Max block count
2786        expect_map_result(
2787            Operation::Read {
2788                device_block_offset: 10,
2789                block_count: 200,
2790                _unused: 0,
2791                vmo_offset: 0,
2792            },
2793            None,
2794            NonZero::new(120),
2795            vec![
2796                Operation::Read {
2797                    device_block_offset: 10,
2798                    block_count: 120,
2799                    _unused: 0,
2800                    vmo_offset: 0,
2801                },
2802                Operation::Read {
2803                    device_block_offset: 130,
2804                    block_count: 80,
2805                    _unused: 0,
2806                    vmo_offset: 120 * 512,
2807                },
2808            ],
2809        );
2810        expect_map_result(
2811            Operation::Write {
2812                device_block_offset: 10,
2813                block_count: 200,
2814                options: WriteOptions::PRE_BARRIER,
2815                vmo_offset: 0,
2816            },
2817            None,
2818            NonZero::new(120),
2819            vec![
2820                Operation::Write {
2821                    device_block_offset: 10,
2822                    block_count: 120,
2823                    options: WriteOptions::PRE_BARRIER,
2824                    vmo_offset: 0,
2825                },
2826                Operation::Write {
2827                    device_block_offset: 130,
2828                    block_count: 80,
2829                    options: WriteOptions::empty(),
2830                    vmo_offset: 120 * 512,
2831                },
2832            ],
2833        );
2834        expect_map_result(
2835            Operation::Trim { device_block_offset: 10, block_count: 200 },
2836            None,
2837            NonZero::new(120),
2838            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2839        );
2840
2841        // Remapping + Max block count
2842        expect_map_result(
2843            Operation::Read {
2844                device_block_offset: 10,
2845                block_count: 200,
2846                _unused: 0,
2847                vmo_offset: 0,
2848            },
2849            Some(BlockOffsetMapping {
2850                source_block_offset: 10,
2851                target_block_offset: 100,
2852                length: 200,
2853            }),
2854            NonZero::new(120),
2855            vec![
2856                Operation::Read {
2857                    device_block_offset: 100,
2858                    block_count: 120,
2859                    _unused: 0,
2860                    vmo_offset: 0,
2861                },
2862                Operation::Read {
2863                    device_block_offset: 220,
2864                    block_count: 80,
2865                    _unused: 0,
2866                    vmo_offset: 120 * 512,
2867                },
2868            ],
2869        );
2870        expect_map_result(
2871            Operation::Write {
2872                device_block_offset: 10,
2873                block_count: 200,
2874                options: WriteOptions::PRE_BARRIER,
2875                vmo_offset: 0,
2876            },
2877            Some(BlockOffsetMapping {
2878                source_block_offset: 10,
2879                target_block_offset: 100,
2880                length: 200,
2881            }),
2882            NonZero::new(120),
2883            vec![
2884                Operation::Write {
2885                    device_block_offset: 100,
2886                    block_count: 120,
2887                    options: WriteOptions::PRE_BARRIER,
2888                    vmo_offset: 0,
2889                },
2890                Operation::Write {
2891                    device_block_offset: 220,
2892                    block_count: 80,
2893                    options: WriteOptions::empty(),
2894                    vmo_offset: 120 * 512,
2895                },
2896            ],
2897        );
2898        expect_map_result(
2899            Operation::Trim { device_block_offset: 10, block_count: 200 },
2900            Some(BlockOffsetMapping {
2901                source_block_offset: 10,
2902                target_block_offset: 100,
2903                length: 200,
2904            }),
2905            NonZero::new(120),
2906            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2907        );
2908    }
2909}