block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::bin::Bin;
5use anyhow::{Error, anyhow};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use zx::HandleBased;
19use {
20    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
// NOTE(review): presumably the largest number of FIFO requests handled in one batch — the users
// of this constant are not in this part of the file, so confirm before relying on that.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Trace flow id carried by a request.  It is built with `NonZero::new` from the raw id in the
/// FIFO request, so a raw id of zero becomes `None`.
type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    pub fn block_count(&self) -> Option<u64> {
40        match self {
41            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42            Self::Partition(PartitionInfo { block_range, .. }) => {
43                block_range.as_ref().map(|range| range.end - range.start)
44            }
45        }
46    }
47
48    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49        match self {
50            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52                max_transfer_blocks.clone()
53            }
54        }
55    }
56
57    fn max_transfer_size(&self, block_size: u32) -> u32 {
58        if let Some(max_blocks) = self.max_transfer_blocks() {
59            max_blocks.get() * block_size
60        } else {
61            MAX_TRANSFER_UNBOUNDED
62        }
63    }
64}
65
/// Information associated with non-partition block devices.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// Flags reported in the `flags` field of the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Number of blocks reported in the `GetInfo` response.
    pub block_count: u64,
    /// Largest number of blocks in a single transfer.  `None` means no limit, in which case
    /// `MAX_TRANSFER_UNBOUNDED` is reported as the maximum transfer size.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
73
/// Information associated with a block device that is also a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// The device flags reported by the underlying device.
    pub device_flags: fblock::Flag,
    /// Largest number of blocks in a single transfer.  `None` means no limit, in which case
    /// `MAX_TRANSFER_UNBOUNDED` is reported as the maximum transfer size.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// If `block_range` is None, the partition is a volume and may not be contiguous.
    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
    /// slices and use that (along with the slice and block sizes) to determine the block count.
    pub block_range: Option<Range<u64>>,
    /// Returned by `GetTypeGuid` and in the `GetMetadata` response.
    pub type_guid: [u8; 16],
    /// Returned by `GetInstanceGuid` and in the `GetMetadata` response.
    pub instance_guid: [u8; 16],
    /// Returned by `GetName` and in the `GetMetadata` response.
    pub name: String,
    /// Returned in the `flags` field of the `GetMetadata` response.
    pub flags: u64,
}
89
/// We internally keep track of active requests, so that when the server is torn down, we can
/// deallocate all of the resources for pending requests.
struct ActiveRequest<S> {
    /// The session the request arrived on; handed back together with the response.
    session: S,
    /// Identifies the group (or the individual request) this entry tracks.  Later requests that
    /// belong to the same group are matched against this value.
    group_or_request: GroupOrRequest,
    /// Trace flow id of the request that created this entry, if it had one.
    trace_flow_id: TraceFlowId,
    /// Key obtained from `vmo_bin.retain()` for read/write requests; released when the response
    /// is taken.
    vmo_bin_key: Option<usize>,
    /// First non-OK status recorded for this entry, or `OK` if nothing has failed so far.
    status: zx::Status,
    /// Number of requests in this entry that have been decoded but not yet completed.
    count: u32,
    /// The `reqid` to place in the response.  `None` for a group until a request tagged
    /// `GROUP_LAST` has been received; no response is produced while it is `None`.
    req_id: Option<u32>,
}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105    fn default() -> Self {
106        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107    }
108}
109
110impl<S> ActiveRequests<S> {
111    fn complete_and_take_response(
112        &self,
113        request_id: RequestId,
114        status: zx::Status,
115    ) -> Option<(S, BlockFifoResponse)> {
116        self.0.lock().complete_and_take_response(request_id, status)
117    }
118}
119
120struct ActiveRequestsInner<S> {
121    requests: Slab<ActiveRequest<S>>,
122    vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125// Keeps track of all the requests that are currently being processed
126impl<S> ActiveRequestsInner<S> {
127    /// Completes a request.
128    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129        let group = &mut self.requests[request_id.0];
130
131        group.count = group.count.checked_sub(1).unwrap();
132        if status != zx::Status::OK && group.status == zx::Status::OK {
133            group.status = status
134        }
135
136        fuchsia_trace::duration!(
137            c"storage",
138            c"block_server::finish_transaction",
139            "request_id" => request_id.0,
140            "group_completed" => group.count == 0,
141            "status" => status.into_raw());
142        if let Some(trace_flow_id) = group.trace_flow_id {
143            fuchsia_trace::flow_step!(
144                c"storage",
145                c"block_server::finish_request",
146                trace_flow_id.get().into()
147            );
148        }
149    }
150
151    /// Takes the response if all requests are finished.
152    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153        let group = &self.requests[request_id.0];
154        match group.req_id {
155            Some(reqid) if group.count == 0 => {
156                let group = self.requests.remove(request_id.0);
157                if let Some(vmo_bin_key) = group.vmo_bin_key {
158                    self.vmo_bin.release(vmo_bin_key);
159                }
160                Some((
161                    group.session,
162                    BlockFifoResponse {
163                        status: group.status.into_raw(),
164                        reqid,
165                        group: group.group_or_request.group_id().unwrap_or(0),
166                        ..Default::default()
167                    },
168                ))
169            }
170            _ => None,
171        }
172    }
173
174    /// Competes the request and returns a response if the request group is finished.
175    fn complete_and_take_response(
176        &mut self,
177        request_id: RequestId,
178        status: zx::Status,
179    ) -> Option<(S, BlockFifoResponse)> {
180        self.complete(request_id, status);
181        self.take_response(request_id)
182    }
183}
184
/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
/// Requests are delivered as `fuchsia.hardware.block.volume/Volume` requests through
/// `handle_requests`.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// Block size in bytes, as reported in the `GetInfo` response and passed to new sessions.
    block_size: u32,
    /// Supplies device information and opens the sessions requested by clients.
    session_manager: Arc<SM>,
}
191
/// A single entry in [`OffsetMap`].
///
/// Maps `length` blocks starting at `source_block_offset` onto the blocks starting at
/// `target_block_offset`.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    source_block_offset: u64,
    target_block_offset: u64,
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the block range `blocks` (start block, block count) lies entirely within
    /// the source range of this mapping.
    ///
    /// The check uses overflow-free arithmetic, so ranges whose end would exceed `u64::MAX` are
    /// reported as out of range rather than panicking or wrapping.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (start, count) = blocks;
        // Work relative to the start of the source range so the sum cannot overflow for any
        // request that actually fits.
        let Some(relative_start) = start.checked_sub(self.source_block_offset) else {
            return false;
        };
        relative_start.checked_add(u64::from(count)).is_some_and(|end| end <= self.length)
    }
}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208    type Error = zx::Status;
209
210    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213        Ok(Self {
214            source_block_offset: wire.source_block_offset,
215            target_block_offset: wire.target_block_offset,
216            length: wire.length,
217        })
218    }
219}
220
221/// Remaps the offset of block requests based on an internal map, and truncates long requests.
222pub struct OffsetMap {
223    mapping: Option<BlockOffsetMapping>,
224    max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228    /// An OffsetMap that remaps requests.
229    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230        Self { mapping: Some(mapping), max_transfer_blocks }
231    }
232
233    /// An OffsetMap that just enforces maximum request sizes.
234    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235        Self { mapping: None, max_transfer_blocks }
236    }
237
238    pub fn is_empty(&self) -> bool {
239        self.mapping.is_none()
240    }
241
242    fn mapping(&self) -> Option<&BlockOffsetMapping> {
243        self.mapping.as_ref()
244    }
245
246    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247        self.max_transfer_blocks
248    }
249}
250
// Methods take Arc<Self> rather than &self because of
// https://github.com/rust-lang/rust/issues/42940.
/// Supplies device information and session handling to `BlockServer`.
pub trait SessionManager: 'static {
    /// The session type stored alongside each active request.
    type Session;

    /// Called when a client attaches `vmo` to a session, after the VMO has been assigned an id
    /// and before the id is returned to the client.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Creates a new session to handle `stream`.
    /// The returned future should run until the session completes, for example when the client end
    /// closes.
    /// `offset_map` will be used to adjust the block offset/length of FIFO requests.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the active requests.
    /// FIFO request decoding registers every request in this table.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
