block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_sync::Mutex;
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use std::borrow::Cow;
11use std::collections::btree_map::Entry;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::atomic::AtomicU64;
16use std::sync::Arc;
17use zx::HandleBased;
18use {
19    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
20    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
21};
22
23pub mod async_interface;
24pub mod c_interface;
25
26pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
27
28#[derive(Clone)]
29pub enum DeviceInfo {
30    Block(BlockInfo),
31    Partition(PartitionInfo),
32}
33
34impl DeviceInfo {
35    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
36        match self {
37            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
38            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
39                max_transfer_blocks.clone()
40            }
41        }
42    }
43
44    fn max_transfer_size(&self, block_size: u32) -> u32 {
45        if let Some(max_blocks) = self.max_transfer_blocks() {
46            max_blocks.get() * block_size
47        } else {
48            MAX_TRANSFER_UNBOUNDED
49        }
50    }
51}
52
53/// Information associated with non-partition block devices.
54#[derive(Clone)]
55pub struct BlockInfo {
56    pub device_flags: fblock::Flag,
57    pub block_count: u64,
58    pub max_transfer_blocks: Option<NonZero<u32>>,
59}
60
61/// Information associated with a block device that is also a partition.
62#[derive(Clone)]
63pub struct PartitionInfo {
64    /// The device flags reported by the underlying device.
65    pub device_flags: fblock::Flag,
66    pub max_transfer_blocks: Option<NonZero<u32>>,
67    /// If `block_range` is None, the partition is a volume and may not be contiguous.
68    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
69    /// slices and use that (along with the slice and block sizes) to determine the block count.
70    pub block_range: Option<Range<u64>>,
71    pub type_guid: [u8; 16],
72    pub instance_guid: [u8; 16],
73    pub name: String,
74    pub flags: u64,
75}
76
77// Multiple Block I/O request may be sent as a group.
78// Notes:
79// - the group is identified by the group id in the request
80// - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
81//   flag is set.
82// - when processing a request of a group fails, subsequent requests of that
83//   group will not be processed.
84//
85// Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
86//
87// FifoMessageGroup keeps track of the relevant BlockFifoResponse field for
88// a group requests. Only `status` and `count` needs to be updated.
89struct FifoMessageGroup {
90    status: zx::Status,
91    count: u32,
92    req_id: Option<u32>,
93}
94
95impl FifoMessageGroup {
96    fn new() -> Self {
97        FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
98    }
99}
100
101// Keeps track of all the group requests that are currently being processed
102#[derive(Default)]
103struct FifoMessageGroups(Mutex<BTreeMap<GroupOrRequest, FifoMessageGroup>>);
104
105impl FifoMessageGroups {
106    /// Completes a request and returns a response to be sent if it's the last outstanding request
107    /// for this group.
108    fn complete(&self, group_id: GroupOrRequest, status: zx::Status) -> Option<BlockFifoResponse> {
109        let mut map = self.0.lock();
110        let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
111        let group = o.get_mut();
112        if group.count == 1 {
113            if let Some(reqid) = group.req_id {
114                let status =
115                    if group.status != zx::Status::OK { group.status } else { status }.into_raw();
116
117                o.remove();
118
119                return Some(BlockFifoResponse {
120                    status,
121                    reqid,
122                    group: group_id.group_id(),
123                    ..Default::default()
124                });
125            }
126        }
127
128        group.count = group.count.checked_sub(1).unwrap();
129        if status != zx::Status::OK && group.status == zx::Status::OK {
130            group.status = status
131        }
132        None
133    }
134}
135
136/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
137/// cbindgen:no-export
138pub struct BlockServer<SM> {
139    block_size: u32,
140    session_manager: Arc<SM>,
141}
142
143/// A single entry in `[OffsetMap]`.
144pub struct BlockOffsetMapping {
145    source_block_offset: u64,
146    target_block_offset: u64,
147    length: u64,
148}
149
150impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
151    type Error = zx::Status;
152
153    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
154        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
155        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
156        Ok(Self {
157            source_block_offset: wire.source_block_offset,
158            target_block_offset: wire.target_block_offset,
159            length: wire.length,
160        })
161    }
162}
163
164/// Remaps the offset of block requests based on an internal map, and truncates long requests.
165/// TODO(https://fxbug.dev/402515764): For now, this just supports a single entry in the map, which
166/// is all that is required for GPT partitions.  If we want to support this for FVM, we will need
167/// to support multiple entries, which requires changing the block server to support request
168/// splitting.
169pub struct OffsetMap {
170    mapping: Option<BlockOffsetMapping>,
171    max_transfer_blocks: Option<NonZero<u32>>,
172}
173
174impl OffsetMap {
175    /// An OffsetMap that remaps requests.
176    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
177        Self { mapping: Some(mapping), max_transfer_blocks }
178    }
179
180    /// An OffsetMap that just enforces maximum request sizes.
181    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
182        Self { mapping: None, max_transfer_blocks }
183    }
184
185    pub fn is_empty(&self) -> bool {
186        self.mapping.is_some()
187    }
188
189    /// Adjusts the requested range, returning the new range.
190    /// Returns None if the request would exceed the ranges known to OffsetMap.
191    pub fn adjust_request(&self, dev_offset: u64, length: u32, is_rw: bool) -> Option<(u64, u32)> {
192        // Only apply max_transfer_blocks to RW requests.
193        let length = if is_rw {
194            if let Some(max) = self.max_transfer_blocks {
195                std::cmp::min(max.get(), length)
196            } else {
197                length
198            }
199        } else {
200            length
201        };
202        let end = dev_offset.checked_add(length as u64)?;
203        if let Some(mapping) = &self.mapping {
204            if mapping.source_block_offset > dev_offset
205                || end > mapping.source_block_offset + mapping.length
206            {
207                return None;
208            }
209            let delta = dev_offset - mapping.source_block_offset;
210            Some((mapping.target_block_offset + delta, length))
211        } else {
212            Some((dev_offset, length))
213        }
214    }
215}
216
217// Methods take Arc<Self> rather than &self because of
218// https://github.com/rust-lang/rust/issues/42940.
219pub trait SessionManager: 'static {
220    fn on_attach_vmo(
221        self: Arc<Self>,
222        vmo: &Arc<zx::Vmo>,
223    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
224
225    /// Creates a new session to handle `stream`.
226    /// The returned future should run until the session completes, for example when the client end
227    /// closes.
228    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
229    fn open_session(
230        self: Arc<Self>,
231        stream: fblock::SessionRequestStream,
232        offset_map: OffsetMap,
233        block_size: u32,
234    ) -> impl Future<Output = Result<(), Error>> + Send;
235
236    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
237    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
238
239    /// Called to handle the GetVolumeInfo FIDL call.
240    fn get_volume_info(
241        &self,
242    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
243    {
244        async { Err(zx::Status::NOT_SUPPORTED) }
245    }
246
247    /// Called to handle the QuerySlices FIDL call.
248    fn query_slices(
249        &self,
250        _start_slices: &[u64],
251    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
252        async { Err(zx::Status::NOT_SUPPORTED) }
253    }
254
255    /// Called to handle the Shrink FIDL call.
256    fn extend(
257        &self,
258        _start_slice: u64,
259        _slice_count: u64,
260    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
261        async { Err(zx::Status::NOT_SUPPORTED) }
262    }
263
264    /// Called to handle the Shrink FIDL call.
265    fn shrink(
266        &self,
267        _start_slice: u64,
268        _slice_count: u64,
269    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
270        async { Err(zx::Status::NOT_SUPPORTED) }
271    }
272}
273
274pub trait IntoSessionManager {
275    type SM: SessionManager;
276
277    fn into_session_manager(self) -> Arc<Self::SM>;
278}
279
280impl<SM: SessionManager> BlockServer<SM> {
281    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
282        Self { block_size, session_manager: session_manager.into_session_manager() }
283    }
284
285    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
286    pub async fn handle_requests(
287        &self,
288        mut requests: fvolume::VolumeRequestStream,
289    ) -> Result<(), Error> {
290        let scope = fasync::Scope::new();
291        while let Some(request) = requests.try_next().await.unwrap() {
292            if let Some(session) = self.handle_request(request).await? {
293                scope.spawn(session.map(|_| ()));
294            }
295        }
296        scope.await;
297        Ok(())
298    }
299
300    /// Processes a partition request.  If a new session task is created in response to the request,
301    /// it is returned.
302    async fn handle_request(
303        &self,
304        request: fvolume::VolumeRequest,
305    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
306        match request {
307            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
308                Ok(info) => {
309                    let max_transfer_size = info.max_transfer_size(self.block_size);
310                    let (block_count, flags) = match info.as_ref() {
311                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
312                            (*block_count, *device_flags)
313                        }
314                        DeviceInfo::Partition(partition_info) => {
315                            let block_count = if let Some(range) =
316                                partition_info.block_range.as_ref()
317                            {
318                                range.end - range.start
319                            } else {
320                                let volume_info = self.session_manager.get_volume_info().await?;
321                                volume_info.0.slice_size * volume_info.1.partition_slice_count
322                                    / self.block_size as u64
323                            };
324                            (block_count, partition_info.device_flags)
325                        }
326                    };
327                    responder.send(Ok(&fblock::BlockInfo {
328                        block_count,
329                        block_size: self.block_size,
330                        max_transfer_size,
331                        flags,
332                    }))?;
333                }
334                Err(status) => responder.send(Err(status.into_raw()))?,
335            },
336            fvolume::VolumeRequest::GetStats { clear: _, responder } => {
337                // TODO(https://fxbug.dev/348077960): Implement this
338                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
339            }
340            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
341                match self.device_info().await {
342                    Ok(info) => {
343                        return Ok(Some(self.session_manager.clone().open_session(
344                            session.into_stream(),
345                            OffsetMap::empty(info.max_transfer_blocks()),
346                            self.block_size,
347                        )));
348                    }
349                    Err(status) => session.close_with_epitaph(status)?,
350                }
351            }
352            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
353                session,
354                offset_map,
355                initial_mappings,
356                control_handle: _,
357            } => match self.device_info().await {
358                Ok(info) => {
359                    if offset_map.is_some()
360                        || initial_mappings.as_ref().is_none_or(|m| m.len() != 1)
361                    {
362                        // TODO(https://fxbug.dev/402515764): Support multiple mappings and
363                        // dynamic querying for FVM as needed.  A single static map is
364                        // sufficient for GPT.
365                        session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
366                        return Ok(None);
367                    }
368                    let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into()
369                    {
370                        Ok(m) => m,
371                        Err(status) => {
372                            session.close_with_epitaph(status)?;
373                            return Ok(None);
374                        }
375                    };
376                    return Ok(Some(self.session_manager.clone().open_session(
377                        session.into_stream(),
378                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
379                        self.block_size,
380                    )));
381                }
382                Err(status) => session.close_with_epitaph(status)?,
383            },
384            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
385                Ok(info) => {
386                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
387                        let mut guid =
388                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
389                        guid.value.copy_from_slice(&partition_info.type_guid);
390                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
391                    } else {
392                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
393                    }
394                }
395                Err(status) => {
396                    responder.send(status.into_raw(), None)?;
397                }
398            },
399            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
400                match self.device_info().await {
401                    Ok(info) => {
402                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
403                            let mut guid =
404                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
405                            guid.value.copy_from_slice(&partition_info.instance_guid);
406                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
407                        } else {
408                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
409                        }
410                    }
411                    Err(status) => {
412                        responder.send(status.into_raw(), None)?;
413                    }
414                }
415            }
416            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
417                Ok(info) => {
418                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
419                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
420                    } else {
421                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
422                    }
423                }
424                Err(status) => {
425                    responder.send(status.into_raw(), None)?;
426                }
427            },
428            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
429                Ok(info) => {
430                    if let DeviceInfo::Partition(info) = info.as_ref() {
431                        let mut type_guid =
432                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
433                        type_guid.value.copy_from_slice(&info.type_guid);
434                        let mut instance_guid =
435                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
436                        instance_guid.value.copy_from_slice(&info.instance_guid);
437                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
438                            name: Some(info.name.clone()),
439                            type_guid: Some(type_guid),
440                            instance_guid: Some(instance_guid),
441                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
442                            num_blocks: info
443                                .block_range
444                                .as_ref()
445                                .map(|range| range.end - range.start),
446                            flags: Some(info.flags),
447                            ..Default::default()
448                        }))?;
449                    }
450                }
451                Err(status) => responder.send(Err(status.into_raw()))?,
452            },
453            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
454                match self.session_manager.query_slices(&start_slices).await {
455                    Ok(mut results) => {
456                        let results_len = results.len();
457                        assert!(results_len <= 16);
458                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
459                        responder.send(
460                            zx::sys::ZX_OK,
461                            &results.try_into().unwrap(),
462                            results_len as u64,
463                        )?;
464                    }
465                    Err(s) => {
466                        responder.send(
467                            s.into_raw(),
468                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
469                            0,
470                        )?;
471                    }
472                }
473            }
474            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
475                match self.session_manager.get_volume_info().await {
476                    Ok((manager_info, volume_info)) => {
477                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
478                    }
479                    Err(s) => responder.send(s.into_raw(), None, None)?,
480                }
481            }
482            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
483                responder.send(
484                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
485                        .into_raw(),
486                )?;
487            }
488            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
489                responder.send(
490                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
491                        .into_raw(),
492                )?;
493            }
494            fvolume::VolumeRequest::Destroy { responder, .. } => {
495                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
496            }
497        }
498        Ok(None)
499    }
500
501    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
502        self.session_manager.get_info().await
503    }
504}
505
506struct SessionHelper<SM: SessionManager> {
507    session_manager: Arc<SM>,
508    offset_map: OffsetMap,
509    block_size: u32,
510    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
511    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
512    message_groups: FifoMessageGroups,
513}
514
515impl<SM: SessionManager> SessionHelper<SM> {
516    fn new(
517        session_manager: Arc<SM>,
518        offset_map: OffsetMap,
519        block_size: u32,
520    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
521        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
522        Ok((
523            Self {
524                session_manager,
525                offset_map,
526                block_size,
527                peer_fifo,
528                vmos: Mutex::default(),
529                message_groups: FifoMessageGroups::default(),
530            },
531            fifo,
532        ))
533    }
534
535    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
536        match request {
537            fblock::SessionRequest::GetFifo { responder } => {
538                let rights = zx::Rights::TRANSFER
539                    | zx::Rights::READ
540                    | zx::Rights::WRITE
541                    | zx::Rights::SIGNAL
542                    | zx::Rights::WAIT;
543                match self.peer_fifo.duplicate_handle(rights) {
544                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
545                    Err(s) => responder.send(Err(s.into_raw()))?,
546                }
547                Ok(())
548            }
549            fblock::SessionRequest::AttachVmo { vmo, responder } => {
550                let vmo = Arc::new(vmo);
551                let vmo_id = {
552                    let mut vmos = self.vmos.lock();
553                    if vmos.len() == u16::MAX as usize {
554                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
555                        return Ok(());
556                    } else {
557                        let vmo_id = match vmos.last_entry() {
558                            None => 1,
559                            Some(o) => {
560                                o.key().checked_add(1).unwrap_or_else(|| {
561                                    let mut vmo_id = 1;
562                                    // Find the first gap...
563                                    for (&id, _) in &*vmos {
564                                        if id > vmo_id {
565                                            break;
566                                        }
567                                        vmo_id = id + 1;
568                                    }
569                                    vmo_id
570                                })
571                            }
572                        };
573                        vmos.insert(vmo_id, vmo.clone());
574                        vmo_id
575                    }
576                };
577                self.session_manager.clone().on_attach_vmo(&vmo).await?;
578                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
579                Ok(())
580            }
581            fblock::SessionRequest::Close { responder } => {
582                responder.send(Ok(()))?;
583                Err(anyhow!("Closed"))
584            }
585        }
586    }
587
588    fn finish_fifo_request(
589        &self,
590        request: RequestTracking,
591        status: zx::Status,
592    ) -> Option<BlockFifoResponse> {
593        match request.group_or_request {
594            group @ GroupOrRequest::Group(_) => {
595                let response = self.message_groups.complete(group, status);
596                fuchsia_trace::duration!(
597                    c"storage",
598                    c"block_server::finish_transaction_in_group",
599                    "group" => group.id(),
600                    "group_completed" => response.is_some(),
601                    "status" => status.into_raw());
602                if let Some(trace_flow_id) = request.trace_flow_id {
603                    fuchsia_trace::flow_step!(
604                        c"storage",
605                        c"block_server::finish_transaction",
606                        trace_flow_id.get().into()
607                    );
608                }
609                response
610            }
611            group @ GroupOrRequest::OneShotGroup(_) => {
612                let response = self.message_groups.complete(group, status);
613                fuchsia_trace::duration!(
614                    c"storage",
615                    c"block_server::finish_transaction_in_oneshot_group",
616                    "group" => group.id(),
617                    "group_completed" => response.is_some(),
618                    "status" => status.into_raw());
619                if let Some(trace_flow_id) = request.trace_flow_id {
620                    fuchsia_trace::flow_step!(
621                        c"storage",
622                        c"block_server::finish_transaction",
623                        trace_flow_id.get().into()
624                    );
625                }
626                response
627            }
628            GroupOrRequest::Request(reqid) => {
629                fuchsia_trace::duration!(
630                    c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
631                if let Some(trace_flow_id) = request.trace_flow_id {
632                    fuchsia_trace::flow_step!(
633                        c"storage",
634                        c"block_server::finish_transaction",
635                        trace_flow_id.get().into()
636                    );
637                }
638                Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
639            }
640        }
641    }
642
643    // Decodes `request`.  If the request is a continuation of a DecodeResult::Split, `is_split`
644    // should be set.
645    fn decode_fifo_request(
646        &self,
647        request: &mut BlockFifoRequest,
648        mut is_split: bool,
649    ) -> DecodeResult {
650        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
651        let old_length = request.length;
652        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
653            .ok_or(zx::Status::INVALID_ARGS)
654            .and_then(|code| {
655                let op = match code {
656                    BlockOpcode::Read => Operation::Read {
657                        device_block_offset: request.dev_offset,
658                        block_count: request.length,
659                        _unused: 0,
660                        vmo_offset: request
661                            .vmo_offset
662                            .checked_mul(self.block_size as u64)
663                            .ok_or(zx::Status::OUT_OF_RANGE)?,
664                    },
665                    BlockOpcode::Write => Operation::Write {
666                        device_block_offset: request.dev_offset,
667                        block_count: request.length,
668                        options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
669                            WriteOptions::FORCE_ACCESS
670                        } else {
671                            WriteOptions::empty()
672                        },
673                        vmo_offset: request
674                            .vmo_offset
675                            .checked_mul(self.block_size as u64)
676                            .ok_or(zx::Status::OUT_OF_RANGE)?,
677                    },
678                    BlockOpcode::Flush => Operation::Flush,
679                    BlockOpcode::Trim => Operation::Trim {
680                        device_block_offset: request.dev_offset,
681                        block_count: request.length,
682                    },
683                    BlockOpcode::CloseVmo => Operation::CloseVmo,
684                };
685                op.validate_and_adjust_range(&self.offset_map)
686            });
687
688        let sub_request_length =
689            if let Ok(Operation::Read { block_count, .. } | Operation::Write { block_count, .. }) =
690                &operation
691            {
692                if *block_count < old_length {
693                    Some(*block_count)
694                } else {
695                    None
696                }
697            } else {
698                None
699            };
700        is_split |= sub_request_length.is_some();
701
702        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
703            GroupOrRequest::Group(request.group)
704        } else if is_split {
705            GroupOrRequest::OneShotGroup(request.reqid)
706        } else {
707            GroupOrRequest::Request(request.reqid)
708        };
709
710        if group_or_request.is_group() {
711            let finishes_group = (!flags.contains(BlockIoFlag::GROUP_ITEM)
712                || flags.contains(BlockIoFlag::GROUP_LAST))
713                && (!is_split || sub_request_length.is_none());
714            let mut groups = self.message_groups.0.lock();
715            let group = groups.entry(group_or_request).or_insert_with(|| FifoMessageGroup::new());
716            if group.req_id.is_some() {
717                // We have already received a request tagged as last.
718                if group.status == zx::Status::OK {
719                    group.status = zx::Status::INVALID_ARGS;
720                }
721                return DecodeResult::IgnoreRequest;
722            }
723            if finishes_group {
724                group.req_id = Some(request.reqid);
725                // If the group has had an error, there is no point trying to issue this request.
726                if group.status != zx::Status::OK {
727                    operation = Err(group.status);
728                }
729            } else if group.status != zx::Status::OK {
730                // The group has already encountered an error, so there is no point trying to issue
731                // this request.
732                return DecodeResult::IgnoreRequest;
733            }
734            group.count += 1;
735            group_or_request
736        } else {
737            GroupOrRequest::Request(request.reqid)
738        };
739        let request_tracking = RequestTracking {
740            group_or_request,
741            trace_flow_id: NonZero::new(request.trace_flow_id),
742        };
743
744        let vmo = match &operation {
745            Ok(Operation::Read { .. } | Operation::Write { .. }) => self
746                .vmos
747                .lock()
748                .get(&request.vmoid)
749                .cloned()
750                .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
751            Ok(Operation::CloseVmo) => self
752                .vmos
753                .lock()
754                .remove(&request.vmoid)
755                .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
756            _ => Ok(None),
757        }
758        .unwrap_or_else(|e| {
759            operation = Err(e);
760            None
761        });
762
763        let operation = match operation {
764            Ok(operation) => operation,
765            Err(status) => return DecodeResult::InvalidRequest(request_tracking, status),
766        };
767
768        {
769            use fuchsia_trace::ArgValue;
770            static CACHE: AtomicU64 = AtomicU64::new(0);
771            if let Some(context) =
772                fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
773            {
774                let trace_args_with_group = [
775                    ArgValue::of("group", u32::from(group_or_request.id())),
776                    ArgValue::of("oneshot", group_or_request.is_oneshot()),
777                    ArgValue::of("opcode", operation.trace_label()),
778                ];
779                let trace_args = [ArgValue::of("opcode", operation.trace_label())];
780                let _scope = if group_or_request.is_group() {
781                    fuchsia_trace::duration(
782                        c"storage",
783                        c"block_server::start_transaction",
784                        &trace_args_with_group,
785                    )
786                } else {
787                    fuchsia_trace::duration(
788                        c"storage",
789                        c"block_server::start_transaction",
790                        &trace_args,
791                    )
792                };
793                let trace_flow_id = NonZero::new(request.trace_flow_id);
794                if let Some(trace_flow_id) = trace_flow_id.clone() {
795                    fuchsia_trace::flow_step(
796                        &context,
797                        c"block_server::start_transaction",
798                        trace_flow_id.get().into(),
799                        &[],
800                    );
801                }
802            }
803        }
804
805        if let Some(sub_request_length) = sub_request_length {
806            request.dev_offset += sub_request_length as u64;
807            request.vmo_offset += sub_request_length as u64;
808            request.length -= sub_request_length;
809            DecodeResult::Split(DecodedRequest { request_tracking, operation, vmo })
810        } else {
811            DecodeResult::Ok(DecodedRequest { request_tracking, operation, vmo })
812        }
813    }
814
815    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
816        std::mem::take(&mut *self.vmos.lock())
817    }
818}
819
820#[derive(Debug)]
821enum DecodeResult {
822    // The request was successfully decoded and should be submitted.
823    Ok(DecodedRequest),
824    // The request was successfully decoded and should be submitted, but a sub-request was split off
825    // because the request was too big.  The caller should call decode_fifo_request again (with the
826    // same request) until Ok is returned.
827    Split(DecodedRequest),
828    // The request was invalid, and we should send the status back to the client (unless ignored).
829    InvalidRequest(RequestTracking, zx::Status),
830    // The request shouldn't be submitted, but we should not send any status to the client. This
831    // occurs when a request is part of a group, and the request is invalid or the group already
832    // failed.  The entire group will latch an error, but we won't send that error back until the
833    // group completes.
834    IgnoreRequest,
835}
836
837#[derive(Debug)]
838struct DecodedRequest {
839    request_tracking: RequestTracking,
840    // Operation is now guaranteed to be Ok when wrapped in DecodeResult::Ok.
841    operation: Operation,
842    vmo: Option<Arc<zx::Vmo>>,
843}
844
845/// cbindgen:no-export
846pub type WriteOptions = block_protocol::WriteOptions;
847
848#[repr(C)]
849#[derive(Debug)]
850pub enum Operation {
851    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
852    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
853    // code can read `device_block_offset` from the read variant and then assume it's valid for the
854    // write variant).
855    Read {
856        device_block_offset: u64,
857        block_count: u32,
858        _unused: u32,
859        vmo_offset: u64,
860    },
861    Write {
862        device_block_offset: u64,
863        block_count: u32,
864        options: WriteOptions,
865        vmo_offset: u64,
866    },
867    Flush,
868    Trim {
869        device_block_offset: u64,
870        block_count: u32,
871    },
872    /// This will never be seen by the C interface.
873    CloseVmo,
874}
875
876impl Operation {
877    // Adjusts the operation's block range via `OffsetMap` and `maximum_transfer_size`.  If the
878    // request would exceed the range known to the map, or is otherwise invalid, returns an error.
879    // Returns the original length of the request on success, so the caller can determine if the
880    // operation was truncated (due to fragmented extents or exceeding the max transfer size).
881    fn validate_and_adjust_range(
882        mut self,
883        offset_map: &OffsetMap,
884    ) -> Result<Operation, zx::Status> {
885        let adjust_fn = |dev_offset, length, is_rw| {
886            // For compatibility with the C++ server, always ensure the length is nonzero
887            if length == 0 {
888                return Err(zx::Status::INVALID_ARGS);
889            }
890            offset_map.adjust_request(dev_offset, length, is_rw).ok_or(zx::Status::OUT_OF_RANGE)
891        };
892        match &mut self {
893            Operation::Read { device_block_offset, block_count, .. } => {
894                (*device_block_offset, *block_count) =
895                    adjust_fn(*device_block_offset, *block_count, true)?;
896            }
897            Operation::Write { device_block_offset, block_count, .. } => {
898                (*device_block_offset, *block_count) =
899                    adjust_fn(*device_block_offset, *block_count, true)?;
900            }
901            Operation::Trim { device_block_offset, block_count, .. } => {
902                (*device_block_offset, *block_count) =
903                    adjust_fn(*device_block_offset, *block_count, false)?;
904            }
905            _ => {}
906        }
907        Ok(self)
908    }
909    fn trace_label(&self) -> &'static str {
910        match self {
911            Operation::Read { .. } => "read",
912            Operation::Write { .. } => "write",
913            Operation::Flush { .. } => "flush",
914            Operation::Trim { .. } => "trim",
915            Operation::CloseVmo { .. } => "close_vmo",
916        }
917    }
918}
919
920#[derive(Clone, Copy, Debug)]
921pub struct RequestTracking {
922    group_or_request: GroupOrRequest,
923    trace_flow_id: Option<NonZero<u64>>,
924}
925
926#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
927pub enum GroupOrRequest {
928    Group(u16),
929    // A one-shot group is an ad-hoc group created to split up a large request.  The ID is the
930    // original request ID.
931    OneShotGroup(u32),
932    Request(u32),
933}
934
935impl GroupOrRequest {
936    fn is_group(&self) -> bool {
937        matches!(self, Self::Group(_) | Self::OneShotGroup(_))
938    }
939
940    fn is_oneshot(&self) -> bool {
941        matches!(self, Self::OneShotGroup(_))
942    }
943
944    // Note that OneShotGroups will not report a group_id.
945    fn group_id(&self) -> u16 {
946        match self {
947            Self::Group(id) => *id,
948            Self::Request(_) | Self::OneShotGroup(_) => 0,
949        }
950    }
951
952    fn id(&self) -> u32 {
953        match self {
954            Self::Group(id) => *id as u32,
955            Self::Request(id) => *id,
956            Self::OneShotGroup(id) => *id,
957        }
958    }
959}
960
961#[cfg(test)]
962mod tests {
963    use super::{BlockServer, DeviceInfo, PartitionInfo, FIFO_MAX_REQUESTS};
964    use assert_matches::assert_matches;
965    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
966    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
967    use fuchsia_sync::Mutex;
968    use futures::channel::oneshot;
969    use futures::future::BoxFuture;
970    use futures::FutureExt as _;
971    use std::borrow::Cow;
972    use std::future::poll_fn;
973    use std::num::NonZero;
974    use std::pin::pin;
975    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
976    use std::sync::Arc;
977    use std::task::{Context, Poll};
978    use zx::{AsHandleRef as _, HandleBased as _};
979    use {
980        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
981        fuchsia_async as fasync,
982    };
983
984    #[derive(Default)]
985    struct MockInterface {
986        read_hook: Option<
987            Box<
988                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
989                    + Send
990                    + Sync,
991            >,
992        >,
993    }
994
995    impl super::async_interface::Interface for MockInterface {
996        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
997            Ok(())
998        }
999
1000        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1001            Ok(Cow::Owned(test_device_info()))
1002        }
1003
1004        async fn read(
1005            &self,
1006            device_block_offset: u64,
1007            block_count: u32,
1008            vmo: &Arc<zx::Vmo>,
1009            vmo_offset: u64,
1010            _trace_flow_id: Option<NonZero<u64>>,
1011        ) -> Result<(), zx::Status> {
1012            if let Some(read_hook) = &self.read_hook {
1013                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1014            } else {
1015                unimplemented!();
1016            }
1017        }
1018
1019        async fn write(
1020            &self,
1021            _device_block_offset: u64,
1022            _block_count: u32,
1023            _vmo: &Arc<zx::Vmo>,
1024            _vmo_offset: u64,
1025            _opts: WriteOptions,
1026            _trace_flow_id: Option<NonZero<u64>>,
1027        ) -> Result<(), zx::Status> {
1028            unreachable!();
1029        }
1030
1031        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1032            unreachable!();
1033        }
1034
1035        async fn trim(
1036            &self,
1037            _device_block_offset: u64,
1038            _block_count: u32,
1039            _trace_flow_id: Option<NonZero<u64>>,
1040        ) -> Result<(), zx::Status> {
1041            unreachable!();
1042        }
1043
1044        async fn get_volume_info(
1045            &self,
1046        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1047            // Hang forever for the test_requests_dont_block_sessions test.
1048            let () = std::future::pending().await;
1049            unreachable!();
1050        }
1051    }
1052
1053    const BLOCK_SIZE: u32 = 512;
1054
1055    fn test_device_info() -> DeviceInfo {
1056        DeviceInfo::Partition(PartitionInfo {
1057            device_flags: fblock::Flag::READONLY,
1058            max_transfer_blocks: None,
1059            block_range: Some(12..34),
1060            type_guid: [1; 16],
1061            instance_guid: [2; 16],
1062            name: "foo".to_string(),
1063            flags: 0xabcd,
1064        })
1065    }
1066
1067    #[fuchsia::test]
1068    async fn test_info() {
1069        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1070
1071        futures::join!(
1072            async {
1073                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1074                block_server.handle_requests(stream).await.unwrap();
1075            },
1076            async {
1077                let expected_info = test_device_info();
1078                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1079                    info
1080                } else {
1081                    unreachable!()
1082                };
1083
1084                let block_info = proxy.get_info().await.unwrap().unwrap();
1085                assert_eq!(
1086                    block_info.block_count,
1087                    partition_info.block_range.as_ref().unwrap().end
1088                        - partition_info.block_range.as_ref().unwrap().start
1089                );
1090                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1091
1092                // TODO(https://fxbug.dev/348077960): Check max_transfer_size
1093
1094                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1095                assert_eq!(status, zx::sys::ZX_OK);
1096                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1097
1098                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1099                assert_eq!(status, zx::sys::ZX_OK);
1100                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1101
1102                let (status, name) = proxy.get_name().await.unwrap();
1103                assert_eq!(status, zx::sys::ZX_OK);
1104                assert_eq!(name.as_ref(), Some(&partition_info.name));
1105
1106                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1107                assert_eq!(metadata.name, name);
1108                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1109                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1110                assert_eq!(
1111                    metadata.start_block_offset,
1112                    Some(partition_info.block_range.as_ref().unwrap().start)
1113                );
1114                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1115                assert_eq!(metadata.flags, Some(partition_info.flags));
1116
1117                std::mem::drop(proxy);
1118            }
1119        );
1120    }
1121
1122    #[fuchsia::test]
1123    async fn test_attach_vmo() {
1124        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1125
1126        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1127        let koid = vmo.get_koid().unwrap();
1128
1129        futures::join!(
1130            async {
1131                let block_server = BlockServer::new(
1132                    BLOCK_SIZE,
1133                    Arc::new(MockInterface {
1134                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1135                            assert_eq!(vmo.get_koid().unwrap(), koid);
1136                            Box::pin(async { Ok(()) })
1137                        })),
1138                        ..MockInterface::default()
1139                    }),
1140                );
1141                block_server.handle_requests(stream).await.unwrap();
1142            },
1143            async move {
1144                let (session_proxy, server) = fidl::endpoints::create_proxy();
1145
1146                proxy.open_session(server).unwrap();
1147
1148                let vmo_id = session_proxy
1149                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1150                    .await
1151                    .unwrap()
1152                    .unwrap();
1153                assert_ne!(vmo_id.id, 0);
1154
1155                let mut fifo =
1156                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1157                let (mut reader, mut writer) = fifo.async_io();
1158
1159                // Keep attaching VMOs until we eventually hit the maximum.
1160                let mut count = 1;
1161                loop {
1162                    match session_proxy
1163                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1164                        .await
1165                        .unwrap()
1166                    {
1167                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1168                        Err(e) => {
1169                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1170                            break;
1171                        }
1172                    }
1173
1174                    // Only test every 10 to keep test time down.
1175                    if count % 10 == 0 {
1176                        writer
1177                            .write_entries(&BlockFifoRequest {
1178                                command: BlockFifoCommand {
1179                                    opcode: BlockOpcode::Read.into_primitive(),
1180                                    ..Default::default()
1181                                },
1182                                vmoid: vmo_id.id,
1183                                length: 1,
1184                                ..Default::default()
1185                            })
1186                            .await
1187                            .unwrap();
1188
1189                        let mut response = BlockFifoResponse::default();
1190                        reader.read_entries(&mut response).await.unwrap();
1191                        assert_eq!(response.status, zx::sys::ZX_OK);
1192                    }
1193
1194                    count += 1;
1195                }
1196
1197                assert_eq!(count, u16::MAX as u64);
1198
1199                // Detach the original VMO, and make sure we can then attach another one.
1200                writer
1201                    .write_entries(&BlockFifoRequest {
1202                        command: BlockFifoCommand {
1203                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1204                            ..Default::default()
1205                        },
1206                        vmoid: vmo_id.id,
1207                        ..Default::default()
1208                    })
1209                    .await
1210                    .unwrap();
1211
1212                let mut response = BlockFifoResponse::default();
1213                reader.read_entries(&mut response).await.unwrap();
1214                assert_eq!(response.status, zx::sys::ZX_OK);
1215
1216                let new_vmo_id = session_proxy
1217                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1218                    .await
1219                    .unwrap()
1220                    .unwrap();
1221                // It should reuse the same ID.
1222                assert_eq!(new_vmo_id.id, vmo_id.id);
1223
1224                std::mem::drop(proxy);
1225            }
1226        );
1227    }
1228
1229    #[fuchsia::test]
1230    async fn test_close() {
1231        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1232
1233        let mut server = std::pin::pin!(async {
1234            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1235            block_server.handle_requests(stream).await.unwrap();
1236        }
1237        .fuse());
1238
1239        let mut client = std::pin::pin!(async {
1240            let (session_proxy, server) = fidl::endpoints::create_proxy();
1241
1242            proxy.open_session(server).unwrap();
1243
1244            // Dropping the proxy should not cause the session to terminate because the session is
1245            // still live.
1246            std::mem::drop(proxy);
1247
1248            session_proxy.close().await.unwrap().unwrap();
1249
1250            // Keep the session alive.  Calling `close` should cause the server to terminate.
1251            let _: () = std::future::pending().await;
1252        }
1253        .fuse());
1254
1255        futures::select!(
1256            _ = server => {}
1257            _ = client => unreachable!(),
1258        );
1259    }
1260
1261    #[derive(Default)]
1262    struct IoMockInterface {
1263        do_checks: bool,
1264        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1265        return_errors: bool,
1266    }
1267
1268    #[derive(Debug)]
1269    enum ExpectedOp {
1270        Read(u64, u32, u64),
1271        Write(u64, u32, u64),
1272        Trim(u64, u32),
1273        Flush,
1274    }
1275
1276    impl super::async_interface::Interface for IoMockInterface {
1277        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1278            Ok(())
1279        }
1280
1281        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1282            Ok(Cow::Owned(test_device_info()))
1283        }
1284
1285        async fn read(
1286            &self,
1287            device_block_offset: u64,
1288            block_count: u32,
1289            _vmo: &Arc<zx::Vmo>,
1290            vmo_offset: u64,
1291            _trace_flow_id: Option<NonZero<u64>>,
1292        ) -> Result<(), zx::Status> {
1293            if self.return_errors {
1294                Err(zx::Status::INTERNAL)
1295            } else {
1296                if self.do_checks {
1297                    assert_matches!(
1298                        self.expected_op.lock().take(),
1299                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1300                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1301                        "Read {device_block_offset} {block_count} {vmo_offset}"
1302                    );
1303                }
1304                Ok(())
1305            }
1306        }
1307
1308        async fn write(
1309            &self,
1310            device_block_offset: u64,
1311            block_count: u32,
1312            _vmo: &Arc<zx::Vmo>,
1313            vmo_offset: u64,
1314            _opts: WriteOptions,
1315            _trace_flow_id: Option<NonZero<u64>>,
1316        ) -> Result<(), zx::Status> {
1317            if self.return_errors {
1318                Err(zx::Status::NOT_SUPPORTED)
1319            } else {
1320                if self.do_checks {
1321                    assert_matches!(
1322                        self.expected_op.lock().take(),
1323                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1324                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1325                        "Write {device_block_offset} {block_count} {vmo_offset}"
1326                    );
1327                }
1328                Ok(())
1329            }
1330        }
1331
1332        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1333            if self.return_errors {
1334                Err(zx::Status::NO_RESOURCES)
1335            } else {
1336                if self.do_checks {
1337                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1338                }
1339                Ok(())
1340            }
1341        }
1342
1343        async fn trim(
1344            &self,
1345            device_block_offset: u64,
1346            block_count: u32,
1347            _trace_flow_id: Option<NonZero<u64>>,
1348        ) -> Result<(), zx::Status> {
1349            if self.return_errors {
1350                Err(zx::Status::NO_MEMORY)
1351            } else {
1352                if self.do_checks {
1353                    assert_matches!(
1354                        self.expected_op.lock().take(),
1355                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1356                            block_count == b,
1357                        "Trim {device_block_offset} {block_count}"
1358                    );
1359                }
1360                Ok(())
1361            }
1362        }
1363    }
1364
1365    #[fuchsia::test]
1366    async fn test_io() {
1367        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1368
1369        let expected_op = Arc::new(Mutex::new(None));
1370        let expected_op_clone = expected_op.clone();
1371
1372        let server = async {
1373            let block_server = BlockServer::new(
1374                BLOCK_SIZE,
1375                Arc::new(IoMockInterface {
1376                    return_errors: false,
1377                    do_checks: true,
1378                    expected_op: expected_op_clone,
1379                }),
1380            );
1381            block_server.handle_requests(stream).await.unwrap();
1382        };
1383
1384        let client = async move {
1385            let (session_proxy, server) = fidl::endpoints::create_proxy();
1386
1387            proxy.open_session(server).unwrap();
1388
1389            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1390            let vmo_id = session_proxy
1391                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1392                .await
1393                .unwrap()
1394                .unwrap();
1395
1396            let mut fifo =
1397                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1398            let (mut reader, mut writer) = fifo.async_io();
1399
1400            // READ
1401            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1402            writer
1403                .write_entries(&BlockFifoRequest {
1404                    command: BlockFifoCommand {
1405                        opcode: BlockOpcode::Read.into_primitive(),
1406                        ..Default::default()
1407                    },
1408                    vmoid: vmo_id.id,
1409                    dev_offset: 1,
1410                    length: 2,
1411                    vmo_offset: 3,
1412                    ..Default::default()
1413                })
1414                .await
1415                .unwrap();
1416
1417            let mut response = BlockFifoResponse::default();
1418            reader.read_entries(&mut response).await.unwrap();
1419            assert_eq!(response.status, zx::sys::ZX_OK);
1420
1421            // WRITE
1422            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1423            writer
1424                .write_entries(&BlockFifoRequest {
1425                    command: BlockFifoCommand {
1426                        opcode: BlockOpcode::Write.into_primitive(),
1427                        ..Default::default()
1428                    },
1429                    vmoid: vmo_id.id,
1430                    dev_offset: 4,
1431                    length: 5,
1432                    vmo_offset: 6,
1433                    ..Default::default()
1434                })
1435                .await
1436                .unwrap();
1437
1438            let mut response = BlockFifoResponse::default();
1439            reader.read_entries(&mut response).await.unwrap();
1440            assert_eq!(response.status, zx::sys::ZX_OK);
1441
1442            // FLUSH
1443            *expected_op.lock() = Some(ExpectedOp::Flush);
1444            writer
1445                .write_entries(&BlockFifoRequest {
1446                    command: BlockFifoCommand {
1447                        opcode: BlockOpcode::Flush.into_primitive(),
1448                        ..Default::default()
1449                    },
1450                    ..Default::default()
1451                })
1452                .await
1453                .unwrap();
1454
1455            reader.read_entries(&mut response).await.unwrap();
1456            assert_eq!(response.status, zx::sys::ZX_OK);
1457
1458            // TRIM
1459            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1460            writer
1461                .write_entries(&BlockFifoRequest {
1462                    command: BlockFifoCommand {
1463                        opcode: BlockOpcode::Trim.into_primitive(),
1464                        ..Default::default()
1465                    },
1466                    dev_offset: 7,
1467                    length: 8,
1468                    ..Default::default()
1469                })
1470                .await
1471                .unwrap();
1472
1473            reader.read_entries(&mut response).await.unwrap();
1474            assert_eq!(response.status, zx::sys::ZX_OK);
1475
1476            std::mem::drop(proxy);
1477        };
1478
1479        futures::join!(server, client);
1480    }
1481
1482    #[fuchsia::test]
1483    async fn test_io_errors() {
1484        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1485
1486        futures::join!(
1487            async {
1488                let block_server = BlockServer::new(
1489                    BLOCK_SIZE,
1490                    Arc::new(IoMockInterface {
1491                        return_errors: true,
1492                        do_checks: false,
1493                        expected_op: Arc::new(Mutex::new(None)),
1494                    }),
1495                );
1496                block_server.handle_requests(stream).await.unwrap();
1497            },
1498            async move {
1499                let (session_proxy, server) = fidl::endpoints::create_proxy();
1500
1501                proxy.open_session(server).unwrap();
1502
1503                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1504                let vmo_id = session_proxy
1505                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1506                    .await
1507                    .unwrap()
1508                    .unwrap();
1509
1510                let mut fifo =
1511                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1512                let (mut reader, mut writer) = fifo.async_io();
1513
1514                // READ
1515                writer
1516                    .write_entries(&BlockFifoRequest {
1517                        command: BlockFifoCommand {
1518                            opcode: BlockOpcode::Read.into_primitive(),
1519                            ..Default::default()
1520                        },
1521                        vmoid: vmo_id.id,
1522                        length: 1,
1523                        reqid: 1,
1524                        ..Default::default()
1525                    })
1526                    .await
1527                    .unwrap();
1528
1529                let mut response = BlockFifoResponse::default();
1530                reader.read_entries(&mut response).await.unwrap();
1531                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1532
1533                // WRITE
1534                writer
1535                    .write_entries(&BlockFifoRequest {
1536                        command: BlockFifoCommand {
1537                            opcode: BlockOpcode::Write.into_primitive(),
1538                            ..Default::default()
1539                        },
1540                        vmoid: vmo_id.id,
1541                        length: 1,
1542                        reqid: 2,
1543                        ..Default::default()
1544                    })
1545                    .await
1546                    .unwrap();
1547
1548                reader.read_entries(&mut response).await.unwrap();
1549                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1550
1551                // FLUSH
1552                writer
1553                    .write_entries(&BlockFifoRequest {
1554                        command: BlockFifoCommand {
1555                            opcode: BlockOpcode::Flush.into_primitive(),
1556                            ..Default::default()
1557                        },
1558                        reqid: 3,
1559                        ..Default::default()
1560                    })
1561                    .await
1562                    .unwrap();
1563
1564                reader.read_entries(&mut response).await.unwrap();
1565                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1566
1567                // TRIM
1568                writer
1569                    .write_entries(&BlockFifoRequest {
1570                        command: BlockFifoCommand {
1571                            opcode: BlockOpcode::Trim.into_primitive(),
1572                            ..Default::default()
1573                        },
1574                        reqid: 4,
1575                        length: 1,
1576                        ..Default::default()
1577                    })
1578                    .await
1579                    .unwrap();
1580
1581                reader.read_entries(&mut response).await.unwrap();
1582                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1583
1584                std::mem::drop(proxy);
1585            }
1586        );
1587    }
1588
1589    #[fuchsia::test]
1590    async fn test_invalid_args() {
1591        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1592
1593        futures::join!(
1594            async {
1595                let block_server = BlockServer::new(
1596                    BLOCK_SIZE,
1597                    Arc::new(IoMockInterface {
1598                        return_errors: false,
1599                        do_checks: false,
1600                        expected_op: Arc::new(Mutex::new(None)),
1601                    }),
1602                );
1603                block_server.handle_requests(stream).await.unwrap();
1604            },
1605            async move {
1606                let (session_proxy, server) = fidl::endpoints::create_proxy();
1607
1608                proxy.open_session(server).unwrap();
1609
1610                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1611                let vmo_id = session_proxy
1612                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1613                    .await
1614                    .unwrap()
1615                    .unwrap();
1616
1617                let mut fifo =
1618                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1619
1620                async fn test(
1621                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1622                    request: BlockFifoRequest,
1623                ) -> Result<(), zx::Status> {
1624                    let (mut reader, mut writer) = fifo.async_io();
1625                    writer.write_entries(&request).await.unwrap();
1626                    let mut response = BlockFifoResponse::default();
1627                    reader.read_entries(&mut response).await.unwrap();
1628                    zx::Status::ok(response.status)
1629                }
1630
1631                // READ
1632
1633                let good_read_request = || BlockFifoRequest {
1634                    command: BlockFifoCommand {
1635                        opcode: BlockOpcode::Read.into_primitive(),
1636                        ..Default::default()
1637                    },
1638                    vmoid: vmo_id.id,
1639                    ..Default::default()
1640                };
1641
1642                assert_eq!(
1643                    test(
1644                        &mut fifo,
1645                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1646                    )
1647                    .await,
1648                    Err(zx::Status::INVALID_ARGS)
1649                );
1650
1651                assert_eq!(
1652                    test(
1653                        &mut fifo,
1654                        BlockFifoRequest {
1655                            vmo_offset: 0xffff_ffff_ffff_ffff,
1656                            ..good_read_request()
1657                        }
1658                    )
1659                    .await,
1660                    Err(zx::Status::OUT_OF_RANGE)
1661                );
1662
1663                // WRITE
1664
1665                let good_write_request = || BlockFifoRequest {
1666                    command: BlockFifoCommand {
1667                        opcode: BlockOpcode::Write.into_primitive(),
1668                        ..Default::default()
1669                    },
1670                    vmoid: vmo_id.id,
1671                    ..Default::default()
1672                };
1673
1674                assert_eq!(
1675                    test(
1676                        &mut fifo,
1677                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1678                    )
1679                    .await,
1680                    Err(zx::Status::INVALID_ARGS)
1681                );
1682
1683                assert_eq!(
1684                    test(
1685                        &mut fifo,
1686                        BlockFifoRequest {
1687                            vmo_offset: 0xffff_ffff_ffff_ffff,
1688                            ..good_write_request()
1689                        }
1690                    )
1691                    .await,
1692                    Err(zx::Status::OUT_OF_RANGE)
1693                );
1694
1695                // CLOSE VMO
1696
1697                assert_eq!(
1698                    test(
1699                        &mut fifo,
1700                        BlockFifoRequest {
1701                            command: BlockFifoCommand {
1702                                opcode: BlockOpcode::CloseVmo.into_primitive(),
1703                                ..Default::default()
1704                            },
1705                            vmoid: vmo_id.id + 1,
1706                            ..Default::default()
1707                        }
1708                    )
1709                    .await,
1710                    Err(zx::Status::IO)
1711                );
1712
1713                std::mem::drop(proxy);
1714            }
1715        );
1716    }
1717
1718    #[fuchsia::test]
1719    async fn test_concurrent_requests() {
1720        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1721
1722        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1723        let waiting_readers_clone = waiting_readers.clone();
1724
1725        futures::join!(
1726            async move {
1727                let block_server = BlockServer::new(
1728                    BLOCK_SIZE,
1729                    Arc::new(MockInterface {
1730                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1731                            let (tx, rx) = oneshot::channel();
1732                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
1733                            Box::pin(async move {
1734                                let _ = rx.await;
1735                                Ok(())
1736                            })
1737                        })),
1738                    }),
1739                );
1740                block_server.handle_requests(stream).await.unwrap();
1741            },
1742            async move {
1743                let (session_proxy, server) = fidl::endpoints::create_proxy();
1744
1745                proxy.open_session(server).unwrap();
1746
1747                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1748                let vmo_id = session_proxy
1749                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1750                    .await
1751                    .unwrap()
1752                    .unwrap();
1753
1754                let mut fifo =
1755                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1756                let (mut reader, mut writer) = fifo.async_io();
1757
1758                writer
1759                    .write_entries(&BlockFifoRequest {
1760                        command: BlockFifoCommand {
1761                            opcode: BlockOpcode::Read.into_primitive(),
1762                            ..Default::default()
1763                        },
1764                        reqid: 1,
1765                        dev_offset: 1, // Intentionally use the same as `reqid`.
1766                        vmoid: vmo_id.id,
1767                        length: 1,
1768                        ..Default::default()
1769                    })
1770                    .await
1771                    .unwrap();
1772
1773                writer
1774                    .write_entries(&BlockFifoRequest {
1775                        command: BlockFifoCommand {
1776                            opcode: BlockOpcode::Read.into_primitive(),
1777                            ..Default::default()
1778                        },
1779                        reqid: 2,
1780                        dev_offset: 2,
1781                        vmoid: vmo_id.id,
1782                        length: 1,
1783                        ..Default::default()
1784                    })
1785                    .await
1786                    .unwrap();
1787
1788                // Wait till both those entries are pending.
1789                poll_fn(|cx: &mut Context<'_>| {
1790                    if waiting_readers.lock().len() == 2 {
1791                        Poll::Ready(())
1792                    } else {
1793                        // Yield to the executor.
1794                        cx.waker().wake_by_ref();
1795                        Poll::Pending
1796                    }
1797                })
1798                .await;
1799
1800                let mut response = BlockFifoResponse::default();
1801                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1802
1803                let (id, tx) = waiting_readers.lock().pop().unwrap();
1804                tx.send(()).unwrap();
1805
1806                reader.read_entries(&mut response).await.unwrap();
1807                assert_eq!(response.status, zx::sys::ZX_OK);
1808                assert_eq!(response.reqid, id);
1809
1810                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1811
1812                let (id, tx) = waiting_readers.lock().pop().unwrap();
1813                tx.send(()).unwrap();
1814
1815                reader.read_entries(&mut response).await.unwrap();
1816                assert_eq!(response.status, zx::sys::ZX_OK);
1817                assert_eq!(response.reqid, id);
1818            }
1819        );
1820    }
1821
1822    #[fuchsia::test]
1823    async fn test_groups() {
1824        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1825
1826        futures::join!(
1827            async move {
1828                let block_server = BlockServer::new(
1829                    BLOCK_SIZE,
1830                    Arc::new(MockInterface {
1831                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1832                    }),
1833                );
1834                block_server.handle_requests(stream).await.unwrap();
1835            },
1836            async move {
1837                let (session_proxy, server) = fidl::endpoints::create_proxy();
1838
1839                proxy.open_session(server).unwrap();
1840
1841                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1842                let vmo_id = session_proxy
1843                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1844                    .await
1845                    .unwrap()
1846                    .unwrap();
1847
1848                let mut fifo =
1849                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1850                let (mut reader, mut writer) = fifo.async_io();
1851
1852                writer
1853                    .write_entries(&BlockFifoRequest {
1854                        command: BlockFifoCommand {
1855                            opcode: BlockOpcode::Read.into_primitive(),
1856                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1857                            ..Default::default()
1858                        },
1859                        group: 1,
1860                        vmoid: vmo_id.id,
1861                        length: 1,
1862                        ..Default::default()
1863                    })
1864                    .await
1865                    .unwrap();
1866
1867                writer
1868                    .write_entries(&BlockFifoRequest {
1869                        command: BlockFifoCommand {
1870                            opcode: BlockOpcode::Read.into_primitive(),
1871                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1872                            ..Default::default()
1873                        },
1874                        reqid: 2,
1875                        group: 1,
1876                        vmoid: vmo_id.id,
1877                        length: 1,
1878                        ..Default::default()
1879                    })
1880                    .await
1881                    .unwrap();
1882
1883                let mut response = BlockFifoResponse::default();
1884                reader.read_entries(&mut response).await.unwrap();
1885                assert_eq!(response.status, zx::sys::ZX_OK);
1886                assert_eq!(response.reqid, 2);
1887                assert_eq!(response.group, 1);
1888            }
1889        );
1890    }
1891
1892    #[fuchsia::test]
1893    async fn test_group_error() {
1894        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1895
1896        let counter = Arc::new(AtomicU64::new(0));
1897        let counter_clone = counter.clone();
1898
1899        futures::join!(
1900            async move {
1901                let block_server = BlockServer::new(
1902                    BLOCK_SIZE,
1903                    Arc::new(MockInterface {
1904                        read_hook: Some(Box::new(move |_, _, _, _| {
1905                            counter_clone.fetch_add(1, Ordering::Relaxed);
1906                            Box::pin(async { Err(zx::Status::BAD_STATE) })
1907                        })),
1908                    }),
1909                );
1910                block_server.handle_requests(stream).await.unwrap();
1911            },
1912            async move {
1913                let (session_proxy, server) = fidl::endpoints::create_proxy();
1914
1915                proxy.open_session(server).unwrap();
1916
1917                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1918                let vmo_id = session_proxy
1919                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1920                    .await
1921                    .unwrap()
1922                    .unwrap();
1923
1924                let mut fifo =
1925                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1926                let (mut reader, mut writer) = fifo.async_io();
1927
1928                writer
1929                    .write_entries(&BlockFifoRequest {
1930                        command: BlockFifoCommand {
1931                            opcode: BlockOpcode::Read.into_primitive(),
1932                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1933                            ..Default::default()
1934                        },
1935                        group: 1,
1936                        vmoid: vmo_id.id,
1937                        length: 1,
1938                        ..Default::default()
1939                    })
1940                    .await
1941                    .unwrap();
1942
1943                // Wait until processed.
1944                poll_fn(|cx: &mut Context<'_>| {
1945                    if counter.load(Ordering::Relaxed) == 1 {
1946                        Poll::Ready(())
1947                    } else {
1948                        // Yield to the executor.
1949                        cx.waker().wake_by_ref();
1950                        Poll::Pending
1951                    }
1952                })
1953                .await;
1954
1955                let mut response = BlockFifoResponse::default();
1956                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1957
1958                writer
1959                    .write_entries(&BlockFifoRequest {
1960                        command: BlockFifoCommand {
1961                            opcode: BlockOpcode::Read.into_primitive(),
1962                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1963                            ..Default::default()
1964                        },
1965                        group: 1,
1966                        vmoid: vmo_id.id,
1967                        length: 1,
1968                        ..Default::default()
1969                    })
1970                    .await
1971                    .unwrap();
1972
1973                writer
1974                    .write_entries(&BlockFifoRequest {
1975                        command: BlockFifoCommand {
1976                            opcode: BlockOpcode::Read.into_primitive(),
1977                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1978                            ..Default::default()
1979                        },
1980                        reqid: 2,
1981                        group: 1,
1982                        vmoid: vmo_id.id,
1983                        length: 1,
1984                        ..Default::default()
1985                    })
1986                    .await
1987                    .unwrap();
1988
1989                reader.read_entries(&mut response).await.unwrap();
1990                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1991                assert_eq!(response.reqid, 2);
1992                assert_eq!(response.group, 1);
1993
1994                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1995
1996                // Only the first request should have been processed.
1997                assert_eq!(counter.load(Ordering::Relaxed), 1);
1998            }
1999        );
2000    }
2001
2002    #[fuchsia::test]
2003    async fn test_group_with_two_lasts() {
2004        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2005
2006        let (tx, rx) = oneshot::channel();
2007
2008        futures::join!(
2009            async move {
2010                let rx = Mutex::new(Some(rx));
2011                let block_server = BlockServer::new(
2012                    BLOCK_SIZE,
2013                    Arc::new(MockInterface {
2014                        read_hook: Some(Box::new(move |_, _, _, _| {
2015                            let rx = rx.lock().take().unwrap();
2016                            Box::pin(async {
2017                                let _ = rx.await;
2018                                Ok(())
2019                            })
2020                        })),
2021                    }),
2022                );
2023                block_server.handle_requests(stream).await.unwrap();
2024            },
2025            async move {
2026                let (session_proxy, server) = fidl::endpoints::create_proxy();
2027
2028                proxy.open_session(server).unwrap();
2029
2030                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2031                let vmo_id = session_proxy
2032                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2033                    .await
2034                    .unwrap()
2035                    .unwrap();
2036
2037                let mut fifo =
2038                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2039                let (mut reader, mut writer) = fifo.async_io();
2040
2041                writer
2042                    .write_entries(&BlockFifoRequest {
2043                        command: BlockFifoCommand {
2044                            opcode: BlockOpcode::Read.into_primitive(),
2045                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2046                            ..Default::default()
2047                        },
2048                        reqid: 1,
2049                        group: 1,
2050                        vmoid: vmo_id.id,
2051                        length: 1,
2052                        ..Default::default()
2053                    })
2054                    .await
2055                    .unwrap();
2056
2057                writer
2058                    .write_entries(&BlockFifoRequest {
2059                        command: BlockFifoCommand {
2060                            opcode: BlockOpcode::Read.into_primitive(),
2061                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2062                            ..Default::default()
2063                        },
2064                        reqid: 2,
2065                        group: 1,
2066                        vmoid: vmo_id.id,
2067                        length: 1,
2068                        ..Default::default()
2069                    })
2070                    .await
2071                    .unwrap();
2072
2073                // Send an independent request to flush through the fifo.
2074                writer
2075                    .write_entries(&BlockFifoRequest {
2076                        command: BlockFifoCommand {
2077                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2078                            ..Default::default()
2079                        },
2080                        reqid: 3,
2081                        vmoid: vmo_id.id,
2082                        ..Default::default()
2083                    })
2084                    .await
2085                    .unwrap();
2086
2087                // It should succeed.
2088                let mut response = BlockFifoResponse::default();
2089                reader.read_entries(&mut response).await.unwrap();
2090                assert_eq!(response.status, zx::sys::ZX_OK);
2091                assert_eq!(response.reqid, 3);
2092
2093                // Now release the original request.
2094                tx.send(()).unwrap();
2095
2096                // The response should be for the first message tagged as last, and it should be
2097                // an error because we sent two messages with the LAST marker.
2098                let mut response = BlockFifoResponse::default();
2099                reader.read_entries(&mut response).await.unwrap();
2100                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2101                assert_eq!(response.reqid, 1);
2102                assert_eq!(response.group, 1);
2103            }
2104        );
2105    }
2106
2107    #[fuchsia::test(allow_stalls = false)]
2108    async fn test_requests_dont_block_sessions() {
2109        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2110
2111        let (tx, rx) = oneshot::channel();
2112
2113        fasync::Task::local(async move {
2114            let rx = Mutex::new(Some(rx));
2115            let block_server = BlockServer::new(
2116                BLOCK_SIZE,
2117                Arc::new(MockInterface {
2118                    read_hook: Some(Box::new(move |_, _, _, _| {
2119                        let rx = rx.lock().take().unwrap();
2120                        Box::pin(async {
2121                            let _ = rx.await;
2122                            Ok(())
2123                        })
2124                    })),
2125                }),
2126            );
2127            block_server.handle_requests(stream).await.unwrap();
2128        })
2129        .detach();
2130
2131        let mut fut = pin!(async {
2132            let (session_proxy, server) = fidl::endpoints::create_proxy();
2133
2134            proxy.open_session(server).unwrap();
2135
2136            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2137            let vmo_id = session_proxy
2138                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2139                .await
2140                .unwrap()
2141                .unwrap();
2142
2143            let mut fifo =
2144                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2145            let (mut reader, mut writer) = fifo.async_io();
2146
2147            writer
2148                .write_entries(&BlockFifoRequest {
2149                    command: BlockFifoCommand {
2150                        opcode: BlockOpcode::Read.into_primitive(),
2151                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2152                        ..Default::default()
2153                    },
2154                    reqid: 1,
2155                    group: 1,
2156                    vmoid: vmo_id.id,
2157                    length: 1,
2158                    ..Default::default()
2159                })
2160                .await
2161                .unwrap();
2162
2163            let mut response = BlockFifoResponse::default();
2164            reader.read_entries(&mut response).await.unwrap();
2165            assert_eq!(response.status, zx::sys::ZX_OK);
2166        });
2167
2168        // The response won't come back until we send on `tx`.
2169        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2170
2171        let mut fut2 = pin!(proxy.get_volume_info());
2172
2173        // get_volume_info is set up to stall forever.
2174        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2175
2176        // If we now free up the first future, it should resolve; the stalled call to
2177        // get_volume_info should not block the fifo response.
2178        let _ = tx.send(());
2179
2180        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2181    }
2182
2183    #[fuchsia::test]
2184    async fn test_request_flow_control() {
2185        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2186
2187        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2188        // server will block until that happens.
2189        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2190        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2191        let event_clone = event.clone();
2192        futures::join!(
2193            async move {
2194                let block_server = BlockServer::new(
2195                    BLOCK_SIZE,
2196                    Arc::new(MockInterface {
2197                        read_hook: Some(Box::new(move |_, _, _, _| {
2198                            let event_clone = event_clone.clone();
2199                            Box::pin(async move {
2200                                if !event_clone.1.load(Ordering::SeqCst) {
2201                                    event_clone.0.listen().await;
2202                                }
2203                                Ok(())
2204                            })
2205                        })),
2206                    }),
2207                );
2208                block_server.handle_requests(stream).await.unwrap();
2209            },
2210            async move {
2211                let (session_proxy, server) = fidl::endpoints::create_proxy();
2212
2213                proxy.open_session(server).unwrap();
2214
2215                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2216                let vmo_id = session_proxy
2217                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2218                    .await
2219                    .unwrap()
2220                    .unwrap();
2221
2222                let mut fifo =
2223                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2224                let (mut reader, mut writer) = fifo.async_io();
2225
2226                for i in 0..MAX_REQUESTS {
2227                    writer
2228                        .write_entries(&BlockFifoRequest {
2229                            command: BlockFifoCommand {
2230                                opcode: BlockOpcode::Read.into_primitive(),
2231                                ..Default::default()
2232                            },
2233                            reqid: (i + 1) as u32,
2234                            dev_offset: i,
2235                            vmoid: vmo_id.id,
2236                            length: 1,
2237                            ..Default::default()
2238                        })
2239                        .await
2240                        .unwrap();
2241                }
2242                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2243                    command: BlockFifoCommand {
2244                        opcode: BlockOpcode::Read.into_primitive(),
2245                        ..Default::default()
2246                    },
2247                    reqid: u32::MAX,
2248                    dev_offset: MAX_REQUESTS,
2249                    vmoid: vmo_id.id,
2250                    length: 1,
2251                    ..Default::default()
2252                })))
2253                .is_pending());
2254                // OK, let the server start to process.
2255                event.1.store(true, Ordering::SeqCst);
2256                event.0.notify(usize::MAX);
2257                // For each entry we read, make sure we can write a new one in.
2258                let mut finished_reqids = vec![];
2259                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2260                    let mut response = BlockFifoResponse::default();
2261                    reader.read_entries(&mut response).await.unwrap();
2262                    assert_eq!(response.status, zx::sys::ZX_OK);
2263                    finished_reqids.push(response.reqid);
2264                    writer
2265                        .write_entries(&BlockFifoRequest {
2266                            command: BlockFifoCommand {
2267                                opcode: BlockOpcode::Read.into_primitive(),
2268                                ..Default::default()
2269                            },
2270                            reqid: (i + 1) as u32,
2271                            dev_offset: i,
2272                            vmoid: vmo_id.id,
2273                            length: 1,
2274                            ..Default::default()
2275                        })
2276                        .await
2277                        .unwrap();
2278                }
2279                let mut response = BlockFifoResponse::default();
2280                for _ in 0..MAX_REQUESTS {
2281                    reader.read_entries(&mut response).await.unwrap();
2282                    assert_eq!(response.status, zx::sys::ZX_OK);
2283                    finished_reqids.push(response.reqid);
2284                }
2285                // Verify that we got a response for each request.  Note that we can't assume FIFO
2286                // ordering.
2287                finished_reqids.sort();
2288                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2289                let mut i = 1;
2290                for reqid in finished_reqids {
2291                    assert_eq!(reqid, i);
2292                    i += 1;
2293                }
2294            }
2295        );
2296    }
2297
2298    #[fuchsia::test]
2299    async fn test_passthrough_io_with_fixed_map() {
2300        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2301
2302        let expected_op = Arc::new(Mutex::new(None));
2303        let expected_op_clone = expected_op.clone();
2304        futures::join!(
2305            async {
2306                let block_server = BlockServer::new(
2307                    BLOCK_SIZE,
2308                    Arc::new(IoMockInterface {
2309                        return_errors: false,
2310                        do_checks: true,
2311                        expected_op: expected_op_clone,
2312                    }),
2313                );
2314                block_server.handle_requests(stream).await.unwrap();
2315            },
2316            async move {
2317                let (session_proxy, server) = fidl::endpoints::create_proxy();
2318
2319                let mappings = [fblock::BlockOffsetMapping {
2320                    source_block_offset: 0,
2321                    target_block_offset: 10,
2322                    length: 20,
2323                }];
2324                proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2325
2326                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2327                let vmo_id = session_proxy
2328                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2329                    .await
2330                    .unwrap()
2331                    .unwrap();
2332
2333                let mut fifo =
2334                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2335                let (mut reader, mut writer) = fifo.async_io();
2336
2337                // READ
2338                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2339                writer
2340                    .write_entries(&BlockFifoRequest {
2341                        command: BlockFifoCommand {
2342                            opcode: BlockOpcode::Read.into_primitive(),
2343                            ..Default::default()
2344                        },
2345                        vmoid: vmo_id.id,
2346                        dev_offset: 1,
2347                        length: 2,
2348                        vmo_offset: 3,
2349                        ..Default::default()
2350                    })
2351                    .await
2352                    .unwrap();
2353
2354                let mut response = BlockFifoResponse::default();
2355                reader.read_entries(&mut response).await.unwrap();
2356                assert_eq!(response.status, zx::sys::ZX_OK);
2357
2358                // WRITE
2359                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2360                writer
2361                    .write_entries(&BlockFifoRequest {
2362                        command: BlockFifoCommand {
2363                            opcode: BlockOpcode::Write.into_primitive(),
2364                            ..Default::default()
2365                        },
2366                        vmoid: vmo_id.id,
2367                        dev_offset: 4,
2368                        length: 5,
2369                        vmo_offset: 6,
2370                        ..Default::default()
2371                    })
2372                    .await
2373                    .unwrap();
2374
2375                reader.read_entries(&mut response).await.unwrap();
2376                assert_eq!(response.status, zx::sys::ZX_OK);
2377
2378                // FLUSH
2379                *expected_op.lock() = Some(ExpectedOp::Flush);
2380                writer
2381                    .write_entries(&BlockFifoRequest {
2382                        command: BlockFifoCommand {
2383                            opcode: BlockOpcode::Flush.into_primitive(),
2384                            ..Default::default()
2385                        },
2386                        ..Default::default()
2387                    })
2388                    .await
2389                    .unwrap();
2390
2391                reader.read_entries(&mut response).await.unwrap();
2392                assert_eq!(response.status, zx::sys::ZX_OK);
2393
2394                // TRIM
2395                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2396                writer
2397                    .write_entries(&BlockFifoRequest {
2398                        command: BlockFifoCommand {
2399                            opcode: BlockOpcode::Trim.into_primitive(),
2400                            ..Default::default()
2401                        },
2402                        dev_offset: 7,
2403                        length: 3,
2404                        ..Default::default()
2405                    })
2406                    .await
2407                    .unwrap();
2408
2409                reader.read_entries(&mut response).await.unwrap();
2410                assert_eq!(response.status, zx::sys::ZX_OK);
2411
2412                // READ past window
2413                *expected_op.lock() = None;
2414                writer
2415                    .write_entries(&BlockFifoRequest {
2416                        command: BlockFifoCommand {
2417                            opcode: BlockOpcode::Read.into_primitive(),
2418                            ..Default::default()
2419                        },
2420                        vmoid: vmo_id.id,
2421                        dev_offset: 19,
2422                        length: 2,
2423                        vmo_offset: 3,
2424                        ..Default::default()
2425                    })
2426                    .await
2427                    .unwrap();
2428
2429                reader.read_entries(&mut response).await.unwrap();
2430                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2431
2432                std::mem::drop(proxy);
2433            }
2434        );
2435    }
2436}