312
/// Conversion into a shared session manager.  `BlockServer::new` accepts any type implementing
/// this trait and stores the resulting `Arc`.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
318
319impl<SM: SessionManager> BlockServer<SM> {
320    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321        Self { block_size, session_manager: session_manager.into_session_manager() }
322    }
323
324    pub fn session_manager(&self) -> &SM {
325        self.session_manager.as_ref()
326    }
327
328    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
329    pub async fn handle_requests(
330        &self,
331        mut requests: fvolume::VolumeRequestStream,
332    ) -> Result<(), Error> {
333        let scope = fasync::Scope::new();
334        loop {
335            match requests.try_next().await {
336                Ok(Some(request)) => {
337                    if let Some(session) = self.handle_request(request).await? {
338                        scope.spawn(session.map(|_| ()));
339                    }
340                }
341                Ok(None) => break,
342                Err(err) => log::warn!(err:?; "Invalid request"),
343            }
344        }
345        scope.await;
346        Ok(())
347    }
348
349    /// Processes a partition request.  If a new session task is created in response to the request,
350    /// it is returned.
351    async fn handle_request(
352        &self,
353        request: fvolume::VolumeRequest,
354    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
355        match request {
356            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
357                Ok(info) => {
358                    let max_transfer_size = info.max_transfer_size(self.block_size);
359                    let (block_count, flags) = match info.as_ref() {
360                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
361                            (*block_count, *device_flags)
362                        }
363                        DeviceInfo::Partition(partition_info) => {
364                            let block_count = if let Some(range) =
365                                partition_info.block_range.as_ref()
366                            {
367                                range.end - range.start
368                            } else {
369                                let volume_info = self.session_manager.get_volume_info().await?;
370                                volume_info.0.slice_size * volume_info.1.partition_slice_count
371                                    / self.block_size as u64
372                            };
373                            (block_count, partition_info.device_flags)
374                        }
375                    };
376                    responder.send(Ok(&fblock::BlockInfo {
377                        block_count,
378                        block_size: self.block_size,
379                        max_transfer_size,
380                        flags,
381                    }))?;
382                }
383                Err(status) => responder.send(Err(status.into_raw()))?,
384            },
385            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
386                match self.device_info().await {
387                    Ok(info) => {
388                        return Ok(Some(self.session_manager.clone().open_session(
389                            session.into_stream(),
390                            OffsetMap::empty(info.max_transfer_blocks()),
391                            self.block_size,
392                        )));
393                    }
394                    Err(status) => session.close_with_epitaph(status)?,
395                }
396            }
397            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
398                session,
399                mapping,
400                control_handle: _,
401            } => match self.device_info().await {
402                Ok(info) => {
403                    let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
404                        Ok(m) => m,
405                        Err(status) => {
406                            session.close_with_epitaph(status)?;
407                            return Ok(None);
408                        }
409                    };
410                    if let Some(max) = info.block_count() {
411                        if initial_mapping.target_block_offset + initial_mapping.length > max {
412                            log::warn!(
413                                "Invalid mapping for session: {initial_mapping:?} (max {max})"
414                            );
415                            session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
416                            return Ok(None);
417                        }
418                    }
419                    return Ok(Some(self.session_manager.clone().open_session(
420                        session.into_stream(),
421                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
422                        self.block_size,
423                    )));
424                }
425                Err(status) => session.close_with_epitaph(status)?,
426            },
427            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
428                Ok(info) => {
429                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
430                        let mut guid =
431                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
432                        guid.value.copy_from_slice(&partition_info.type_guid);
433                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
434                    } else {
435                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
436                    }
437                }
438                Err(status) => {
439                    responder.send(status.into_raw(), None)?;
440                }
441            },
442            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
443                match self.device_info().await {
444                    Ok(info) => {
445                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
446                            let mut guid =
447                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
448                            guid.value.copy_from_slice(&partition_info.instance_guid);
449                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
450                        } else {
451                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
452                        }
453                    }
454                    Err(status) => {
455                        responder.send(status.into_raw(), None)?;
456                    }
457                }
458            }
459            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
460                Ok(info) => {
461                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
462                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
463                    } else {
464                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
465                    }
466                }
467                Err(status) => {
468                    responder.send(status.into_raw(), None)?;
469                }
470            },
471            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
472                Ok(info) => {
473                    if let DeviceInfo::Partition(info) = info.as_ref() {
474                        let mut type_guid =
475                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
476                        type_guid.value.copy_from_slice(&info.type_guid);
477                        let mut instance_guid =
478                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
479                        instance_guid.value.copy_from_slice(&info.instance_guid);
480                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
481                            name: Some(info.name.clone()),
482                            type_guid: Some(type_guid),
483                            instance_guid: Some(instance_guid),
484                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
485                            num_blocks: info
486                                .block_range
487                                .as_ref()
488                                .map(|range| range.end - range.start),
489                            flags: Some(info.flags),
490                            ..Default::default()
491                        }))?;
492                    }
493                }
494                Err(status) => responder.send(Err(status.into_raw()))?,
495            },
496            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
497                match self.session_manager.query_slices(&start_slices).await {
498                    Ok(mut results) => {
499                        let results_len = results.len();
500                        assert!(results_len <= 16);
501                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
502                        responder.send(
503                            zx::sys::ZX_OK,
504                            &results.try_into().unwrap(),
505                            results_len as u64,
506                        )?;
507                    }
508                    Err(s) => {
509                        responder.send(
510                            s.into_raw(),
511                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
512                            0,
513                        )?;
514                    }
515                }
516            }
517            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
518                match self.session_manager.get_volume_info().await {
519                    Ok((manager_info, volume_info)) => {
520                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
521                    }
522                    Err(s) => responder.send(s.into_raw(), None, None)?,
523                }
524            }
525            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
526                responder.send(
527                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
528                        .into_raw(),
529                )?;
530            }
531            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
532                responder.send(
533                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
534                        .into_raw(),
535                )?;
536            }
537            fvolume::VolumeRequest::Destroy { responder, .. } => {
538                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
539            }
540        }
541        Ok(None)
542    }
543
544    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
545        self.session_manager.get_info().await
546    }
547}
548
/// State for handling a single session's requests.
struct SessionHelper<SM: SessionManager> {
    /// Session manager that owns the active-request table and is notified of attached VMOs.
    session_manager: Arc<SM>,
    /// Offset remapping and transfer-size limit supplied when the session was created.
    offset_map: OffsetMap,
    /// Block size in bytes; FIFO requests express their VMO offsets and lengths in blocks.
    block_size: u32,
    /// The FIFO end that is duplicated and handed to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached via `AttachVmo`, keyed by the id returned to the client.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
}
556
557impl<SM: SessionManager> SessionHelper<SM> {
558    fn new(
559        session_manager: Arc<SM>,
560        offset_map: OffsetMap,
561        block_size: u32,
562    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
563        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
564        Ok((
565            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
566            fifo,
567        ))
568    }
569
570    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
571        match request {
572            fblock::SessionRequest::GetFifo { responder } => {
573                let rights = zx::Rights::TRANSFER
574                    | zx::Rights::READ
575                    | zx::Rights::WRITE
576                    | zx::Rights::SIGNAL
577                    | zx::Rights::WAIT;
578                match self.peer_fifo.duplicate_handle(rights) {
579                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
580                    Err(s) => responder.send(Err(s.into_raw()))?,
581                }
582                Ok(())
583            }
584            fblock::SessionRequest::AttachVmo { vmo, responder } => {
585                let vmo = Arc::new(vmo);
586                let vmo_id = {
587                    let mut vmos = self.vmos.lock();
588                    if vmos.len() == u16::MAX as usize {
589                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
590                        return Ok(());
591                    } else {
592                        let vmo_id = match vmos.last_entry() {
593                            None => 1,
594                            Some(o) => {
595                                o.key().checked_add(1).unwrap_or_else(|| {
596                                    let mut vmo_id = 1;
597                                    // Find the first gap...
598                                    for (&id, _) in &*vmos {
599                                        if id > vmo_id {
600                                            break;
601                                        }
602                                        vmo_id = id + 1;
603                                    }
604                                    vmo_id
605                                })
606                            }
607                        };
608                        vmos.insert(vmo_id, vmo.clone());
609                        vmo_id
610                    }
611                };
612                self.session_manager.clone().on_attach_vmo(&vmo).await?;
613                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
614                Ok(())
615            }
616            fblock::SessionRequest::Close { responder } => {
617                responder.send(Ok(()))?;
618                Err(anyhow!("Closed"))
619            }
620        }
621    }
622
623    /// Decodes `request`.
624    fn decode_fifo_request(
625        &self,
626        session: SM::Session,
627        request: &BlockFifoRequest,
628    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
629        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
630        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
631            .ok_or(zx::Status::INVALID_ARGS)
632            .and_then(|code| {
633                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
634                    if request.length == 0 {
635                        return Err(zx::Status::INVALID_ARGS);
636                    }
637                    // Make sure the end offsets won't wrap.
638                    if request.dev_offset.checked_add(request.length as u64).is_none()
639                        || (code != BlockOpcode::Trim
640                            && (request.length as u64 * self.block_size as u64)
641                                .checked_add(request.vmo_offset)
642                                .is_none())
643                    {
644                        return Err(zx::Status::OUT_OF_RANGE);
645                    }
646                }
647                Ok(match code {
648                    BlockOpcode::Read => Operation::Read {
649                        device_block_offset: request.dev_offset,
650                        block_count: request.length,
651                        _unused: 0,
652                        vmo_offset: request
653                            .vmo_offset
654                            .checked_mul(self.block_size as u64)
655                            .ok_or(zx::Status::OUT_OF_RANGE)?,
656                        options: ReadOptions {
657                            inline_crypto_options: InlineCryptoOptions {
658                                dun: request.dun,
659                                slot: request.slot,
660                            },
661                        },
662                    },
663                    BlockOpcode::Write => {
664                        let mut options = WriteOptions {
665                            inline_crypto_options: InlineCryptoOptions {
666                                dun: request.dun,
667                                slot: request.slot,
668                            },
669                            ..WriteOptions::default()
670                        };
671                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
672                            options.flags |= WriteFlags::FORCE_ACCESS;
673                        }
674                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
675                            options.flags |= WriteFlags::PRE_BARRIER;
676                        }
677                        Operation::Write {
678                            device_block_offset: request.dev_offset,
679                            block_count: request.length,
680                            _unused: 0,
681                            options,
682                            vmo_offset: request
683                                .vmo_offset
684                                .checked_mul(self.block_size as u64)
685                                .ok_or(zx::Status::OUT_OF_RANGE)?,
686                        }
687                    }
688                    BlockOpcode::Flush => Operation::Flush,
689                    BlockOpcode::Trim => Operation::Trim {
690                        device_block_offset: request.dev_offset,
691                        block_count: request.length,
692                    },
693                    BlockOpcode::CloseVmo => Operation::CloseVmo,
694                })
695            });
696
697        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
698            GroupOrRequest::Group(request.group)
699        } else {
700            GroupOrRequest::Request(request.reqid)
701        };
702
703        let mut active_requests = self.session_manager.active_requests().0.lock();
704        let mut request_id = None;
705
706        // Multiple Block I/O request may be sent as a group.
707        // Notes:
708        // - the group is identified by the group id in the request
709        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
710        //   flag is set.
711        // - when processing a request of a group fails, subsequent requests of that
712        //   group will not be processed.
713        //
714        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
715        if group_or_request.is_group() {
716            // Search for an existing entry that matches this group.  NOTE: This is a potentially
717            // expensive way to find a group (it's iterating over all slots in the active-requests
718            // slab).  This can be optimised easily should we need to.
719            for (key, group) in &mut active_requests.requests {
720                if group.group_or_request == group_or_request {
721                    if group.req_id.is_some() {
722                        // We have already received a request tagged as last.
723                        if group.status == zx::Status::OK {
724                            group.status = zx::Status::INVALID_ARGS;
725                        }
726                        // Ignore this request.
727                        return Err(None);
728                    }
729                    if flags.contains(BlockIoFlag::GROUP_LAST) {
730                        group.req_id = Some(request.reqid);
731                        // If the group has had an error, there is no point trying to issue this
732                        // request.
733                        if group.status != zx::Status::OK {
734                            operation = Err(group.status);
735                        }
736                    } else if group.status != zx::Status::OK {
737                        // The group has already encountered an error, so there is no point trying
738                        // to issue this request.
739                        return Err(None);
740                    }
741                    request_id = Some(RequestId(key));
742                    group.count += 1;
743                    break;
744                }
745            }
746        }
747
748        let mut retain_vmo = false;
749        let vmo = match &operation {
750            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
751                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
752                    retain_vmo = true;
753                    Ok(Some(vmo))
754                })
755            }
756            Ok(Operation::CloseVmo) => {
757                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
758                    active_requests.vmo_bin.add(vmo.clone());
759                    Ok(Some(vmo))
760                })
761            }
762            _ => Ok(None),
763        }
764        .unwrap_or_else(|e| {
765            operation = Err(e);
766            None
767        });
768
769        let trace_flow_id = NonZero::new(request.trace_flow_id);
770        let request_id = request_id.unwrap_or_else(|| {
771            let vmo_bin_key =
772                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
773            RequestId(active_requests.requests.insert(ActiveRequest {
774                session,
775                group_or_request,
776                trace_flow_id,
777                vmo_bin_key,
778                status: zx::Status::OK,
779                count: 1,
780                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
781                    || flags.contains(BlockIoFlag::GROUP_LAST)
782                {
783                    Some(request.reqid)
784                } else {
785                    None
786                },
787            }))
788        });
789
790        Ok(DecodedRequest {
791            request_id,
792            trace_flow_id,
793            operation: operation.map_err(|status| {
794                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
795            })?,
796            vmo,
797        })
798    }
799
800    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
801        std::mem::take(&mut *self.vmos.lock())
802    }
803
804    /// Maps the request and returns the mapped request with an optional remainder.
805    fn map_request(
806        &self,
807        mut request: DecodedRequest,
808    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
809        let mut active_requests = self.session_manager.active_requests().0.lock();
810        let active_request = &mut active_requests.requests[request.request_id.0];
811        if active_request.status != zx::Status::OK {
812            return Err(active_requests
813                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
814                .map(|(_, r)| r));
815        }
816        let mapping = self.offset_map.mapping();
817        match (mapping, request.operation.blocks()) {
818            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
819                return Err(active_requests
820                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
821                    .map(|(_, r)| r));
822            }
823            _ => {}
824        }
825        let remainder = request.operation.map(
826            self.offset_map.mapping(),
827            self.offset_map.max_transfer_blocks(),
828            self.block_size,
829        );
830        if remainder.is_some() {
831            active_request.count += 1;
832        }
833        static CACHE: AtomicU64 = AtomicU64::new(0);
834        if let Some(context) =
835            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
836        {
837            use fuchsia_trace::ArgValue;
838            let trace_args = [
839                ArgValue::of("request_id", request.request_id.0),
840                ArgValue::of("opcode", request.operation.trace_label()),
841            ];
842            let _scope = fuchsia_trace::duration(
843                c"storage",
844                c"block_server::start_transaction",
845                &trace_args,
846            );
847            if let Some(trace_flow_id) = active_request.trace_flow_id {
848                fuchsia_trace::flow_step(
849                    &context,
850                    c"block_server::start_transaction",
851                    trace_flow_id.get().into(),
852                    &[],
853                );
854            }
855        }
856        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
857        Ok((request, remainder))
858    }
859
860    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
861        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
862    }
863}
864
/// Identifies an active request.  It wraps the key of the request's entry in the
/// active-requests slab.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
868
869#[derive(Clone, Debug)]
870struct DecodedRequest {
871    request_id: RequestId,
872    trace_flow_id: TraceFlowId,
873    operation: Operation,
874    vmo: Option<Arc<zx::Vmo>>,
875}
876
877/// cbindgen:no-export
878pub type WriteFlags = block_protocol::WriteFlags;
879pub type WriteOptions = block_protocol::WriteOptions;
880pub type ReadOptions = block_protocol::ReadOptions;
881
882#[repr(C)]
883#[derive(Clone, Debug, PartialEq, Eq)]
884pub enum Operation {
885    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
886    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
887    // code can read `device_block_offset` from the read variant and then assume it's valid for the
888    // write variant).
889    Read {
890        device_block_offset: u64,
891        block_count: u32,
892        _unused: u32,
893        vmo_offset: u64,
894        options: ReadOptions,
895    },
896    Write {
897        device_block_offset: u64,
898        block_count: u32,
899        _unused: u32,
900        vmo_offset: u64,
901        options: WriteOptions,
902    },
903    Flush,
904    Trim {
905        device_block_offset: u64,
906        block_count: u32,
907    },
908    /// This will never be seen by the C interface.
909    CloseVmo,
910}
911
912impl Operation {
913    fn trace_label(&self) -> &'static str {
914        match self {
915            Operation::Read { .. } => "read",
916            Operation::Write { .. } => "write",
917            Operation::Flush { .. } => "flush",
918            Operation::Trim { .. } => "trim",
919            Operation::CloseVmo { .. } => "close_vmo",
920        }
921    }
922
923    /// Returns (offset, length).
924    fn blocks(&self) -> Option<(u64, u32)> {
925        match self {
926            Operation::Read { device_block_offset, block_count, .. }
927            | Operation::Write { device_block_offset, block_count, .. }
928            | Operation::Trim { device_block_offset, block_count, .. } => {
929                Some((*device_block_offset, *block_count))
930            }
931            _ => None,
932        }
933    }
934
935    /// Returns mutable references to (offset, length).
936    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
937        match self {
938            Operation::Read { device_block_offset, block_count, .. }
939            | Operation::Write { device_block_offset, block_count, .. }
940            | Operation::Trim { device_block_offset, block_count, .. } => {
941                Some((device_block_offset, block_count))
942            }
943            _ => None,
944        }
945    }
946
947    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
948    /// start of the operation.
949    fn map(
950        &mut self,
951        mapping: Option<&BlockOffsetMapping>,
952        max_blocks: Option<NonZero<u32>>,
953        block_size: u32,
954    ) -> Option<Self> {
955        let mut max = match self {
956            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
957            _ => None,
958        };
959        let (offset, length) = self.blocks_mut()?;
960        let orig_offset = *offset;
961        if let Some(mapping) = mapping {
962            let delta = *offset - mapping.source_block_offset;
963            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
964            *offset = mapping.target_block_offset + delta;
965            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
966            max = match max {
967                None => Some(mapping_max),
968                Some(m) => Some(std::cmp::min(m, mapping_max)),
969            };
970        };
971        if let Some(max) = max {
972            if *length as u64 > max {
973                let rem = (*length as u64 - max) as u32;
974                *length = max as u32;
975                return Some(match self {
976                    Operation::Read {
977                        device_block_offset: _,
978                        block_count: _,
979                        vmo_offset,
980                        _unused,
981                        options,
982                    } => Operation::Read {
983                        device_block_offset: orig_offset + max,
984                        block_count: rem,
985                        vmo_offset: *vmo_offset + max * block_size as u64,
986                        _unused: *_unused,
987                        options: *options,
988                    },
989                    Operation::Write {
990                        device_block_offset: _,
991                        block_count: _,
992                        _unused,
993                        vmo_offset,
994                        options,
995                    } => {
996                        // Only send the barrier flag once per write request.
997                        let new_options = WriteOptions {
998                            flags: options.flags & !WriteFlags::PRE_BARRIER,
999                            ..*options
1000                        };
1001
1002                        Operation::Write {
1003                            device_block_offset: orig_offset + max,
1004                            block_count: rem,
1005                            _unused: *_unused,
1006                            vmo_offset: *vmo_offset + max * block_size as u64,
1007                            options: new_options,
1008                        }
1009                    }
1010                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1011                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1012                    }
1013                    _ => unreachable!(),
1014                });
1015            }
1016        }
1017        None
1018    }
1019}
1020
/// Identifies what an active request belongs to: either a group (carrying its `u16` group ID)
/// or a standalone request (carrying a `u32`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    Group(u16),
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        match self {
            Self::Group(_) => true,
            Self::Request(_) => false,
        }
    }

    /// Returns the group ID for the `Group` variant, and `None` otherwise.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self { Some(id) } else { None }
    }
}
1039
1040#[cfg(test)]
1041mod tests {
1042    use super::{
1043        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1044        TraceFlowId,
1045    };
1046    use assert_matches::assert_matches;
1047    use block_protocol::{
1048        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1049        WriteOptions,
1050    };
1051    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1052    use fuchsia_sync::Mutex;
1053    use futures::FutureExt as _;
1054    use futures::channel::oneshot;
1055    use futures::future::BoxFuture;
1056    use std::borrow::Cow;
1057    use std::future::poll_fn;
1058    use std::num::NonZero;
1059    use std::pin::pin;
1060    use std::sync::Arc;
1061    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1062    use std::task::{Context, Poll};
1063    use zx::{AsHandleRef as _, HandleBased as _};
1064    use {
1065        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1066        fuchsia_async as fasync,
1067    };
1068
1069    #[derive(Default)]
1070    struct MockInterface {
1071        read_hook: Option<
1072            Box<
1073                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1074                    + Send
1075                    + Sync,
1076            >,
1077        >,
1078        write_hook:
1079            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1080        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1081    }
1082
1083    impl super::async_interface::Interface for MockInterface {
1084        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1085            Ok(())
1086        }
1087
1088        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1089            Ok(Cow::Owned(test_device_info()))
1090        }
1091
1092        async fn read(
1093            &self,
1094            device_block_offset: u64,
1095            block_count: u32,
1096            vmo: &Arc<zx::Vmo>,
1097            vmo_offset: u64,
1098            _opts: ReadOptions,
1099            _trace_flow_id: TraceFlowId,
1100        ) -> Result<(), zx::Status> {
1101            if let Some(read_hook) = &self.read_hook {
1102                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1103            } else {
1104                unimplemented!();
1105            }
1106        }
1107
1108        async fn write(
1109            &self,
1110            device_block_offset: u64,
1111            _block_count: u32,
1112            _vmo: &Arc<zx::Vmo>,
1113            _vmo_offset: u64,
1114            _opts: WriteOptions,
1115            _trace_flow_id: TraceFlowId,
1116        ) -> Result<(), zx::Status> {
1117            if let Some(write_hook) = &self.write_hook {
1118                write_hook(device_block_offset).await
1119            } else {
1120                unimplemented!();
1121            }
1122        }
1123
1124        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1125            unreachable!();
1126        }
1127
1128        fn barrier(&self) -> Result<(), zx::Status> {
1129            if let Some(barrier_hook) = &self.barrier_hook {
1130                barrier_hook()
1131            } else {
1132                unimplemented!();
1133            }
1134        }
1135
1136        async fn trim(
1137            &self,
1138            _device_block_offset: u64,
1139            _block_count: u32,
1140            _trace_flow_id: TraceFlowId,
1141        ) -> Result<(), zx::Status> {
1142            unreachable!();
1143        }
1144
1145        async fn get_volume_info(
1146            &self,
1147        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1148            // Hang forever for the test_requests_dont_block_sessions test.
1149            let () = std::future::pending().await;
1150            unreachable!();
1151        }
1152    }
1153
1154    const BLOCK_SIZE: u32 = 512;
1155    const MAX_TRANSFER_BLOCKS: u32 = 10;
1156
1157    fn test_device_info() -> DeviceInfo {
1158        DeviceInfo::Partition(PartitionInfo {
1159            device_flags: fblock::Flag::READONLY,
1160            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1161            block_range: Some(0..100),
1162            type_guid: [1; 16],
1163            instance_guid: [2; 16],
1164            name: "foo".to_string(),
1165            flags: 0xabcd,
1166        })
1167    }
1168
1169    #[fuchsia::test]
1170    async fn test_split_request_only_issues_single_barrier() {
1171        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1172        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1173        let barrier_called = Arc::new(AtomicU32::new(0));
1174        let barrier_called_clone = barrier_called.clone();
1175
1176        futures::join!(
1177            async move {
1178                let block_server = BlockServer::new(
1179                    BLOCK_SIZE,
1180                    Arc::new(MockInterface {
1181                        barrier_hook: Some(Box::new(move || {
1182                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1183                            Ok(())
1184                        })),
1185                        write_hook: Some(Box::new(move |_device_block_offset| {
1186                            Box::pin(async move { Ok(()) })
1187                        })),
1188                        ..MockInterface::default()
1189                    }),
1190                );
1191                block_server.handle_requests(stream).await.unwrap();
1192            },
1193            async move {
1194                let (session_proxy, server) = fidl::endpoints::create_proxy();
1195
1196                proxy.open_session(server).unwrap();
1197
1198                let vmo_id = session_proxy
1199                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1200                    .await
1201                    .unwrap()
1202                    .unwrap();
1203                assert_ne!(vmo_id.id, 0);
1204
1205                let mut fifo =
1206                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1207                let (mut reader, mut writer) = fifo.async_io();
1208                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1209                // sub requests.
1210                writer
1211                    .write_entries(&BlockFifoRequest {
1212                        command: BlockFifoCommand {
1213                            opcode: BlockOpcode::Write.into_primitive(),
1214                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1215                            ..Default::default()
1216                        },
1217                        vmoid: vmo_id.id,
1218                        dev_offset: 0,
1219                        length: MAX_TRANSFER_BLOCKS + 1,
1220                        vmo_offset: 6,
1221                        ..Default::default()
1222                    })
1223                    .await
1224                    .unwrap();
1225
1226                let mut response = BlockFifoResponse::default();
1227                reader.read_entries(&mut response).await.unwrap();
1228                assert_eq!(response.status, zx::sys::ZX_OK);
1229
1230                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1231
1232                std::mem::drop(proxy);
1233            }
1234        );
1235    }
1236
1237    #[fuchsia::test]
1238    async fn test_barriers_not_supported() {
1239        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1240        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1241
1242        futures::join!(
1243            async move {
1244                let block_server = BlockServer::new(
1245                    BLOCK_SIZE,
1246                    Arc::new(MockInterface {
1247                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1248                        write_hook: Some(Box::new(move |_device_block_offset| {
1249                            Box::pin(async move { Ok(()) })
1250                        })),
1251                        ..MockInterface::default()
1252                    }),
1253                );
1254                block_server.handle_requests(stream).await.unwrap();
1255            },
1256            async move {
1257                let (session_proxy, server) = fidl::endpoints::create_proxy();
1258
1259                proxy.open_session(server).unwrap();
1260
1261                let vmo_id = session_proxy
1262                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1263                    .await
1264                    .unwrap()
1265                    .unwrap();
1266                assert_ne!(vmo_id.id, 0);
1267
1268                let mut fifo =
1269                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1270                let (mut reader, mut writer) = fifo.async_io();
1271
1272                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1273                // sub requests.
1274                writer
1275                    .write_entries(&BlockFifoRequest {
1276                        command: BlockFifoCommand {
1277                            opcode: BlockOpcode::Write.into_primitive(),
1278                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1279                            ..Default::default()
1280                        },
1281                        vmoid: vmo_id.id,
1282                        dev_offset: 0,
1283                        length: MAX_TRANSFER_BLOCKS + 1,
1284                        vmo_offset: 6,
1285                        ..Default::default()
1286                    })
1287                    .await
1288                    .unwrap();
1289
1290                let mut response = BlockFifoResponse::default();
1291                reader.read_entries(&mut response).await.unwrap();
1292                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1293
1294                std::mem::drop(proxy);
1295            }
1296        );
1297    }
1298
1299    #[fuchsia::test]
1300    async fn test_barriers_ordering() {
1301        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1302        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1303        let barrier_called = Arc::new(AtomicBool::new(false));
1304
1305        futures::join!(
1306            async move {
1307                let barrier_called_clone = barrier_called.clone();
1308                let block_server = BlockServer::new(
1309                    BLOCK_SIZE,
1310                    Arc::new(MockInterface {
1311                        barrier_hook: Some(Box::new(move || {
1312                            barrier_called.store(true, Ordering::Relaxed);
1313                            Ok(())
1314                        })),
1315                        write_hook: Some(Box::new(move |device_block_offset| {
1316                            let barrier_called = barrier_called_clone.clone();
1317                            Box::pin(async move {
1318                                // The sleep allows the server to reorder the fifo requests.
1319                                if device_block_offset % 2 == 0 {
1320                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1321                                        zx::MonotonicDuration::from_millis(200),
1322                                    ))
1323                                    .await;
1324                                }
1325                                assert!(barrier_called.load(Ordering::Relaxed));
1326                                Ok(())
1327                            })
1328                        })),
1329                        ..MockInterface::default()
1330                    }),
1331                );
1332                block_server.handle_requests(stream).await.unwrap();
1333            },
1334            async move {
1335                let (session_proxy, server) = fidl::endpoints::create_proxy();
1336
1337                proxy.open_session(server).unwrap();
1338
1339                let vmo_id = session_proxy
1340                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1341                    .await
1342                    .unwrap()
1343                    .unwrap();
1344                assert_ne!(vmo_id.id, 0);
1345
1346                let mut fifo =
1347                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1348                let (mut reader, mut writer) = fifo.async_io();
1349
1350                writer
1351                    .write_entries(&BlockFifoRequest {
1352                        command: BlockFifoCommand {
1353                            opcode: BlockOpcode::Write.into_primitive(),
1354                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1355                            ..Default::default()
1356                        },
1357                        vmoid: vmo_id.id,
1358                        dev_offset: 0,
1359                        length: 5,
1360                        vmo_offset: 6,
1361                        ..Default::default()
1362                    })
1363                    .await
1364                    .unwrap();
1365
1366                for i in 0..10 {
1367                    writer
1368                        .write_entries(&BlockFifoRequest {
1369                            command: BlockFifoCommand {
1370                                opcode: BlockOpcode::Write.into_primitive(),
1371                                ..Default::default()
1372                            },
1373                            vmoid: vmo_id.id,
1374                            dev_offset: i + 1,
1375                            length: 5,
1376                            vmo_offset: 6,
1377                            ..Default::default()
1378                        })
1379                        .await
1380                        .unwrap();
1381                }
1382                for _ in 0..11 {
1383                    let mut response = BlockFifoResponse::default();
1384                    reader.read_entries(&mut response).await.unwrap();
1385                    assert_eq!(response.status, zx::sys::ZX_OK);
1386                }
1387
1388                std::mem::drop(proxy);
1389            }
1390        );
1391    }
1392
1393    #[fuchsia::test]
1394    async fn test_info() {
1395        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1396
1397        futures::join!(
1398            async {
1399                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1400                block_server.handle_requests(stream).await.unwrap();
1401            },
1402            async {
1403                let expected_info = test_device_info();
1404                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1405                    info
1406                } else {
1407                    unreachable!()
1408                };
1409
1410                let block_info = proxy.get_info().await.unwrap().unwrap();
1411                assert_eq!(
1412                    block_info.block_count,
1413                    partition_info.block_range.as_ref().unwrap().end
1414                        - partition_info.block_range.as_ref().unwrap().start
1415                );
1416                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1417
1418                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1419
1420                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1421                assert_eq!(status, zx::sys::ZX_OK);
1422                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1423
1424                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1425                assert_eq!(status, zx::sys::ZX_OK);
1426                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1427
1428                let (status, name) = proxy.get_name().await.unwrap();
1429                assert_eq!(status, zx::sys::ZX_OK);
1430                assert_eq!(name.as_ref(), Some(&partition_info.name));
1431
1432                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1433                assert_eq!(metadata.name, name);
1434                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1435                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1436                assert_eq!(
1437                    metadata.start_block_offset,
1438                    Some(partition_info.block_range.as_ref().unwrap().start)
1439                );
1440                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1441                assert_eq!(metadata.flags, Some(partition_info.flags));
1442
1443                std::mem::drop(proxy);
1444            }
1445        );
1446    }
1447
1448    #[fuchsia::test]
1449    async fn test_attach_vmo() {
1450        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1451
1452        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1453        let koid = vmo.get_koid().unwrap();
1454
1455        futures::join!(
1456            async {
1457                let block_server = BlockServer::new(
1458                    BLOCK_SIZE,
1459                    Arc::new(MockInterface {
1460                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1461                            assert_eq!(vmo.get_koid().unwrap(), koid);
1462                            Box::pin(async { Ok(()) })
1463                        })),
1464                        ..MockInterface::default()
1465                    }),
1466                );
1467                block_server.handle_requests(stream).await.unwrap();
1468            },
1469            async move {
1470                let (session_proxy, server) = fidl::endpoints::create_proxy();
1471
1472                proxy.open_session(server).unwrap();
1473
1474                let vmo_id = session_proxy
1475                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1476                    .await
1477                    .unwrap()
1478                    .unwrap();
1479                assert_ne!(vmo_id.id, 0);
1480
1481                let mut fifo =
1482                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1483                let (mut reader, mut writer) = fifo.async_io();
1484
1485                // Keep attaching VMOs until we eventually hit the maximum.
1486                let mut count = 1;
1487                loop {
1488                    match session_proxy
1489                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1490                        .await
1491                        .unwrap()
1492                    {
1493                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1494                        Err(e) => {
1495                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1496                            break;
1497                        }
1498                    }
1499
1500                    // Only test every 10 to keep test time down.
1501                    if count % 10 == 0 {
1502                        writer
1503                            .write_entries(&BlockFifoRequest {
1504                                command: BlockFifoCommand {
1505                                    opcode: BlockOpcode::Read.into_primitive(),
1506                                    ..Default::default()
1507                                },
1508                                vmoid: vmo_id.id,
1509                                length: 1,
1510                                ..Default::default()
1511                            })
1512                            .await
1513                            .unwrap();
1514
1515                        let mut response = BlockFifoResponse::default();
1516                        reader.read_entries(&mut response).await.unwrap();
1517                        assert_eq!(response.status, zx::sys::ZX_OK);
1518                    }
1519
1520                    count += 1;
1521                }
1522
1523                assert_eq!(count, u16::MAX as u64);
1524
1525                // Detach the original VMO, and make sure we can then attach another one.
1526                writer
1527                    .write_entries(&BlockFifoRequest {
1528                        command: BlockFifoCommand {
1529                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1530                            ..Default::default()
1531                        },
1532                        vmoid: vmo_id.id,
1533                        ..Default::default()
1534                    })
1535                    .await
1536                    .unwrap();
1537
1538                let mut response = BlockFifoResponse::default();
1539                reader.read_entries(&mut response).await.unwrap();
1540                assert_eq!(response.status, zx::sys::ZX_OK);
1541
1542                let new_vmo_id = session_proxy
1543                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1544                    .await
1545                    .unwrap()
1546                    .unwrap();
1547                // It should reuse the same ID.
1548                assert_eq!(new_vmo_id.id, vmo_id.id);
1549
1550                std::mem::drop(proxy);
1551            }
1552        );
1553    }
1554
1555    #[fuchsia::test]
1556    async fn test_close() {
1557        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1558
1559        let mut server = std::pin::pin!(
1560            async {
1561                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1562                block_server.handle_requests(stream).await.unwrap();
1563            }
1564            .fuse()
1565        );
1566
1567        let mut client = std::pin::pin!(
1568            async {
1569                let (session_proxy, server) = fidl::endpoints::create_proxy();
1570
1571                proxy.open_session(server).unwrap();
1572
1573                // Dropping the proxy should not cause the session to terminate because the session is
1574                // still live.
1575                std::mem::drop(proxy);
1576
1577                session_proxy.close().await.unwrap().unwrap();
1578
1579                // Keep the session alive.  Calling `close` should cause the server to terminate.
1580                let _: () = std::future::pending().await;
1581            }
1582            .fuse()
1583        );
1584
1585        futures::select!(
1586            _ = server => {}
1587            _ = client => unreachable!(),
1588        );
1589    }
1590
1591    #[derive(Default)]
1592    struct IoMockInterface {
1593        do_checks: bool,
1594        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1595        return_errors: bool,
1596    }
1597
1598    #[derive(Debug)]
1599    enum ExpectedOp {
1600        Read(u64, u32, u64),
1601        Write(u64, u32, u64),
1602        Trim(u64, u32),
1603        Flush,
1604    }
1605
1606    impl super::async_interface::Interface for IoMockInterface {
1607        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1608            Ok(())
1609        }
1610
1611        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1612            Ok(Cow::Owned(test_device_info()))
1613        }
1614
1615        async fn read(
1616            &self,
1617            device_block_offset: u64,
1618            block_count: u32,
1619            _vmo: &Arc<zx::Vmo>,
1620            vmo_offset: u64,
1621            _opts: ReadOptions,
1622            _trace_flow_id: TraceFlowId,
1623        ) -> Result<(), zx::Status> {
1624            if self.return_errors {
1625                Err(zx::Status::INTERNAL)
1626            } else {
1627                if self.do_checks {
1628                    assert_matches!(
1629                        self.expected_op.lock().take(),
1630                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1631                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1632                        "Read {device_block_offset} {block_count} {vmo_offset}"
1633                    );
1634                }
1635                Ok(())
1636            }
1637        }
1638
1639        async fn write(
1640            &self,
1641            device_block_offset: u64,
1642            block_count: u32,
1643            _vmo: &Arc<zx::Vmo>,
1644            vmo_offset: u64,
1645            _write_opts: WriteOptions,
1646            _trace_flow_id: TraceFlowId,
1647        ) -> Result<(), zx::Status> {
1648            if self.return_errors {
1649                Err(zx::Status::NOT_SUPPORTED)
1650            } else {
1651                if self.do_checks {
1652                    assert_matches!(
1653                        self.expected_op.lock().take(),
1654                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1655                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1656                        "Write {device_block_offset} {block_count} {vmo_offset}"
1657                    );
1658                }
1659                Ok(())
1660            }
1661        }
1662
1663        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1664            if self.return_errors {
1665                Err(zx::Status::NO_RESOURCES)
1666            } else {
1667                if self.do_checks {
1668                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1669                }
1670                Ok(())
1671            }
1672        }
1673
1674        fn barrier(&self) -> Result<(), zx::Status> {
1675            unreachable!()
1676        }
1677
1678        async fn trim(
1679            &self,
1680            device_block_offset: u64,
1681            block_count: u32,
1682            _trace_flow_id: TraceFlowId,
1683        ) -> Result<(), zx::Status> {
1684            if self.return_errors {
1685                Err(zx::Status::NO_MEMORY)
1686            } else {
1687                if self.do_checks {
1688                    assert_matches!(
1689                        self.expected_op.lock().take(),
1690                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1691                            block_count == b,
1692                        "Trim {device_block_offset} {block_count}"
1693                    );
1694                }
1695                Ok(())
1696            }
1697        }
1698    }
1699
1700    #[fuchsia::test]
1701    async fn test_io() {
1702        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1703
1704        let expected_op = Arc::new(Mutex::new(None));
1705        let expected_op_clone = expected_op.clone();
1706
1707        let server = async {
1708            let block_server = BlockServer::new(
1709                BLOCK_SIZE,
1710                Arc::new(IoMockInterface {
1711                    return_errors: false,
1712                    do_checks: true,
1713                    expected_op: expected_op_clone,
1714                }),
1715            );
1716            block_server.handle_requests(stream).await.unwrap();
1717        };
1718
1719        let client = async move {
1720            let (session_proxy, server) = fidl::endpoints::create_proxy();
1721
1722            proxy.open_session(server).unwrap();
1723
1724            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1725            let vmo_id = session_proxy
1726                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1727                .await
1728                .unwrap()
1729                .unwrap();
1730
1731            let mut fifo =
1732                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1733            let (mut reader, mut writer) = fifo.async_io();
1734
1735            // READ
1736            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1737            writer
1738                .write_entries(&BlockFifoRequest {
1739                    command: BlockFifoCommand {
1740                        opcode: BlockOpcode::Read.into_primitive(),
1741                        ..Default::default()
1742                    },
1743                    vmoid: vmo_id.id,
1744                    dev_offset: 1,
1745                    length: 2,
1746                    vmo_offset: 3,
1747                    ..Default::default()
1748                })
1749                .await
1750                .unwrap();
1751
1752            let mut response = BlockFifoResponse::default();
1753            reader.read_entries(&mut response).await.unwrap();
1754            assert_eq!(response.status, zx::sys::ZX_OK);
1755
1756            // WRITE
1757            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1758            writer
1759                .write_entries(&BlockFifoRequest {
1760                    command: BlockFifoCommand {
1761                        opcode: BlockOpcode::Write.into_primitive(),
1762                        ..Default::default()
1763                    },
1764                    vmoid: vmo_id.id,
1765                    dev_offset: 4,
1766                    length: 5,
1767                    vmo_offset: 6,
1768                    ..Default::default()
1769                })
1770                .await
1771                .unwrap();
1772
1773            let mut response = BlockFifoResponse::default();
1774            reader.read_entries(&mut response).await.unwrap();
1775            assert_eq!(response.status, zx::sys::ZX_OK);
1776
1777            // FLUSH
1778            *expected_op.lock() = Some(ExpectedOp::Flush);
1779            writer
1780                .write_entries(&BlockFifoRequest {
1781                    command: BlockFifoCommand {
1782                        opcode: BlockOpcode::Flush.into_primitive(),
1783                        ..Default::default()
1784                    },
1785                    ..Default::default()
1786                })
1787                .await
1788                .unwrap();
1789
1790            reader.read_entries(&mut response).await.unwrap();
1791            assert_eq!(response.status, zx::sys::ZX_OK);
1792
1793            // TRIM
1794            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1795            writer
1796                .write_entries(&BlockFifoRequest {
1797                    command: BlockFifoCommand {
1798                        opcode: BlockOpcode::Trim.into_primitive(),
1799                        ..Default::default()
1800                    },
1801                    dev_offset: 7,
1802                    length: 8,
1803                    ..Default::default()
1804                })
1805                .await
1806                .unwrap();
1807
1808            reader.read_entries(&mut response).await.unwrap();
1809            assert_eq!(response.status, zx::sys::ZX_OK);
1810
1811            std::mem::drop(proxy);
1812        };
1813
1814        futures::join!(server, client);
1815    }
1816
1817    #[fuchsia::test]
1818    async fn test_io_errors() {
1819        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1820
1821        futures::join!(
1822            async {
1823                let block_server = BlockServer::new(
1824                    BLOCK_SIZE,
1825                    Arc::new(IoMockInterface {
1826                        return_errors: true,
1827                        do_checks: false,
1828                        expected_op: Arc::new(Mutex::new(None)),
1829                    }),
1830                );
1831                block_server.handle_requests(stream).await.unwrap();
1832            },
1833            async move {
1834                let (session_proxy, server) = fidl::endpoints::create_proxy();
1835
1836                proxy.open_session(server).unwrap();
1837
1838                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1839                let vmo_id = session_proxy
1840                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1841                    .await
1842                    .unwrap()
1843                    .unwrap();
1844
1845                let mut fifo =
1846                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1847                let (mut reader, mut writer) = fifo.async_io();
1848
1849                // READ
1850                writer
1851                    .write_entries(&BlockFifoRequest {
1852                        command: BlockFifoCommand {
1853                            opcode: BlockOpcode::Read.into_primitive(),
1854                            ..Default::default()
1855                        },
1856                        vmoid: vmo_id.id,
1857                        length: 1,
1858                        reqid: 1,
1859                        ..Default::default()
1860                    })
1861                    .await
1862                    .unwrap();
1863
1864                let mut response = BlockFifoResponse::default();
1865                reader.read_entries(&mut response).await.unwrap();
1866                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1867
1868                // WRITE
1869                writer
1870                    .write_entries(&BlockFifoRequest {
1871                        command: BlockFifoCommand {
1872                            opcode: BlockOpcode::Write.into_primitive(),
1873                            ..Default::default()
1874                        },
1875                        vmoid: vmo_id.id,
1876                        length: 1,
1877                        reqid: 2,
1878                        ..Default::default()
1879                    })
1880                    .await
1881                    .unwrap();
1882
1883                reader.read_entries(&mut response).await.unwrap();
1884                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1885
1886                // FLUSH
1887                writer
1888                    .write_entries(&BlockFifoRequest {
1889                        command: BlockFifoCommand {
1890                            opcode: BlockOpcode::Flush.into_primitive(),
1891                            ..Default::default()
1892                        },
1893                        reqid: 3,
1894                        ..Default::default()
1895                    })
1896                    .await
1897                    .unwrap();
1898
1899                reader.read_entries(&mut response).await.unwrap();
1900                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1901
1902                // TRIM
1903                writer
1904                    .write_entries(&BlockFifoRequest {
1905                        command: BlockFifoCommand {
1906                            opcode: BlockOpcode::Trim.into_primitive(),
1907                            ..Default::default()
1908                        },
1909                        reqid: 4,
1910                        length: 1,
1911                        ..Default::default()
1912                    })
1913                    .await
1914                    .unwrap();
1915
1916                reader.read_entries(&mut response).await.unwrap();
1917                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1918
1919                std::mem::drop(proxy);
1920            }
1921        );
1922    }
1923
1924    #[fuchsia::test]
1925    async fn test_invalid_args() {
1926        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1927
1928        futures::join!(
1929            async {
1930                let block_server = BlockServer::new(
1931                    BLOCK_SIZE,
1932                    Arc::new(IoMockInterface {
1933                        return_errors: false,
1934                        do_checks: false,
1935                        expected_op: Arc::new(Mutex::new(None)),
1936                    }),
1937                );
1938                block_server.handle_requests(stream).await.unwrap();
1939            },
1940            async move {
1941                let (session_proxy, server) = fidl::endpoints::create_proxy();
1942
1943                proxy.open_session(server).unwrap();
1944
1945                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1946                let vmo_id = session_proxy
1947                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1948                    .await
1949                    .unwrap()
1950                    .unwrap();
1951
1952                let mut fifo =
1953                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1954
1955                async fn test(
1956                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1957                    request: BlockFifoRequest,
1958                ) -> Result<(), zx::Status> {
1959                    let (mut reader, mut writer) = fifo.async_io();
1960                    writer.write_entries(&request).await.unwrap();
1961                    let mut response = BlockFifoResponse::default();
1962                    reader.read_entries(&mut response).await.unwrap();
1963                    zx::Status::ok(response.status)
1964                }
1965
1966                // READ
1967
1968                let good_read_request = || BlockFifoRequest {
1969                    command: BlockFifoCommand {
1970                        opcode: BlockOpcode::Read.into_primitive(),
1971                        ..Default::default()
1972                    },
1973                    length: 1,
1974                    vmoid: vmo_id.id,
1975                    ..Default::default()
1976                };
1977
1978                assert_eq!(
1979                    test(
1980                        &mut fifo,
1981                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1982                    )
1983                    .await,
1984                    Err(zx::Status::IO)
1985                );
1986
1987                assert_eq!(
1988                    test(
1989                        &mut fifo,
1990                        BlockFifoRequest {
1991                            vmo_offset: 0xffff_ffff_ffff_ffff,
1992                            ..good_read_request()
1993                        }
1994                    )
1995                    .await,
1996                    Err(zx::Status::OUT_OF_RANGE)
1997                );
1998
1999                assert_eq!(
2000                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2001                    Err(zx::Status::INVALID_ARGS)
2002                );
2003
2004                // WRITE
2005
2006                let good_write_request = || BlockFifoRequest {
2007                    command: BlockFifoCommand {
2008                        opcode: BlockOpcode::Write.into_primitive(),
2009                        ..Default::default()
2010                    },
2011                    length: 1,
2012                    vmoid: vmo_id.id,
2013                    ..Default::default()
2014                };
2015
2016                assert_eq!(
2017                    test(
2018                        &mut fifo,
2019                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2020                    )
2021                    .await,
2022                    Err(zx::Status::IO)
2023                );
2024
2025                assert_eq!(
2026                    test(
2027                        &mut fifo,
2028                        BlockFifoRequest {
2029                            vmo_offset: 0xffff_ffff_ffff_ffff,
2030                            ..good_write_request()
2031                        }
2032                    )
2033                    .await,
2034                    Err(zx::Status::OUT_OF_RANGE)
2035                );
2036
2037                assert_eq!(
2038                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2039                    Err(zx::Status::INVALID_ARGS)
2040                );
2041
2042                // CLOSE VMO
2043
2044                assert_eq!(
2045                    test(
2046                        &mut fifo,
2047                        BlockFifoRequest {
2048                            command: BlockFifoCommand {
2049                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2050                                ..Default::default()
2051                            },
2052                            vmoid: vmo_id.id + 1,
2053                            ..Default::default()
2054                        }
2055                    )
2056                    .await,
2057                    Err(zx::Status::IO)
2058                );
2059
2060                std::mem::drop(proxy);
2061            }
2062        );
2063    }
2064
2065    #[fuchsia::test]
2066    async fn test_concurrent_requests() {
2067        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2068
2069        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2070        let waiting_readers_clone = waiting_readers.clone();
2071
2072        futures::join!(
2073            async move {
2074                let block_server = BlockServer::new(
2075                    BLOCK_SIZE,
2076                    Arc::new(MockInterface {
2077                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2078                            let (tx, rx) = oneshot::channel();
2079                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2080                            Box::pin(async move {
2081                                let _ = rx.await;
2082                                Ok(())
2083                            })
2084                        })),
2085                        ..MockInterface::default()
2086                    }),
2087                );
2088                block_server.handle_requests(stream).await.unwrap();
2089            },
2090            async move {
2091                let (session_proxy, server) = fidl::endpoints::create_proxy();
2092
2093                proxy.open_session(server).unwrap();
2094
2095                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2096                let vmo_id = session_proxy
2097                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2098                    .await
2099                    .unwrap()
2100                    .unwrap();
2101
2102                let mut fifo =
2103                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2104                let (mut reader, mut writer) = fifo.async_io();
2105
2106                writer
2107                    .write_entries(&BlockFifoRequest {
2108                        command: BlockFifoCommand {
2109                            opcode: BlockOpcode::Read.into_primitive(),
2110                            ..Default::default()
2111                        },
2112                        reqid: 1,
2113                        dev_offset: 1, // Intentionally use the same as `reqid`.
2114                        vmoid: vmo_id.id,
2115                        length: 1,
2116                        ..Default::default()
2117                    })
2118                    .await
2119                    .unwrap();
2120
2121                writer
2122                    .write_entries(&BlockFifoRequest {
2123                        command: BlockFifoCommand {
2124                            opcode: BlockOpcode::Read.into_primitive(),
2125                            ..Default::default()
2126                        },
2127                        reqid: 2,
2128                        dev_offset: 2,
2129                        vmoid: vmo_id.id,
2130                        length: 1,
2131                        ..Default::default()
2132                    })
2133                    .await
2134                    .unwrap();
2135
2136                // Wait till both those entries are pending.
2137                poll_fn(|cx: &mut Context<'_>| {
2138                    if waiting_readers.lock().len() == 2 {
2139                        Poll::Ready(())
2140                    } else {
2141                        // Yield to the executor.
2142                        cx.waker().wake_by_ref();
2143                        Poll::Pending
2144                    }
2145                })
2146                .await;
2147
2148                let mut response = BlockFifoResponse::default();
2149                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2150
2151                let (id, tx) = waiting_readers.lock().pop().unwrap();
2152                tx.send(()).unwrap();
2153
2154                reader.read_entries(&mut response).await.unwrap();
2155                assert_eq!(response.status, zx::sys::ZX_OK);
2156                assert_eq!(response.reqid, id);
2157
2158                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2159
2160                let (id, tx) = waiting_readers.lock().pop().unwrap();
2161                tx.send(()).unwrap();
2162
2163                reader.read_entries(&mut response).await.unwrap();
2164                assert_eq!(response.status, zx::sys::ZX_OK);
2165                assert_eq!(response.reqid, id);
2166            }
2167        );
2168    }
2169
2170    #[fuchsia::test]
2171    async fn test_groups() {
2172        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2173
2174        futures::join!(
2175            async move {
2176                let block_server = BlockServer::new(
2177                    BLOCK_SIZE,
2178                    Arc::new(MockInterface {
2179                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2180                        ..MockInterface::default()
2181                    }),
2182                );
2183                block_server.handle_requests(stream).await.unwrap();
2184            },
2185            async move {
2186                let (session_proxy, server) = fidl::endpoints::create_proxy();
2187
2188                proxy.open_session(server).unwrap();
2189
2190                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2191                let vmo_id = session_proxy
2192                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2193                    .await
2194                    .unwrap()
2195                    .unwrap();
2196
2197                let mut fifo =
2198                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2199                let (mut reader, mut writer) = fifo.async_io();
2200
2201                writer
2202                    .write_entries(&BlockFifoRequest {
2203                        command: BlockFifoCommand {
2204                            opcode: BlockOpcode::Read.into_primitive(),
2205                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2206                            ..Default::default()
2207                        },
2208                        group: 1,
2209                        vmoid: vmo_id.id,
2210                        length: 1,
2211                        ..Default::default()
2212                    })
2213                    .await
2214                    .unwrap();
2215
2216                writer
2217                    .write_entries(&BlockFifoRequest {
2218                        command: BlockFifoCommand {
2219                            opcode: BlockOpcode::Read.into_primitive(),
2220                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2221                            ..Default::default()
2222                        },
2223                        reqid: 2,
2224                        group: 1,
2225                        vmoid: vmo_id.id,
2226                        length: 1,
2227                        ..Default::default()
2228                    })
2229                    .await
2230                    .unwrap();
2231
2232                let mut response = BlockFifoResponse::default();
2233                reader.read_entries(&mut response).await.unwrap();
2234                assert_eq!(response.status, zx::sys::ZX_OK);
2235                assert_eq!(response.reqid, 2);
2236                assert_eq!(response.group, 1);
2237            }
2238        );
2239    }
2240
2241    #[fuchsia::test]
2242    async fn test_group_error() {
2243        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2244
2245        let counter = Arc::new(AtomicU64::new(0));
2246        let counter_clone = counter.clone();
2247
2248        futures::join!(
2249            async move {
2250                let block_server = BlockServer::new(
2251                    BLOCK_SIZE,
2252                    Arc::new(MockInterface {
2253                        read_hook: Some(Box::new(move |_, _, _, _| {
2254                            counter_clone.fetch_add(1, Ordering::Relaxed);
2255                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2256                        })),
2257                        ..MockInterface::default()
2258                    }),
2259                );
2260                block_server.handle_requests(stream).await.unwrap();
2261            },
2262            async move {
2263                let (session_proxy, server) = fidl::endpoints::create_proxy();
2264
2265                proxy.open_session(server).unwrap();
2266
2267                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2268                let vmo_id = session_proxy
2269                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2270                    .await
2271                    .unwrap()
2272                    .unwrap();
2273
2274                let mut fifo =
2275                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2276                let (mut reader, mut writer) = fifo.async_io();
2277
2278                writer
2279                    .write_entries(&BlockFifoRequest {
2280                        command: BlockFifoCommand {
2281                            opcode: BlockOpcode::Read.into_primitive(),
2282                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2283                            ..Default::default()
2284                        },
2285                        group: 1,
2286                        vmoid: vmo_id.id,
2287                        length: 1,
2288                        ..Default::default()
2289                    })
2290                    .await
2291                    .unwrap();
2292
2293                // Wait until processed.
2294                poll_fn(|cx: &mut Context<'_>| {
2295                    if counter.load(Ordering::Relaxed) == 1 {
2296                        Poll::Ready(())
2297                    } else {
2298                        // Yield to the executor.
2299                        cx.waker().wake_by_ref();
2300                        Poll::Pending
2301                    }
2302                })
2303                .await;
2304
2305                let mut response = BlockFifoResponse::default();
2306                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2307
2308                writer
2309                    .write_entries(&BlockFifoRequest {
2310                        command: BlockFifoCommand {
2311                            opcode: BlockOpcode::Read.into_primitive(),
2312                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2313                            ..Default::default()
2314                        },
2315                        group: 1,
2316                        vmoid: vmo_id.id,
2317                        length: 1,
2318                        ..Default::default()
2319                    })
2320                    .await
2321                    .unwrap();
2322
2323                writer
2324                    .write_entries(&BlockFifoRequest {
2325                        command: BlockFifoCommand {
2326                            opcode: BlockOpcode::Read.into_primitive(),
2327                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2328                            ..Default::default()
2329                        },
2330                        reqid: 2,
2331                        group: 1,
2332                        vmoid: vmo_id.id,
2333                        length: 1,
2334                        ..Default::default()
2335                    })
2336                    .await
2337                    .unwrap();
2338
2339                reader.read_entries(&mut response).await.unwrap();
2340                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2341                assert_eq!(response.reqid, 2);
2342                assert_eq!(response.group, 1);
2343
2344                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2345
2346                // Only the first request should have been processed.
2347                assert_eq!(counter.load(Ordering::Relaxed), 1);
2348            }
2349        );
2350    }
2351
2352    #[fuchsia::test]
2353    async fn test_group_with_two_lasts() {
2354        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2355
2356        let (tx, rx) = oneshot::channel();
2357
2358        futures::join!(
2359            async move {
2360                let rx = Mutex::new(Some(rx));
2361                let block_server = BlockServer::new(
2362                    BLOCK_SIZE,
2363                    Arc::new(MockInterface {
2364                        read_hook: Some(Box::new(move |_, _, _, _| {
2365                            let rx = rx.lock().take().unwrap();
2366                            Box::pin(async {
2367                                let _ = rx.await;
2368                                Ok(())
2369                            })
2370                        })),
2371                        ..MockInterface::default()
2372                    }),
2373                );
2374                block_server.handle_requests(stream).await.unwrap();
2375            },
2376            async move {
2377                let (session_proxy, server) = fidl::endpoints::create_proxy();
2378
2379                proxy.open_session(server).unwrap();
2380
2381                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2382                let vmo_id = session_proxy
2383                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2384                    .await
2385                    .unwrap()
2386                    .unwrap();
2387
2388                let mut fifo =
2389                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2390                let (mut reader, mut writer) = fifo.async_io();
2391
2392                writer
2393                    .write_entries(&BlockFifoRequest {
2394                        command: BlockFifoCommand {
2395                            opcode: BlockOpcode::Read.into_primitive(),
2396                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2397                            ..Default::default()
2398                        },
2399                        reqid: 1,
2400                        group: 1,
2401                        vmoid: vmo_id.id,
2402                        length: 1,
2403                        ..Default::default()
2404                    })
2405                    .await
2406                    .unwrap();
2407
2408                writer
2409                    .write_entries(&BlockFifoRequest {
2410                        command: BlockFifoCommand {
2411                            opcode: BlockOpcode::Read.into_primitive(),
2412                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2413                            ..Default::default()
2414                        },
2415                        reqid: 2,
2416                        group: 1,
2417                        vmoid: vmo_id.id,
2418                        length: 1,
2419                        ..Default::default()
2420                    })
2421                    .await
2422                    .unwrap();
2423
2424                // Send an independent request to flush through the fifo.
2425                writer
2426                    .write_entries(&BlockFifoRequest {
2427                        command: BlockFifoCommand {
2428                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2429                            ..Default::default()
2430                        },
2431                        reqid: 3,
2432                        vmoid: vmo_id.id,
2433                        ..Default::default()
2434                    })
2435                    .await
2436                    .unwrap();
2437
2438                // It should succeed.
2439                let mut response = BlockFifoResponse::default();
2440                reader.read_entries(&mut response).await.unwrap();
2441                assert_eq!(response.status, zx::sys::ZX_OK);
2442                assert_eq!(response.reqid, 3);
2443
2444                // Now release the original request.
2445                tx.send(()).unwrap();
2446
2447                // The response should be for the first message tagged as last, and it should be
2448                // an error because we sent two messages with the LAST marker.
2449                let mut response = BlockFifoResponse::default();
2450                reader.read_entries(&mut response).await.unwrap();
2451                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2452                assert_eq!(response.reqid, 1);
2453                assert_eq!(response.group, 1);
2454            }
2455        );
2456    }
2457
2458    #[fuchsia::test(allow_stalls = false)]
2459    async fn test_requests_dont_block_sessions() {
2460        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2461
2462        let (tx, rx) = oneshot::channel();
2463
2464        fasync::Task::local(async move {
2465            let rx = Mutex::new(Some(rx));
2466            let block_server = BlockServer::new(
2467                BLOCK_SIZE,
2468                Arc::new(MockInterface {
2469                    read_hook: Some(Box::new(move |_, _, _, _| {
2470                        let rx = rx.lock().take().unwrap();
2471                        Box::pin(async {
2472                            let _ = rx.await;
2473                            Ok(())
2474                        })
2475                    })),
2476                    ..MockInterface::default()
2477                }),
2478            );
2479            block_server.handle_requests(stream).await.unwrap();
2480        })
2481        .detach();
2482
2483        let mut fut = pin!(async {
2484            let (session_proxy, server) = fidl::endpoints::create_proxy();
2485
2486            proxy.open_session(server).unwrap();
2487
2488            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2489            let vmo_id = session_proxy
2490                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2491                .await
2492                .unwrap()
2493                .unwrap();
2494
2495            let mut fifo =
2496                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2497            let (mut reader, mut writer) = fifo.async_io();
2498
2499            writer
2500                .write_entries(&BlockFifoRequest {
2501                    command: BlockFifoCommand {
2502                        opcode: BlockOpcode::Read.into_primitive(),
2503                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2504                        ..Default::default()
2505                    },
2506                    reqid: 1,
2507                    group: 1,
2508                    vmoid: vmo_id.id,
2509                    length: 1,
2510                    ..Default::default()
2511                })
2512                .await
2513                .unwrap();
2514
2515            let mut response = BlockFifoResponse::default();
2516            reader.read_entries(&mut response).await.unwrap();
2517            assert_eq!(response.status, zx::sys::ZX_OK);
2518        });
2519
2520        // The response won't come back until we send on `tx`.
2521        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2522
2523        let mut fut2 = pin!(proxy.get_volume_info());
2524
2525        // get_volume_info is set up to stall forever.
2526        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2527
2528        // If we now free up the first future, it should resolve; the stalled call to
2529        // get_volume_info should not block the fifo response.
2530        let _ = tx.send(());
2531
2532        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2533    }
2534
2535    #[fuchsia::test]
2536    async fn test_request_flow_control() {
2537        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2538
2539        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2540        // server will block until that happens.
2541        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2542        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2543        let event_clone = event.clone();
2544        futures::join!(
2545            async move {
2546                let block_server = BlockServer::new(
2547                    BLOCK_SIZE,
2548                    Arc::new(MockInterface {
2549                        read_hook: Some(Box::new(move |_, _, _, _| {
2550                            let event_clone = event_clone.clone();
2551                            Box::pin(async move {
2552                                if !event_clone.1.load(Ordering::SeqCst) {
2553                                    event_clone.0.listen().await;
2554                                }
2555                                Ok(())
2556                            })
2557                        })),
2558                        ..MockInterface::default()
2559                    }),
2560                );
2561                block_server.handle_requests(stream).await.unwrap();
2562            },
2563            async move {
2564                let (session_proxy, server) = fidl::endpoints::create_proxy();
2565
2566                proxy.open_session(server).unwrap();
2567
2568                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2569                let vmo_id = session_proxy
2570                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2571                    .await
2572                    .unwrap()
2573                    .unwrap();
2574
2575                let mut fifo =
2576                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2577                let (mut reader, mut writer) = fifo.async_io();
2578
2579                for i in 0..MAX_REQUESTS {
2580                    writer
2581                        .write_entries(&BlockFifoRequest {
2582                            command: BlockFifoCommand {
2583                                opcode: BlockOpcode::Read.into_primitive(),
2584                                ..Default::default()
2585                            },
2586                            reqid: (i + 1) as u32,
2587                            dev_offset: i,
2588                            vmoid: vmo_id.id,
2589                            length: 1,
2590                            ..Default::default()
2591                        })
2592                        .await
2593                        .unwrap();
2594                }
2595                assert!(
2596                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2597                        command: BlockFifoCommand {
2598                            opcode: BlockOpcode::Read.into_primitive(),
2599                            ..Default::default()
2600                        },
2601                        reqid: u32::MAX,
2602                        dev_offset: MAX_REQUESTS,
2603                        vmoid: vmo_id.id,
2604                        length: 1,
2605                        ..Default::default()
2606                    })))
2607                    .is_pending()
2608                );
2609                // OK, let the server start to process.
2610                event.1.store(true, Ordering::SeqCst);
2611                event.0.notify(usize::MAX);
2612                // For each entry we read, make sure we can write a new one in.
2613                let mut finished_reqids = vec![];
2614                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2615                    let mut response = BlockFifoResponse::default();
2616                    reader.read_entries(&mut response).await.unwrap();
2617                    assert_eq!(response.status, zx::sys::ZX_OK);
2618                    finished_reqids.push(response.reqid);
2619                    writer
2620                        .write_entries(&BlockFifoRequest {
2621                            command: BlockFifoCommand {
2622                                opcode: BlockOpcode::Read.into_primitive(),
2623                                ..Default::default()
2624                            },
2625                            reqid: (i + 1) as u32,
2626                            dev_offset: i,
2627                            vmoid: vmo_id.id,
2628                            length: 1,
2629                            ..Default::default()
2630                        })
2631                        .await
2632                        .unwrap();
2633                }
2634                let mut response = BlockFifoResponse::default();
2635                for _ in 0..MAX_REQUESTS {
2636                    reader.read_entries(&mut response).await.unwrap();
2637                    assert_eq!(response.status, zx::sys::ZX_OK);
2638                    finished_reqids.push(response.reqid);
2639                }
2640                // Verify that we got a response for each request.  Note that we can't assume FIFO
2641                // ordering.
2642                finished_reqids.sort();
2643                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2644                let mut i = 1;
2645                for reqid in finished_reqids {
2646                    assert_eq!(reqid, i);
2647                    i += 1;
2648                }
2649            }
2650        );
2651    }
2652
2653    #[fuchsia::test]
2654    async fn test_passthrough_io_with_fixed_map() {
2655        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2656
2657        let expected_op = Arc::new(Mutex::new(None));
2658        let expected_op_clone = expected_op.clone();
2659        futures::join!(
2660            async {
2661                let block_server = BlockServer::new(
2662                    BLOCK_SIZE,
2663                    Arc::new(IoMockInterface {
2664                        return_errors: false,
2665                        do_checks: true,
2666                        expected_op: expected_op_clone,
2667                    }),
2668                );
2669                block_server.handle_requests(stream).await.unwrap();
2670            },
2671            async move {
2672                let (session_proxy, server) = fidl::endpoints::create_proxy();
2673
2674                let mapping = fblock::BlockOffsetMapping {
2675                    source_block_offset: 0,
2676                    target_block_offset: 10,
2677                    length: 20,
2678                };
2679                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2680
2681                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2682                let vmo_id = session_proxy
2683                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2684                    .await
2685                    .unwrap()
2686                    .unwrap();
2687
2688                let mut fifo =
2689                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2690                let (mut reader, mut writer) = fifo.async_io();
2691
2692                // READ
2693                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2694                writer
2695                    .write_entries(&BlockFifoRequest {
2696                        command: BlockFifoCommand {
2697                            opcode: BlockOpcode::Read.into_primitive(),
2698                            ..Default::default()
2699                        },
2700                        vmoid: vmo_id.id,
2701                        dev_offset: 1,
2702                        length: 2,
2703                        vmo_offset: 3,
2704                        ..Default::default()
2705                    })
2706                    .await
2707                    .unwrap();
2708
2709                let mut response = BlockFifoResponse::default();
2710                reader.read_entries(&mut response).await.unwrap();
2711                assert_eq!(response.status, zx::sys::ZX_OK);
2712
2713                // WRITE
2714                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2715                writer
2716                    .write_entries(&BlockFifoRequest {
2717                        command: BlockFifoCommand {
2718                            opcode: BlockOpcode::Write.into_primitive(),
2719                            ..Default::default()
2720                        },
2721                        vmoid: vmo_id.id,
2722                        dev_offset: 4,
2723                        length: 5,
2724                        vmo_offset: 6,
2725                        ..Default::default()
2726                    })
2727                    .await
2728                    .unwrap();
2729
2730                reader.read_entries(&mut response).await.unwrap();
2731                assert_eq!(response.status, zx::sys::ZX_OK);
2732
2733                // FLUSH
2734                *expected_op.lock() = Some(ExpectedOp::Flush);
2735                writer
2736                    .write_entries(&BlockFifoRequest {
2737                        command: BlockFifoCommand {
2738                            opcode: BlockOpcode::Flush.into_primitive(),
2739                            ..Default::default()
2740                        },
2741                        ..Default::default()
2742                    })
2743                    .await
2744                    .unwrap();
2745
2746                reader.read_entries(&mut response).await.unwrap();
2747                assert_eq!(response.status, zx::sys::ZX_OK);
2748
2749                // TRIM
2750                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2751                writer
2752                    .write_entries(&BlockFifoRequest {
2753                        command: BlockFifoCommand {
2754                            opcode: BlockOpcode::Trim.into_primitive(),
2755                            ..Default::default()
2756                        },
2757                        dev_offset: 7,
2758                        length: 3,
2759                        ..Default::default()
2760                    })
2761                    .await
2762                    .unwrap();
2763
2764                reader.read_entries(&mut response).await.unwrap();
2765                assert_eq!(response.status, zx::sys::ZX_OK);
2766
2767                // READ past window
2768                *expected_op.lock() = None;
2769                writer
2770                    .write_entries(&BlockFifoRequest {
2771                        command: BlockFifoCommand {
2772                            opcode: BlockOpcode::Read.into_primitive(),
2773                            ..Default::default()
2774                        },
2775                        vmoid: vmo_id.id,
2776                        dev_offset: 19,
2777                        length: 2,
2778                        vmo_offset: 3,
2779                        ..Default::default()
2780                    })
2781                    .await
2782                    .unwrap();
2783
2784                reader.read_entries(&mut response).await.unwrap();
2785                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2786
2787                std::mem::drop(proxy);
2788            }
2789        );
2790    }
2791
2792    #[fuchsia::test]
2793    fn operation_map() {
2794        fn expect_map_result(
2795            mut operation: Operation,
2796            mapping: Option<BlockOffsetMapping>,
2797            max_blocks: Option<NonZero<u32>>,
2798            expected_operations: Vec<Operation>,
2799        ) {
2800            let mut ops = vec![];
2801            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2802                ops.push(operation);
2803                operation = remainder;
2804            }
2805            ops.push(operation);
2806            assert_eq!(ops, expected_operations);
2807        }
2808
2809        // No limits
2810        expect_map_result(
2811            Operation::Read {
2812                device_block_offset: 10,
2813                block_count: 200,
2814                _unused: 0,
2815                vmo_offset: 0,
2816                options: ReadOptions::default(),
2817            },
2818            None,
2819            None,
2820            vec![Operation::Read {
2821                device_block_offset: 10,
2822                block_count: 200,
2823                _unused: 0,
2824                vmo_offset: 0,
2825                options: ReadOptions::default(),
2826            }],
2827        );
2828
2829        // Max block count
2830        expect_map_result(
2831            Operation::Read {
2832                device_block_offset: 10,
2833                block_count: 200,
2834                _unused: 0,
2835                vmo_offset: 0,
2836                options: ReadOptions::default(),
2837            },
2838            None,
2839            NonZero::new(120),
2840            vec![
2841                Operation::Read {
2842                    device_block_offset: 10,
2843                    block_count: 120,
2844                    _unused: 0,
2845                    vmo_offset: 0,
2846                    options: ReadOptions::default(),
2847                },
2848                Operation::Read {
2849                    device_block_offset: 130,
2850                    block_count: 80,
2851                    _unused: 0,
2852                    vmo_offset: 120 * 512,
2853                    options: ReadOptions::default(),
2854                },
2855            ],
2856        );
2857        expect_map_result(
2858            Operation::Write {
2859                device_block_offset: 10,
2860                block_count: 200,
2861                _unused: 0,
2862                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2863                vmo_offset: 0,
2864            },
2865            None,
2866            NonZero::new(120),
2867            vec![
2868                Operation::Write {
2869                    device_block_offset: 10,
2870                    block_count: 120,
2871                    _unused: 0,
2872                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2873                    vmo_offset: 0,
2874                },
2875                Operation::Write {
2876                    device_block_offset: 130,
2877                    block_count: 80,
2878                    _unused: 0,
2879                    options: WriteOptions::default(),
2880                    vmo_offset: 120 * 512,
2881                },
2882            ],
2883        );
2884        expect_map_result(
2885            Operation::Trim { device_block_offset: 10, block_count: 200 },
2886            None,
2887            NonZero::new(120),
2888            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2889        );
2890
2891        // Remapping + Max block count
2892        expect_map_result(
2893            Operation::Read {
2894                device_block_offset: 10,
2895                block_count: 200,
2896                _unused: 0,
2897                vmo_offset: 0,
2898                options: ReadOptions::default(),
2899            },
2900            Some(BlockOffsetMapping {
2901                source_block_offset: 10,
2902                target_block_offset: 100,
2903                length: 200,
2904            }),
2905            NonZero::new(120),
2906            vec![
2907                Operation::Read {
2908                    device_block_offset: 100,
2909                    block_count: 120,
2910                    _unused: 0,
2911                    vmo_offset: 0,
2912                    options: ReadOptions::default(),
2913                },
2914                Operation::Read {
2915                    device_block_offset: 220,
2916                    block_count: 80,
2917                    _unused: 0,
2918                    vmo_offset: 120 * 512,
2919                    options: ReadOptions::default(),
2920                },
2921            ],
2922        );
2923        expect_map_result(
2924            Operation::Write {
2925                device_block_offset: 10,
2926                block_count: 200,
2927                _unused: 0,
2928                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2929                vmo_offset: 0,
2930            },
2931            Some(BlockOffsetMapping {
2932                source_block_offset: 10,
2933                target_block_offset: 100,
2934                length: 200,
2935            }),
2936            NonZero::new(120),
2937            vec![
2938                Operation::Write {
2939                    device_block_offset: 100,
2940                    block_count: 120,
2941                    _unused: 0,
2942                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2943                    vmo_offset: 0,
2944                },
2945                Operation::Write {
2946                    device_block_offset: 220,
2947                    block_count: 80,
2948                    _unused: 0,
2949                    options: WriteOptions::default(),
2950                    vmo_offset: 120 * 512,
2951                },
2952            ],
2953        );
2954        expect_map_result(
2955            Operation::Trim { device_block_offset: 10, block_count: 200 },
2956            Some(BlockOffsetMapping {
2957                source_block_offset: 10,
2958                target_block_offset: 100,
2959                length: 200,
2960            }),
2961            NonZero::new(120),
2962            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2963        );
2964    }
2965}