_block_server_c_rustc_static/
lib.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
7use fuchsia_sync::Mutex;
8use futures::{Future, FutureExt as _, TryStreamExt as _};
9use std::borrow::Cow;
10use std::collections::btree_map::Entry;
11use std::collections::BTreeMap;
12use std::num::NonZero;
13use std::ops::Range;
14use std::sync::atomic::AtomicU64;
15use std::sync::Arc;
16use zx::HandleBased;
17use {
18    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
19    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
20};
21
22pub mod async_interface;
23pub mod c_interface;
24
/// Describes the device being served: either a plain block device or a partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A block device that is not a partition.
    Block(BlockInfo),
    /// A block device that is also a partition.
    Partition(PartitionInfo),
}
30
/// Information associated with non-partition block devices.
#[derive(Clone)]
pub struct BlockInfo {
    /// Flags reported as `flags` in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Number of blocks reported as `block_count` in the `GetInfo` response.
    pub block_count: u64,
}
37
/// Information associated with a block device that is also a partition.
#[derive(Clone)]
pub struct PartitionInfo {
    /// The device flags reported by the underlying device.
    pub device_flags: fblock::Flag,
    /// If `block_range` is None, the partition is a volume and may not be contiguous.
    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
    /// slices and use that (along with the slice and block sizes) to determine the block count.
    pub block_range: Option<Range<u64>>,
    /// Type GUID returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Instance GUID returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Partition name returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Partition flags returned as `flags` by `GetMetadata`.
    pub flags: u64,
}
52
// Multiple Block I/O requests may be sent as a group.
// Notes:
// - the group is identified by the group id in the request
// - if using groups, a response will not be sent unless the `BlockIoFlag::GROUP_LAST`
//   flag is set.
// - when processing a request of a group fails, subsequent requests of that
//   group will not be processed.
//
// Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
//
// FifoMessageGroup keeps track of the relevant BlockFifoResponse fields for
// a group of requests. Only `status` and `count` need to be updated.
struct FifoMessageGroup {
    // First non-OK status recorded for the group, or OK if none has been recorded.
    status: zx::Status,
    // Number of requests of this group that have been decoded but not yet completed.
    count: u32,
    // The `reqid` of the request flagged `GROUP_LAST`, once that request has been received.
    req_id: Option<u32>,
}
70
71impl FifoMessageGroup {
72    fn new() -> Self {
73        FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
74    }
75}
76
/// The request groups of a session, keyed by group id and guarded by a mutex.
#[derive(Default)]
struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
79
80// Keeps track of all the group requests that are currently being processed
81impl FifoMessageGroups {
82    /// Completes a request and returns a response to be sent if it's the last outstanding request
83    /// for this group.
84    fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
85        let mut map = self.0.lock();
86        let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
87        let group = o.get_mut();
88        if group.count == 1 {
89            if let Some(reqid) = group.req_id {
90                let status =
91                    if group.status != zx::Status::OK { group.status } else { status }.into_raw();
92
93                o.remove();
94
95                return Some(BlockFifoResponse {
96                    status,
97                    reqid,
98                    group: group_id,
99                    ..Default::default()
100                });
101            }
102        }
103
104        group.count = group.count.checked_sub(1).unwrap();
105        if status != zx::Status::OK && group.status == zx::Status::OK {
106            group.status = status
107        }
108        None
109    }
110}
111
/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
/// Requests are served from a `fuchsia.hardware.block.volume/Volume` stream; see
/// `handle_requests`.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// Block size reported by `GetInfo` and handed to every session that is opened.
    block_size: u32,
    /// Provides device information, volume operations and session handling.
    session_manager: Arc<SM>,
}
118
/// A single entry in [`OffsetMap`].
///
/// Requests that fall entirely within `length` blocks starting at `source_block_offset` are
/// redirected to the same relative position starting at `target_block_offset`.
pub struct BlockOffsetMapping {
    /// First block of the range as addressed by incoming requests.
    source_block_offset: u64,
    /// Block that `source_block_offset` is remapped to.
    target_block_offset: u64,
    /// Number of blocks covered by the mapping.
    length: u64,
}
125
126impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
127    type Error = zx::Status;
128
129    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
130        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
131        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
132        Ok(Self {
133            source_block_offset: wire.source_block_offset,
134            target_block_offset: wire.target_block_offset,
135            length: wire.length,
136        })
137    }
138}
139
/// Remaps the offset of block requests based on an internal map.
/// TODO(https://fxbug.dev/402515764): For now, this just supports a single entry in the map, which
/// is all that is required for GPT partitions.  If we want to support this for FVM, we will need
/// to support multiple entries, which requires changing the block server to support request
/// splitting.
pub struct OffsetMap {
    /// The only mapping; requests outside of it are rejected by `adjust_request`.
    mapping: BlockOffsetMapping,
}
148
149impl OffsetMap {
150    pub fn new(mapping: BlockOffsetMapping) -> Self {
151        Self { mapping }
152    }
153
154    /// Adjusts the requested range, returning the new dev_offset.
155    /// Returns false if the request would exceed the range known to OffsetMap.
156    pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
157        if length == 0 {
158            return None;
159        }
160        let end = dev_offset.checked_add(length)?;
161        if self.mapping.source_block_offset > dev_offset
162            || end > self.mapping.source_block_offset + self.mapping.length
163        {
164            return None;
165        }
166        let delta = dev_offset - self.mapping.source_block_offset;
167        Some(self.mapping.target_block_offset + delta)
168    }
169}
170
/// Supplies the device-specific behavior used by [`BlockServer`] and its sessions.
// Methods take Arc<Self> rather than &self because of
// https://github.com/rust-lang/rust/issues/42940.
pub trait SessionManager: 'static {
    /// Called when a client attaches `vmo` to a session, after the session has recorded it.
    /// An error is propagated out of the session's request handler.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Creates a new session to handle `stream`.
    /// The returned future should run until the session completes, for example when the client end
    /// closes.
    /// If `offset_map` is set, it will be used to remap the dev_offset of FIFO requests.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: Option<OffsetMap>,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    /// The default implementation fails with `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
227
/// Conversion into the shared session manager that `BlockServer::new` stores.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager behind an `Arc`.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
233
234impl<SM: SessionManager> BlockServer<SM> {
235    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
236        Self { block_size, session_manager: session_manager.into_session_manager() }
237    }
238
239    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
240    pub async fn handle_requests(
241        &self,
242        mut requests: fvolume::VolumeRequestStream,
243    ) -> Result<(), Error> {
244        let scope = fasync::Scope::new();
245        while let Some(request) = requests.try_next().await.unwrap() {
246            if let Some(session) = self.handle_request(request).await? {
247                scope.spawn(session.map(|_| ()));
248            }
249        }
250        scope.await;
251        Ok(())
252    }
253
254    /// Processes a partition request.  If a new session task is created in response to the request,
255    /// it is returned.
256    async fn handle_request(
257        &self,
258        request: fvolume::VolumeRequest,
259    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
260        match request {
261            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
262                Ok(info) => {
263                    let (block_count, flags) = match info.as_ref() {
264                        DeviceInfo::Block(BlockInfo { block_count, device_flags }) => {
265                            (*block_count, *device_flags)
266                        }
267                        DeviceInfo::Partition(partition_info) => {
268                            let block_count = if let Some(range) =
269                                partition_info.block_range.as_ref()
270                            {
271                                range.end - range.start
272                            } else {
273                                let volume_info = self.session_manager.get_volume_info().await?;
274                                volume_info.0.slice_size * volume_info.1.partition_slice_count
275                                    / self.block_size as u64
276                            };
277                            (block_count, partition_info.device_flags)
278                        }
279                    };
280                    responder.send(Ok(&fblock::BlockInfo {
281                        block_count,
282                        block_size: self.block_size,
283                        max_transfer_size: fblock::MAX_TRANSFER_UNBOUNDED,
284                        flags,
285                    }))?;
286                }
287                Err(status) => responder.send(Err(status.into_raw()))?,
288            },
289            fvolume::VolumeRequest::GetStats { clear: _, responder } => {
290                // TODO(https://fxbug.dev/348077960): Implement this
291                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
292            }
293            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
294                return Ok(Some(self.session_manager.clone().open_session(
295                    session.into_stream(),
296                    None,
297                    self.block_size,
298                )));
299            }
300            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
301                session,
302                offset_map,
303                initial_mappings,
304                control_handle: _,
305            } => {
306                if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
307                    // TODO(https://fxbug.dev/402515764): Support multiple mappings and
308                    // dynamic querying for FVM as needed.  A single static map is
309                    // sufficient for GPT.
310                    session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
311                    return Ok(None);
312                }
313                let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
314                    Ok(m) => m,
315                    Err(status) => {
316                        session.close_with_epitaph(status)?;
317                        return Ok(None);
318                    }
319                };
320                let offset_map = OffsetMap::new(initial_mapping);
321                return Ok(Some(self.session_manager.clone().open_session(
322                    session.into_stream(),
323                    Some(offset_map),
324                    self.block_size,
325                )));
326            }
327            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
328                Ok(info) => {
329                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
330                        let mut guid =
331                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
332                        guid.value.copy_from_slice(&partition_info.type_guid);
333                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
334                    } else {
335                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
336                    }
337                }
338                Err(status) => {
339                    responder.send(status.into_raw(), None)?;
340                }
341            },
342            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
343                match self.device_info().await {
344                    Ok(info) => {
345                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
346                            let mut guid =
347                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
348                            guid.value.copy_from_slice(&partition_info.instance_guid);
349                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
350                        } else {
351                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
352                        }
353                    }
354                    Err(status) => {
355                        responder.send(status.into_raw(), None)?;
356                    }
357                }
358            }
359            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
360                Ok(info) => {
361                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
362                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
363                    } else {
364                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
365                    }
366                }
367                Err(status) => {
368                    responder.send(status.into_raw(), None)?;
369                }
370            },
371            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
372                Ok(info) => {
373                    if let DeviceInfo::Partition(info) = info.as_ref() {
374                        let mut type_guid =
375                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
376                        type_guid.value.copy_from_slice(&info.type_guid);
377                        let mut instance_guid =
378                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
379                        instance_guid.value.copy_from_slice(&info.instance_guid);
380                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
381                            name: Some(info.name.clone()),
382                            type_guid: Some(type_guid),
383                            instance_guid: Some(instance_guid),
384                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
385                            num_blocks: info
386                                .block_range
387                                .as_ref()
388                                .map(|range| range.end - range.start),
389                            flags: Some(info.flags),
390                            ..Default::default()
391                        }))?;
392                    }
393                }
394                Err(status) => responder.send(Err(status.into_raw()))?,
395            },
396            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
397                match self.session_manager.query_slices(&start_slices).await {
398                    Ok(mut results) => {
399                        let results_len = results.len();
400                        assert!(results_len <= 16);
401                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
402                        responder.send(
403                            zx::sys::ZX_OK,
404                            &results.try_into().unwrap(),
405                            results_len as u64,
406                        )?;
407                    }
408                    Err(s) => {
409                        responder.send(
410                            s.into_raw(),
411                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
412                            0,
413                        )?;
414                    }
415                }
416            }
417            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
418                match self.session_manager.get_volume_info().await {
419                    Ok((manager_info, volume_info)) => {
420                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
421                    }
422                    Err(s) => responder.send(s.into_raw(), None, None)?,
423                }
424            }
425            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
426                responder.send(
427                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
428                        .into_raw(),
429                )?;
430            }
431            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
432                responder.send(
433                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
434                        .into_raw(),
435                )?;
436            }
437            fvolume::VolumeRequest::Destroy { responder, .. } => {
438                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
439            }
440        }
441        Ok(None)
442    }
443
444    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
445        self.session_manager.get_info().await
446    }
447}
448
/// State of one session: its FIFO, its attached VMOs and its request groups.
struct SessionHelper<SM: SessionManager> {
    /// Notified when a VMO is attached to the session.
    session_manager: Arc<SM>,
    /// When set, used to remap the device offset of decoded requests.
    offset_map: Option<OffsetMap>,
    /// Used to scale the `vmo_offset` of read and write requests.
    block_size: u32,
    /// The FIFO end that is duplicated and handed to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached to the session, keyed by the VMO id given to the client.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
    /// Bookkeeping for requests that were sent as part of a group.
    message_groups: FifoMessageGroups,
}
457
458impl<SM: SessionManager> SessionHelper<SM> {
459    fn new(
460        session_manager: Arc<SM>,
461        offset_map: Option<OffsetMap>,
462        block_size: u32,
463    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
464        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
465        Ok((
466            Self {
467                session_manager,
468                offset_map,
469                block_size,
470                peer_fifo,
471                vmos: Mutex::default(),
472                message_groups: FifoMessageGroups::default(),
473            },
474            fifo,
475        ))
476    }
477
478    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
479        match request {
480            fblock::SessionRequest::GetFifo { responder } => {
481                let rights = zx::Rights::TRANSFER
482                    | zx::Rights::READ
483                    | zx::Rights::WRITE
484                    | zx::Rights::SIGNAL
485                    | zx::Rights::WAIT;
486                match self.peer_fifo.duplicate_handle(rights) {
487                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
488                    Err(s) => responder.send(Err(s.into_raw()))?,
489                }
490                Ok(())
491            }
492            fblock::SessionRequest::AttachVmo { vmo, responder } => {
493                let vmo = Arc::new(vmo);
494                let vmo_id = {
495                    let mut vmos = self.vmos.lock();
496                    if vmos.len() == u16::MAX as usize {
497                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
498                        return Ok(());
499                    } else {
500                        let vmo_id = match vmos.last_entry() {
501                            None => 1,
502                            Some(o) => {
503                                o.key().checked_add(1).unwrap_or_else(|| {
504                                    let mut vmo_id = 1;
505                                    // Find the first gap...
506                                    for (&id, _) in &*vmos {
507                                        if id > vmo_id {
508                                            break;
509                                        }
510                                        vmo_id = id + 1;
511                                    }
512                                    vmo_id
513                                })
514                            }
515                        };
516                        vmos.insert(vmo_id, vmo.clone());
517                        vmo_id
518                    }
519                };
520                self.session_manager.clone().on_attach_vmo(&vmo).await?;
521                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
522                Ok(())
523            }
524            fblock::SessionRequest::Close { responder } => {
525                responder.send(Ok(()))?;
526                Err(anyhow!("Closed"))
527            }
528        }
529    }
530
531    fn finish_fifo_request(
532        &self,
533        request: RequestTracking,
534        status: zx::Status,
535    ) -> Option<BlockFifoResponse> {
536        match request.group_or_request {
537            GroupOrRequest::Group(group_id) => {
538                let response = self.message_groups.complete(group_id, status);
539                fuchsia_trace::duration!(
540                    c"storage",
541                    c"block_server::finish_transaction_in_group",
542                    "group" => u32::from(group_id),
543                    "group_completed" => response.is_some(),
544                    "status" => status.into_raw());
545                if let Some(trace_flow_id) = request.trace_flow_id {
546                    fuchsia_trace::flow_step!(
547                        c"storage",
548                        c"block_server::finish_transaction",
549                        trace_flow_id.get().into()
550                    );
551                }
552                response
553            }
554            GroupOrRequest::Request(reqid) => {
555                fuchsia_trace::duration!(
556                    c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
557                if let Some(trace_flow_id) = request.trace_flow_id {
558                    fuchsia_trace::flow_step!(
559                        c"storage",
560                        c"block_server::finish_transaction",
561                        trace_flow_id.get().into()
562                    );
563                }
564                Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
565            }
566        }
567    }
568
569    fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
570        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
571        let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
572        let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
573        let mut op_code =
574            BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
575
576        let group_or_request = if is_group {
577            let mut groups = self.message_groups.0.lock();
578            let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
579            if group.req_id.is_some() {
580                // We have already received a request tagged as last.
581                if group.status == zx::Status::OK {
582                    group.status = zx::Status::INVALID_ARGS;
583                }
584                return None;
585            }
586            if last_in_group {
587                group.req_id = Some(request.reqid);
588                // If the group has had an error, there is no point trying to issue this request.
589                if group.status != zx::Status::OK {
590                    op_code = Err(group.status);
591                }
592            } else if group.status != zx::Status::OK {
593                // The group has already encountered an error, so there is no point trying to issue
594                // this request.
595                return None;
596            }
597            group.count += 1;
598            GroupOrRequest::Group(request.group)
599        } else {
600            GroupOrRequest::Request(request.reqid)
601        };
602
603        let mut vmo_offset = 0;
604        let vmo = match op_code {
605            Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
606                if request.length == 0 {
607                    return Err(zx::Status::INVALID_ARGS);
608                }
609                vmo_offset = request
610                    .vmo_offset
611                    .checked_mul(self.block_size as u64)
612                    .ok_or(zx::Status::OUT_OF_RANGE)?;
613                self.vmos
614                    .lock()
615                    .get(&request.vmoid)
616                    .cloned()
617                    .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
618            })(),
619            Ok(BlockOpcode::CloseVmo) => self
620                .vmos
621                .lock()
622                .remove(&request.vmoid)
623                .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
624            _ => Ok(None),
625        }
626        .unwrap_or_else(|e| {
627            op_code = Err(e);
628            None
629        });
630
631        let operation = op_code.map(|code| match code {
632            BlockOpcode::Read => Operation::Read {
633                device_block_offset: request.dev_offset,
634                block_count: request.length,
635                _unused: 0,
636                vmo_offset,
637            },
638            BlockOpcode::Write => Operation::Write {
639                device_block_offset: request.dev_offset,
640                block_count: request.length,
641                options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
642                    WriteOptions::FORCE_ACCESS
643                } else {
644                    WriteOptions::empty()
645                },
646                vmo_offset,
647            },
648            BlockOpcode::Flush => Operation::Flush,
649            BlockOpcode::Trim => Operation::Trim {
650                device_block_offset: request.dev_offset,
651                block_count: request.length,
652            },
653            BlockOpcode::CloseVmo => Operation::CloseVmo,
654        });
655        if let Ok(operation) = operation.as_ref() {
656            use fuchsia_trace::ArgValue;
657            static CACHE: AtomicU64 = AtomicU64::new(0);
658            if let Some(context) =
659                fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
660            {
661                let trace_args_with_group = [
662                    ArgValue::of("group", u32::from(group_or_request.group_id())),
663                    ArgValue::of("opcode", operation.trace_label()),
664                ];
665                let trace_args = [ArgValue::of("opcode", operation.trace_label())];
666                let _scope = if group_or_request.is_group() {
667                    fuchsia_trace::duration(
668                        c"storage",
669                        c"block_server::start_transaction",
670                        &trace_args_with_group,
671                    )
672                } else {
673                    fuchsia_trace::duration(
674                        c"storage",
675                        c"block_server::start_transaction",
676                        &trace_args,
677                    )
678                };
679                let trace_flow_id = NonZero::new(request.trace_flow_id);
680                if let Some(trace_flow_id) = trace_flow_id.clone() {
681                    fuchsia_trace::flow_step(
682                        &context,
683                        c"block_server::start_trnsaction",
684                        trace_flow_id.get().into(),
685                        &[],
686                    );
687                }
688            }
689        }
690        Some(DecodedRequest::new(
691            RequestTracking {
692                group_or_request,
693                trace_flow_id: NonZero::new(request.trace_flow_id),
694            },
695            operation,
696            vmo,
697            self.offset_map.as_ref(),
698        ))
699    }
700
701    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
702        std::mem::take(&mut *self.vmos.lock())
703    }
704}
705
/// A FIFO request after decoding and range validation.
#[derive(Debug)]
struct DecodedRequest {
    /// Identifies the request or group to respond to once the request is finished.
    request_tracking: RequestTracking,
    /// The operation to perform, or the status to report if decoding or validation failed.
    operation: Result<Operation, zx::Status>,
    /// The VMO named by the request, when the operation refers to one.
    vmo: Option<Arc<zx::Vmo>>,
}
712
713impl DecodedRequest {
714    fn new(
715        request_tracking: RequestTracking,
716        operation: Result<Operation, zx::Status>,
717        vmo: Option<Arc<zx::Vmo>>,
718        offset_map: Option<&OffsetMap>,
719    ) -> Self {
720        let operation =
721            operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
722        Self { request_tracking, operation, vmo }
723    }
724}
725
/// Options attached to write operations; an alias for `block_protocol::WriteOptions`.
/// cbindgen:no-export
pub type WriteOptions = block_protocol::WriteOptions;
728
/// An operation decoded from a FIFO request.
#[repr(C)]
#[derive(Debug)]
pub enum Operation {
    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
    // code can read `device_block_offset` from the read variant and then assume it's valid for the
    // write variant).
    /// A read request.
    Read {
        /// The request's device offset, after any remapping through an `OffsetMap`.
        device_block_offset: u64,
        /// The request's length; validation rejects zero.
        block_count: u32,
        /// Always written as zero; occupies the slot that `options` has in `Write`.
        _unused: u32,
        /// The request's `vmo_offset` multiplied by the session's block size.
        vmo_offset: u64,
    },
    /// A write request.
    Write {
        /// The request's device offset, after any remapping through an `OffsetMap`.
        device_block_offset: u64,
        /// The request's length; validation rejects zero.
        block_count: u32,
        /// `FORCE_ACCESS` when the request carried that flag, otherwise empty.
        options: WriteOptions,
        /// The request's `vmo_offset` multiplied by the session's block size.
        vmo_offset: u64,
    },
    /// A flush request.
    Flush,
    /// A trim request.
    Trim {
        /// The request's device offset, after any remapping through an `OffsetMap`.
        device_block_offset: u64,
        /// The request's length; validation rejects zero.
        block_count: u32,
    },
    /// This will never be seen by the C interface.
    CloseVmo,
}
756
757impl Operation {
758    // Adjusts the operation's block range via `OffsetMap`.  If the request would exceed the range
759    // known to the map, or is otherwise invalid, returns an error.
760    fn validate_and_adjust_range(
761        mut self,
762        offset_map: Option<&OffsetMap>,
763    ) -> Result<Operation, zx::Status> {
764        let adjust_offset = |dev_offset, length| {
765            // For compatibility with the C++ server, always ensure the length is nonzero
766            if length == 0 {
767                return Err(zx::Status::INVALID_ARGS);
768            }
769            if let Some(offset_map) = offset_map {
770                offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
771            } else {
772                Ok(dev_offset)
773            }
774        };
775        match &mut self {
776            Operation::Read { device_block_offset, block_count, .. } => {
777                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
778            }
779            Operation::Write { device_block_offset, block_count, .. } => {
780                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
781            }
782            Operation::Trim { device_block_offset, block_count, .. } => {
783                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
784            }
785            _ => {}
786        }
787        Ok(self)
788    }
789    fn trace_label(&self) -> &'static str {
790        match self {
791            Operation::Read { .. } => "read",
792            Operation::Write { .. } => "write",
793            Operation::Flush { .. } => "flush",
794            Operation::Trim { .. } => "trim",
795            Operation::CloseVmo { .. } => "close_vmo",
796        }
797    }
798}
799
800#[derive(Clone, Copy, Debug)]
801pub struct RequestTracking {
802    group_or_request: GroupOrRequest,
803    trace_flow_id: Option<NonZero<u64>>,
804}
805
/// Identifies either a whole group of requests (by group ID) or one individual request (by
/// request ID).
#[derive(Clone, Copy, Debug)]
pub enum GroupOrRequest {
    Group(u16),
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true if this is the `Group` variant.
    fn is_group(&self) -> bool {
        matches!(self, Self::Group(_))
    }

    /// Returns the group ID, or 0 for the `Request` variant.
    fn group_id(&self) -> u16 {
        match self {
            Self::Group(id) => *id,
            Self::Request(_) => 0,
        }
    }
}

// Bit 63 of a `RequestId`: set when the ID encodes a group rather than a single request.
/// cbindgen:ignore
const IS_GROUP: u64 = 0x8000_0000_0000_0000;
// Bit 62 of a `RequestId`: set when the request used a VMO.
/// cbindgen:ignore
const USED_VMO: u64 = 0x4000_0000_0000_0000;
// A `GroupOrRequest` packed into a single `u64`: the low bits hold the group or request ID and
// the two top bits hold the `IS_GROUP` and `USED_VMO` flags.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// Marks the request as having used a VMO, so that we keep the VMO alive until
    /// the request has finished.
    fn with_vmo(self) -> Self {
        Self(self.0 | USED_VMO)
    }

    /// Returns whether the request ID indicates a VMO was used.
    fn did_have_vmo(&self) -> bool {
        self.0 & USED_VMO != 0
    }
}

impl From<GroupOrRequest> for RequestId {
    /// Packs the ID into the low bits; groups additionally get the `IS_GROUP` flag.
    fn from(value: GroupOrRequest) -> Self {
        match value {
            GroupOrRequest::Group(group) => RequestId(u64::from(group) | IS_GROUP),
            GroupOrRequest::Request(request) => RequestId(u64::from(request)),
        }
    }
}

impl From<RequestId> for GroupOrRequest {
    /// Recovers the group or request ID.  The `USED_VMO` flag is not represented in
    /// `GroupOrRequest` and is dropped.
    fn from(value: RequestId) -> Self {
        // The `as` casts below truncate on purpose: they keep only the low 16/32 bits that hold
        // the ID and discard the flag bits stored at the top of the word.
        if value.0 & IS_GROUP != 0 {
            GroupOrRequest::Group(value.0 as u16)
        } else {
            GroupOrRequest::Request(value.0 as u32)
        }
    }
}
868
869#[cfg(test)]
870mod tests {
871    use super::{BlockServer, DeviceInfo, PartitionInfo};
872    use crate::async_interface::FIFO_MAX_REQUESTS;
873    use assert_matches::assert_matches;
874    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
875    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
876    use fuchsia_sync::Mutex;
877    use futures::channel::oneshot;
878    use futures::future::BoxFuture;
879    use futures::FutureExt as _;
880    use std::borrow::Cow;
881    use std::future::poll_fn;
882    use std::num::NonZero;
883    use std::pin::pin;
884    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
885    use std::sync::Arc;
886    use std::task::{Context, Poll};
887    use zx::{AsHandleRef as _, HandleBased as _};
888    use {
889        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
890        fuchsia_async as fasync,
891    };
892
893    #[derive(Default)]
894    struct MockInterface {
895        read_hook: Option<
896            Box<
897                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
898                    + Send
899                    + Sync,
900            >,
901        >,
902    }
903
904    impl super::async_interface::Interface for MockInterface {
905        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
906            Ok(())
907        }
908
909        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
910            Ok(Cow::Owned(test_device_info()))
911        }
912
913        async fn read(
914            &self,
915            device_block_offset: u64,
916            block_count: u32,
917            vmo: &Arc<zx::Vmo>,
918            vmo_offset: u64,
919            _trace_flow_id: Option<NonZero<u64>>,
920        ) -> Result<(), zx::Status> {
921            if let Some(read_hook) = &self.read_hook {
922                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
923            } else {
924                unimplemented!();
925            }
926        }
927
928        async fn write(
929            &self,
930            _device_block_offset: u64,
931            _block_count: u32,
932            _vmo: &Arc<zx::Vmo>,
933            _vmo_offset: u64,
934            _opts: WriteOptions,
935            _trace_flow_id: Option<NonZero<u64>>,
936        ) -> Result<(), zx::Status> {
937            unreachable!();
938        }
939
940        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
941            unreachable!();
942        }
943
944        async fn trim(
945            &self,
946            _device_block_offset: u64,
947            _block_count: u32,
948            _trace_flow_id: Option<NonZero<u64>>,
949        ) -> Result<(), zx::Status> {
950            unreachable!();
951        }
952
953        async fn get_volume_info(
954            &self,
955        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
956            // Hang forever for the test_requests_dont_block_sessions test.
957            let () = std::future::pending().await;
958            unreachable!();
959        }
960    }
961
962    const BLOCK_SIZE: u32 = 512;
963
964    fn test_device_info() -> DeviceInfo {
965        DeviceInfo::Partition(PartitionInfo {
966            device_flags: fblock::Flag::READONLY,
967            block_range: Some(12..34),
968            type_guid: [1; 16],
969            instance_guid: [2; 16],
970            name: "foo".to_string(),
971            flags: 0xabcd,
972        })
973    }
974
975    #[fuchsia::test]
976    async fn test_info() {
977        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
978
979        futures::join!(
980            async {
981                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
982                block_server.handle_requests(stream).await.unwrap();
983            },
984            async {
985                let expected_info = test_device_info();
986                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
987                    info
988                } else {
989                    unreachable!()
990                };
991
992                let block_info = proxy.get_info().await.unwrap().unwrap();
993                assert_eq!(
994                    block_info.block_count,
995                    partition_info.block_range.as_ref().unwrap().end
996                        - partition_info.block_range.as_ref().unwrap().start
997                );
998                assert_eq!(block_info.flags, fblock::Flag::READONLY);
999
1000                // TODO(https://fxbug.dev/348077960): Check max_transfer_size
1001
1002                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1003                assert_eq!(status, zx::sys::ZX_OK);
1004                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1005
1006                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1007                assert_eq!(status, zx::sys::ZX_OK);
1008                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1009
1010                let (status, name) = proxy.get_name().await.unwrap();
1011                assert_eq!(status, zx::sys::ZX_OK);
1012                assert_eq!(name.as_ref(), Some(&partition_info.name));
1013
1014                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1015                assert_eq!(metadata.name, name);
1016                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1017                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1018                assert_eq!(
1019                    metadata.start_block_offset,
1020                    Some(partition_info.block_range.as_ref().unwrap().start)
1021                );
1022                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1023                assert_eq!(metadata.flags, Some(partition_info.flags));
1024
1025                std::mem::drop(proxy);
1026            }
1027        );
1028    }
1029
1030    #[fuchsia::test]
1031    async fn test_attach_vmo() {
1032        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1033
1034        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1035        let koid = vmo.get_koid().unwrap();
1036
1037        futures::join!(
1038            async {
1039                let block_server = BlockServer::new(
1040                    BLOCK_SIZE,
1041                    Arc::new(MockInterface {
1042                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1043                            assert_eq!(vmo.get_koid().unwrap(), koid);
1044                            Box::pin(async { Ok(()) })
1045                        })),
1046                        ..MockInterface::default()
1047                    }),
1048                );
1049                block_server.handle_requests(stream).await.unwrap();
1050            },
1051            async move {
1052                let (session_proxy, server) = fidl::endpoints::create_proxy();
1053
1054                proxy.open_session(server).unwrap();
1055
1056                let vmo_id = session_proxy
1057                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1058                    .await
1059                    .unwrap()
1060                    .unwrap();
1061                assert_ne!(vmo_id.id, 0);
1062
1063                let mut fifo =
1064                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1065                let (mut reader, mut writer) = fifo.async_io();
1066
1067                // Keep attaching VMOs until we eventually hit the maximum.
1068                let mut count = 1;
1069                loop {
1070                    match session_proxy
1071                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1072                        .await
1073                        .unwrap()
1074                    {
1075                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1076                        Err(e) => {
1077                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1078                            break;
1079                        }
1080                    }
1081
1082                    // Only test every 10 to keep test time down.
1083                    if count % 10 == 0 {
1084                        writer
1085                            .write_entries(&BlockFifoRequest {
1086                                command: BlockFifoCommand {
1087                                    opcode: BlockOpcode::Read.into_primitive(),
1088                                    ..Default::default()
1089                                },
1090                                vmoid: vmo_id.id,
1091                                length: 1,
1092                                ..Default::default()
1093                            })
1094                            .await
1095                            .unwrap();
1096
1097                        let mut response = BlockFifoResponse::default();
1098                        reader.read_entries(&mut response).await.unwrap();
1099                        assert_eq!(response.status, zx::sys::ZX_OK);
1100                    }
1101
1102                    count += 1;
1103                }
1104
1105                assert_eq!(count, u16::MAX as u64);
1106
1107                // Detach the original VMO, and make sure we can then attach another one.
1108                writer
1109                    .write_entries(&BlockFifoRequest {
1110                        command: BlockFifoCommand {
1111                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1112                            ..Default::default()
1113                        },
1114                        vmoid: vmo_id.id,
1115                        ..Default::default()
1116                    })
1117                    .await
1118                    .unwrap();
1119
1120                let mut response = BlockFifoResponse::default();
1121                reader.read_entries(&mut response).await.unwrap();
1122                assert_eq!(response.status, zx::sys::ZX_OK);
1123
1124                let new_vmo_id = session_proxy
1125                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1126                    .await
1127                    .unwrap()
1128                    .unwrap();
1129                // It should reuse the same ID.
1130                assert_eq!(new_vmo_id.id, vmo_id.id);
1131
1132                std::mem::drop(proxy);
1133            }
1134        );
1135    }
1136
1137    #[fuchsia::test]
1138    async fn test_close() {
1139        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1140
1141        let mut server = std::pin::pin!(async {
1142            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1143            block_server.handle_requests(stream).await.unwrap();
1144        }
1145        .fuse());
1146
1147        let mut client = std::pin::pin!(async {
1148            let (session_proxy, server) = fidl::endpoints::create_proxy();
1149
1150            proxy.open_session(server).unwrap();
1151
1152            // Dropping the proxy should not cause the session to terminate because the session is
1153            // still live.
1154            std::mem::drop(proxy);
1155
1156            session_proxy.close().await.unwrap().unwrap();
1157
1158            // Keep the session alive.  Calling `close` should cause the server to terminate.
1159            let _: () = std::future::pending().await;
1160        }
1161        .fuse());
1162
1163        futures::select!(
1164            _ = server => {}
1165            _ = client => unreachable!(),
1166        );
1167    }
1168
1169    #[derive(Default)]
1170    struct IoMockInterface {
1171        do_checks: bool,
1172        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1173        return_errors: bool,
1174    }
1175
1176    #[derive(Debug)]
1177    enum ExpectedOp {
1178        Read(u64, u32, u64),
1179        Write(u64, u32, u64),
1180        Trim(u64, u32),
1181        Flush,
1182    }
1183
1184    impl super::async_interface::Interface for IoMockInterface {
1185        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1186            Ok(())
1187        }
1188
1189        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1190            Ok(Cow::Owned(test_device_info()))
1191        }
1192
1193        async fn read(
1194            &self,
1195            device_block_offset: u64,
1196            block_count: u32,
1197            _vmo: &Arc<zx::Vmo>,
1198            vmo_offset: u64,
1199            _trace_flow_id: Option<NonZero<u64>>,
1200        ) -> Result<(), zx::Status> {
1201            if self.return_errors {
1202                Err(zx::Status::INTERNAL)
1203            } else {
1204                if self.do_checks {
1205                    assert_matches!(
1206                        self.expected_op.lock().take(),
1207                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1208                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1209                        "Read {device_block_offset} {block_count} {vmo_offset}"
1210                    );
1211                }
1212                Ok(())
1213            }
1214        }
1215
1216        async fn write(
1217            &self,
1218            device_block_offset: u64,
1219            block_count: u32,
1220            _vmo: &Arc<zx::Vmo>,
1221            vmo_offset: u64,
1222            _opts: WriteOptions,
1223            _trace_flow_id: Option<NonZero<u64>>,
1224        ) -> Result<(), zx::Status> {
1225            if self.return_errors {
1226                Err(zx::Status::NOT_SUPPORTED)
1227            } else {
1228                if self.do_checks {
1229                    assert_matches!(
1230                        self.expected_op.lock().take(),
1231                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1232                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1233                        "Write {device_block_offset} {block_count} {vmo_offset}"
1234                    );
1235                }
1236                Ok(())
1237            }
1238        }
1239
1240        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1241            if self.return_errors {
1242                Err(zx::Status::NO_RESOURCES)
1243            } else {
1244                if self.do_checks {
1245                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1246                }
1247                Ok(())
1248            }
1249        }
1250
1251        async fn trim(
1252            &self,
1253            device_block_offset: u64,
1254            block_count: u32,
1255            _trace_flow_id: Option<NonZero<u64>>,
1256        ) -> Result<(), zx::Status> {
1257            if self.return_errors {
1258                Err(zx::Status::NO_MEMORY)
1259            } else {
1260                if self.do_checks {
1261                    assert_matches!(
1262                        self.expected_op.lock().take(),
1263                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1264                            block_count == b,
1265                        "Trim {device_block_offset} {block_count}"
1266                    );
1267                }
1268                Ok(())
1269            }
1270        }
1271    }
1272
1273    #[fuchsia::test]
1274    async fn test_io() {
1275        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1276
1277        let expected_op = Arc::new(Mutex::new(None));
1278        let expected_op_clone = expected_op.clone();
1279
1280        let server = async {
1281            let block_server = BlockServer::new(
1282                BLOCK_SIZE,
1283                Arc::new(IoMockInterface {
1284                    return_errors: false,
1285                    do_checks: true,
1286                    expected_op: expected_op_clone,
1287                }),
1288            );
1289            block_server.handle_requests(stream).await.unwrap();
1290        };
1291
1292        let client = async move {
1293            let (session_proxy, server) = fidl::endpoints::create_proxy();
1294
1295            proxy.open_session(server).unwrap();
1296
1297            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1298            let vmo_id = session_proxy
1299                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1300                .await
1301                .unwrap()
1302                .unwrap();
1303
1304            let mut fifo =
1305                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1306            let (mut reader, mut writer) = fifo.async_io();
1307
1308            // READ
1309            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1310            writer
1311                .write_entries(&BlockFifoRequest {
1312                    command: BlockFifoCommand {
1313                        opcode: BlockOpcode::Read.into_primitive(),
1314                        ..Default::default()
1315                    },
1316                    vmoid: vmo_id.id,
1317                    dev_offset: 1,
1318                    length: 2,
1319                    vmo_offset: 3,
1320                    ..Default::default()
1321                })
1322                .await
1323                .unwrap();
1324
1325            let mut response = BlockFifoResponse::default();
1326            reader.read_entries(&mut response).await.unwrap();
1327            assert_eq!(response.status, zx::sys::ZX_OK);
1328
1329            // WRITE
1330            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1331            writer
1332                .write_entries(&BlockFifoRequest {
1333                    command: BlockFifoCommand {
1334                        opcode: BlockOpcode::Write.into_primitive(),
1335                        ..Default::default()
1336                    },
1337                    vmoid: vmo_id.id,
1338                    dev_offset: 4,
1339                    length: 5,
1340                    vmo_offset: 6,
1341                    ..Default::default()
1342                })
1343                .await
1344                .unwrap();
1345
1346            let mut response = BlockFifoResponse::default();
1347            reader.read_entries(&mut response).await.unwrap();
1348            assert_eq!(response.status, zx::sys::ZX_OK);
1349
1350            // FLUSH
1351            *expected_op.lock() = Some(ExpectedOp::Flush);
1352            writer
1353                .write_entries(&BlockFifoRequest {
1354                    command: BlockFifoCommand {
1355                        opcode: BlockOpcode::Flush.into_primitive(),
1356                        ..Default::default()
1357                    },
1358                    ..Default::default()
1359                })
1360                .await
1361                .unwrap();
1362
1363            reader.read_entries(&mut response).await.unwrap();
1364            assert_eq!(response.status, zx::sys::ZX_OK);
1365
1366            // TRIM
1367            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1368            writer
1369                .write_entries(&BlockFifoRequest {
1370                    command: BlockFifoCommand {
1371                        opcode: BlockOpcode::Trim.into_primitive(),
1372                        ..Default::default()
1373                    },
1374                    dev_offset: 7,
1375                    length: 8,
1376                    ..Default::default()
1377                })
1378                .await
1379                .unwrap();
1380
1381            reader.read_entries(&mut response).await.unwrap();
1382            assert_eq!(response.status, zx::sys::ZX_OK);
1383
1384            std::mem::drop(proxy);
1385        };
1386
1387        futures::join!(server, client);
1388    }
1389
1390    #[fuchsia::test]
1391    async fn test_io_errors() {
1392        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1393
1394        futures::join!(
1395            async {
1396                let block_server = BlockServer::new(
1397                    BLOCK_SIZE,
1398                    Arc::new(IoMockInterface {
1399                        return_errors: true,
1400                        do_checks: false,
1401                        expected_op: Arc::new(Mutex::new(None)),
1402                    }),
1403                );
1404                block_server.handle_requests(stream).await.unwrap();
1405            },
1406            async move {
1407                let (session_proxy, server) = fidl::endpoints::create_proxy();
1408
1409                proxy.open_session(server).unwrap();
1410
1411                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1412                let vmo_id = session_proxy
1413                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1414                    .await
1415                    .unwrap()
1416                    .unwrap();
1417
1418                let mut fifo =
1419                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1420                let (mut reader, mut writer) = fifo.async_io();
1421
1422                // READ
1423                writer
1424                    .write_entries(&BlockFifoRequest {
1425                        command: BlockFifoCommand {
1426                            opcode: BlockOpcode::Read.into_primitive(),
1427                            ..Default::default()
1428                        },
1429                        vmoid: vmo_id.id,
1430                        length: 1,
1431                        reqid: 1,
1432                        ..Default::default()
1433                    })
1434                    .await
1435                    .unwrap();
1436
1437                let mut response = BlockFifoResponse::default();
1438                reader.read_entries(&mut response).await.unwrap();
1439                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1440
1441                // WRITE
1442                writer
1443                    .write_entries(&BlockFifoRequest {
1444                        command: BlockFifoCommand {
1445                            opcode: BlockOpcode::Write.into_primitive(),
1446                            ..Default::default()
1447                        },
1448                        vmoid: vmo_id.id,
1449                        length: 1,
1450                        reqid: 2,
1451                        ..Default::default()
1452                    })
1453                    .await
1454                    .unwrap();
1455
1456                reader.read_entries(&mut response).await.unwrap();
1457                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1458
1459                // FLUSH
1460                writer
1461                    .write_entries(&BlockFifoRequest {
1462                        command: BlockFifoCommand {
1463                            opcode: BlockOpcode::Flush.into_primitive(),
1464                            ..Default::default()
1465                        },
1466                        reqid: 3,
1467                        ..Default::default()
1468                    })
1469                    .await
1470                    .unwrap();
1471
1472                reader.read_entries(&mut response).await.unwrap();
1473                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1474
1475                // TRIM
1476                writer
1477                    .write_entries(&BlockFifoRequest {
1478                        command: BlockFifoCommand {
1479                            opcode: BlockOpcode::Trim.into_primitive(),
1480                            ..Default::default()
1481                        },
1482                        reqid: 4,
1483                        length: 1,
1484                        ..Default::default()
1485                    })
1486                    .await
1487                    .unwrap();
1488
1489                reader.read_entries(&mut response).await.unwrap();
1490                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1491
1492                std::mem::drop(proxy);
1493            }
1494        );
1495    }
1496
1497    #[fuchsia::test]
1498    async fn test_invalid_args() {
1499        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1500
1501        futures::join!(
1502            async {
1503                let block_server = BlockServer::new(
1504                    BLOCK_SIZE,
1505                    Arc::new(IoMockInterface {
1506                        return_errors: false,
1507                        do_checks: false,
1508                        expected_op: Arc::new(Mutex::new(None)),
1509                    }),
1510                );
1511                block_server.handle_requests(stream).await.unwrap();
1512            },
1513            async move {
1514                let (session_proxy, server) = fidl::endpoints::create_proxy();
1515
1516                proxy.open_session(server).unwrap();
1517
1518                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1519                let vmo_id = session_proxy
1520                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1521                    .await
1522                    .unwrap()
1523                    .unwrap();
1524
1525                let mut fifo =
1526                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1527
1528                async fn test(
1529                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1530                    request: BlockFifoRequest,
1531                ) -> Result<(), zx::Status> {
1532                    let (mut reader, mut writer) = fifo.async_io();
1533                    writer.write_entries(&request).await.unwrap();
1534                    let mut response = BlockFifoResponse::default();
1535                    reader.read_entries(&mut response).await.unwrap();
1536                    zx::Status::ok(response.status)
1537                }
1538
1539                // READ
1540
1541                let good_read_request = || BlockFifoRequest {
1542                    command: BlockFifoCommand {
1543                        opcode: BlockOpcode::Read.into_primitive(),
1544                        ..Default::default()
1545                    },
1546                    vmoid: vmo_id.id,
1547                    ..Default::default()
1548                };
1549
1550                assert_eq!(
1551                    test(
1552                        &mut fifo,
1553                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1554                    )
1555                    .await,
1556                    Err(zx::Status::INVALID_ARGS)
1557                );
1558
1559                assert_eq!(
1560                    test(
1561                        &mut fifo,
1562                        BlockFifoRequest {
1563                            vmo_offset: 0xffff_ffff_ffff_ffff,
1564                            ..good_read_request()
1565                        }
1566                    )
1567                    .await,
1568                    Err(zx::Status::INVALID_ARGS)
1569                );
1570
1571                // WRITE
1572
1573                let good_write_request = || BlockFifoRequest {
1574                    command: BlockFifoCommand {
1575                        opcode: BlockOpcode::Write.into_primitive(),
1576                        ..Default::default()
1577                    },
1578                    vmoid: vmo_id.id,
1579                    ..Default::default()
1580                };
1581
1582                assert_eq!(
1583                    test(
1584                        &mut fifo,
1585                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1586                    )
1587                    .await,
1588                    Err(zx::Status::INVALID_ARGS)
1589                );
1590
1591                assert_eq!(
1592                    test(
1593                        &mut fifo,
1594                        BlockFifoRequest {
1595                            vmo_offset: 0xffff_ffff_ffff_ffff,
1596                            ..good_write_request()
1597                        }
1598                    )
1599                    .await,
1600                    Err(zx::Status::INVALID_ARGS)
1601                );
1602
1603                // CLOSE VMO
1604
1605                assert_eq!(
1606                    test(
1607                        &mut fifo,
1608                        BlockFifoRequest {
1609                            command: BlockFifoCommand {
1610                                opcode: BlockOpcode::CloseVmo.into_primitive(),
1611                                ..Default::default()
1612                            },
1613                            vmoid: vmo_id.id + 1,
1614                            ..Default::default()
1615                        }
1616                    )
1617                    .await,
1618                    Err(zx::Status::IO)
1619                );
1620
1621                std::mem::drop(proxy);
1622            }
1623        );
1624    }
1625
1626    #[fuchsia::test]
    // Checks that two READ requests can be outstanding at the same time and that a response is
    // only delivered once the matching `read_hook` future has been released, carrying the
    // `reqid` of the request that was released.
1627    async fn test_concurrent_requests() {
1628        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1629
        // Shared between the server-side hook (which pushes) and the client side (which pops).
1630        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1631        let waiting_readers_clone = waiting_readers.clone();
1632
1633        futures::join!(
            // Server side.
1634            async move {
1635                let block_server = BlockServer::new(
1636                    BLOCK_SIZE,
1637                    Arc::new(MockInterface {
                        // Every read parks on a fresh oneshot channel.  The sender is stored
                        // together with the device block offset, which the client sets equal to
                        // the request id, so the client decides when each read completes.
1638                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1639                            let (tx, rx) = oneshot::channel();
1640                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
1641                            Box::pin(async move {
1642                                let _ = rx.await;
1643                                Ok(())
1644                            })
1645                        })),
1646                    }),
1647                );
1648                block_server.handle_requests(stream).await.unwrap();
1649            },
            // Client side.
1650            async move {
1651                let (session_proxy, server) = fidl::endpoints::create_proxy();
1652
1653                proxy.open_session(server).unwrap();
1654
1655                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1656                let vmo_id = session_proxy
1657                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1658                    .await
1659                    .unwrap()
1660                    .unwrap();
1661
1662                let mut fifo =
1663                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1664                let (mut reader, mut writer) = fifo.async_io();
1665
                // First read: reqid 1 at device offset 1.
1666                writer
1667                    .write_entries(&BlockFifoRequest {
1668                        command: BlockFifoCommand {
1669                            opcode: BlockOpcode::Read.into_primitive(),
1670                            ..Default::default()
1671                        },
1672                        reqid: 1,
1673                        dev_offset: 1, // Intentionally use the same as `reqid`.
1674                        vmoid: vmo_id.id,
1675                        length: 1,
1676                        ..Default::default()
1677                    })
1678                    .await
1679                    .unwrap();
1680
                // Second read: reqid 2 at device offset 2.
1681                writer
1682                    .write_entries(&BlockFifoRequest {
1683                        command: BlockFifoCommand {
1684                            opcode: BlockOpcode::Read.into_primitive(),
1685                            ..Default::default()
1686                        },
1687                        reqid: 2,
1688                        dev_offset: 2,
1689                        vmoid: vmo_id.id,
1690                        length: 1,
1691                        ..Default::default()
1692                    })
1693                    .await
1694                    .unwrap();
1695
1696                // Wait till both those entries are pending.
1697                poll_fn(|cx: &mut Context<'_>| {
1698                    if waiting_readers.lock().len() == 2 {
1699                        Poll::Ready(())
1700                    } else {
1701                        // Yield to the executor.
1702                        cx.waker().wake_by_ref();
1703                        Poll::Pending
1704                    }
1705                })
1706                .await;
1707
                // Neither read has been released yet, so no response may be available.
1708                let mut response = BlockFifoResponse::default();
1709                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1710
                // Release the most recently recorded read (`Vec::pop` takes from the back) and
                // expect the response for exactly that request.
1711                let (id, tx) = waiting_readers.lock().pop().unwrap();
1712                tx.send(()).unwrap();
1713
1714                reader.read_entries(&mut response).await.unwrap();
1715                assert_eq!(response.status, zx::sys::ZX_OK);
1716                assert_eq!(response.reqid, id);
1717
                // The other read is still parked, so nothing further may arrive yet.
1718                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1719
                // Release the remaining read and check its response the same way.
1720                let (id, tx) = waiting_readers.lock().pop().unwrap();
1721                tx.send(()).unwrap();
1722
1723                reader.read_entries(&mut response).await.unwrap();
1724                assert_eq!(response.status, zx::sys::ZX_OK);
1725                assert_eq!(response.reqid, id);
1726            }
1727        );
1728    }
1729
1730    #[fuchsia::test]
    // Sends two READ requests that belong to the same group and checks that the response read
    // back is successful and carries the group id together with the `reqid` of the request
    // flagged GROUP_LAST.
1731    async fn test_groups() {
1732        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1733
1734        futures::join!(
            // Server side: every read succeeds immediately.
1735            async move {
1736                let block_server = BlockServer::new(
1737                    BLOCK_SIZE,
1738                    Arc::new(MockInterface {
1739                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1740                    }),
1741                );
1742                block_server.handle_requests(stream).await.unwrap();
1743            },
            // Client side.
1744            async move {
1745                let (session_proxy, server) = fidl::endpoints::create_proxy();
1746
1747                proxy.open_session(server).unwrap();
1748
1749                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1750                let vmo_id = session_proxy
1751                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1752                    .await
1753                    .unwrap()
1754                    .unwrap();
1755
1756                let mut fifo =
1757                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1758                let (mut reader, mut writer) = fifo.async_io();
1759
                // First member of group 1: GROUP_ITEM only, `reqid` left at its default.
1760                writer
1761                    .write_entries(&BlockFifoRequest {
1762                        command: BlockFifoCommand {
1763                            opcode: BlockOpcode::Read.into_primitive(),
1764                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1765                            ..Default::default()
1766                        },
1767                        group: 1,
1768                        vmoid: vmo_id.id,
1769                        length: 1,
1770                        ..Default::default()
1771                    })
1772                    .await
1773                    .unwrap();
1774
                // Final member of group 1: GROUP_ITEM | GROUP_LAST with reqid 2.
1775                writer
1776                    .write_entries(&BlockFifoRequest {
1777                        command: BlockFifoCommand {
1778                            opcode: BlockOpcode::Read.into_primitive(),
1779                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1780                            ..Default::default()
1781                        },
1782                        reqid: 2,
1783                        group: 1,
1784                        vmoid: vmo_id.id,
1785                        length: 1,
1786                        ..Default::default()
1787                    })
1788                    .await
1789                    .unwrap();
1790
                // The response must identify the group and the request that closed it.
1791                let mut response = BlockFifoResponse::default();
1792                reader.read_entries(&mut response).await.unwrap();
1793                assert_eq!(response.status, zx::sys::ZX_OK);
1794                assert_eq!(response.reqid, 2);
1795                assert_eq!(response.group, 1);
1796            }
1797        );
1798    }
1799
1800    #[fuchsia::test]
    // Checks error handling within a group: after the first request of the group has failed in
    // `read_hook`, the later requests of that group do not reach the hook (the call counter stays
    // at 1), no response is sent before the GROUP_LAST request arrives, and the single response
    // reports the failure status with the `reqid` of the GROUP_LAST request.
1801    async fn test_group_error() {
1802        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1803
        // Counts how many times `read_hook` has been invoked.
1804        let counter = Arc::new(AtomicU64::new(0));
1805        let counter_clone = counter.clone();
1806
1807        futures::join!(
            // Server side: every read that reaches the hook is counted and fails with BAD_STATE.
1808            async move {
1809                let block_server = BlockServer::new(
1810                    BLOCK_SIZE,
1811                    Arc::new(MockInterface {
1812                        read_hook: Some(Box::new(move |_, _, _, _| {
1813                            counter_clone.fetch_add(1, Ordering::Relaxed);
1814                            Box::pin(async { Err(zx::Status::BAD_STATE) })
1815                        })),
1816                    }),
1817                );
1818                block_server.handle_requests(stream).await.unwrap();
1819            },
            // Client side.
1820            async move {
1821                let (session_proxy, server) = fidl::endpoints::create_proxy();
1822
1823                proxy.open_session(server).unwrap();
1824
1825                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1826                let vmo_id = session_proxy
1827                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1828                    .await
1829                    .unwrap()
1830                    .unwrap();
1831
1832                let mut fifo =
1833                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1834                let (mut reader, mut writer) = fifo.async_io();
1835
                // First member of group 1; this is the request that fails in the hook.
1836                writer
1837                    .write_entries(&BlockFifoRequest {
1838                        command: BlockFifoCommand {
1839                            opcode: BlockOpcode::Read.into_primitive(),
1840                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1841                            ..Default::default()
1842                        },
1843                        group: 1,
1844                        vmoid: vmo_id.id,
1845                        length: 1,
1846                        ..Default::default()
1847                    })
1848                    .await
1849                    .unwrap();
1850
1851                // Wait until processed.
1852                poll_fn(|cx: &mut Context<'_>| {
1853                    if counter.load(Ordering::Relaxed) == 1 {
1854                        Poll::Ready(())
1855                    } else {
1856                        // Yield to the executor.
1857                        cx.waker().wake_by_ref();
1858                        Poll::Pending
1859                    }
1860                })
1861                .await;
1862
                // The group has not been closed yet, so the failure must not have produced a
                // response.
1863                let mut response = BlockFifoResponse::default();
1864                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1865
                // Another member of the already-failed group.
1866                writer
1867                    .write_entries(&BlockFifoRequest {
1868                        command: BlockFifoCommand {
1869                            opcode: BlockOpcode::Read.into_primitive(),
1870                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1871                            ..Default::default()
1872                        },
1873                        group: 1,
1874                        vmoid: vmo_id.id,
1875                        length: 1,
1876                        ..Default::default()
1877                    })
1878                    .await
1879                    .unwrap();
1880
                // Final member of the group (GROUP_LAST, reqid 2); this closes the group.
1881                writer
1882                    .write_entries(&BlockFifoRequest {
1883                        command: BlockFifoCommand {
1884                            opcode: BlockOpcode::Read.into_primitive(),
1885                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1886                            ..Default::default()
1887                        },
1888                        reqid: 2,
1889                        group: 1,
1890                        vmoid: vmo_id.id,
1891                        length: 1,
1892                        ..Default::default()
1893                    })
1894                    .await
1895                    .unwrap();
1896
                // The group response carries the error from the first request.
1897                reader.read_entries(&mut response).await.unwrap();
1898                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1899                assert_eq!(response.reqid, 2);
1900                assert_eq!(response.group, 1);
1901
                // Exactly one response is expected for the whole group.
1902                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1903
1904                // Only the first request should have been processed.
1905                assert_eq!(counter.load(Ordering::Relaxed), 1);
1906            }
1907        );
1908    }
1909
1910    #[fuchsia::test]
    // Sends two requests in the same group that both carry GROUP_LAST and checks that, once the
    // in-flight read is released, the group completes with ZX_ERR_INVALID_ARGS reported against
    // the first of the two requests (reqid 1).
1911    async fn test_group_with_two_lasts() {
1912        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1913
        // `tx` is fired by the client to let the blocked read finish.
1914        let (tx, rx) = oneshot::channel();
1915
1916        futures::join!(
            // Server side.
1917            async move {
1918                let rx = Mutex::new(Some(rx));
1919                let block_server = BlockServer::new(
1920                    BLOCK_SIZE,
1921                    Arc::new(MockInterface {
                        // The receiver is taken out of the `Option`, so this hook can only run
                        // once; a second invocation would panic on `unwrap`.
1922                        read_hook: Some(Box::new(move |_, _, _, _| {
1923                            let rx = rx.lock().take().unwrap();
1924                            Box::pin(async {
1925                                let _ = rx.await;
1926                                Ok(())
1927                            })
1928                        })),
1929                    }),
1930                );
1931                block_server.handle_requests(stream).await.unwrap();
1932            },
            // Client side.
1933            async move {
1934                let (session_proxy, server) = fidl::endpoints::create_proxy();
1935
1936                proxy.open_session(server).unwrap();
1937
1938                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1939                let vmo_id = session_proxy
1940                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1941                    .await
1942                    .unwrap()
1943                    .unwrap();
1944
1945                let mut fifo =
1946                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1947                let (mut reader, mut writer) = fifo.async_io();
1948
                // First GROUP_LAST request of group 1 (reqid 1).
1949                writer
1950                    .write_entries(&BlockFifoRequest {
1951                        command: BlockFifoCommand {
1952                            opcode: BlockOpcode::Read.into_primitive(),
1953                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1954                            ..Default::default()
1955                        },
1956                        reqid: 1,
1957                        group: 1,
1958                        vmoid: vmo_id.id,
1959                        length: 1,
1960                        ..Default::default()
1961                    })
1962                    .await
1963                    .unwrap();
1964
                // Second GROUP_LAST request for the same group (reqid 2); this is the protocol
                // violation under test.
1965                writer
1966                    .write_entries(&BlockFifoRequest {
1967                        command: BlockFifoCommand {
1968                            opcode: BlockOpcode::Read.into_primitive(),
1969                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1970                            ..Default::default()
1971                        },
1972                        reqid: 2,
1973                        group: 1,
1974                        vmoid: vmo_id.id,
1975                        length: 1,
1976                        ..Default::default()
1977                    })
1978                    .await
1979                    .unwrap();
1980
1981                // Send an independent request to flush through the fifo.
1982                writer
1983                    .write_entries(&BlockFifoRequest {
1984                        command: BlockFifoCommand {
1985                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1986                            ..Default::default()
1987                        },
1988                        reqid: 3,
1989                        vmoid: vmo_id.id,
1990                        ..Default::default()
1991                    })
1992                    .await
1993                    .unwrap();
1994
1995                // It should succeed.
1996                let mut response = BlockFifoResponse::default();
1997                reader.read_entries(&mut response).await.unwrap();
1998                assert_eq!(response.status, zx::sys::ZX_OK);
1999                assert_eq!(response.reqid, 3);
2000
2001                // Now release the original request.
2002                tx.send(()).unwrap();
2003
2004                // The response should be for the first message tagged as last, and it should be
2005                // an error because we sent two messages with the LAST marker.
2006                let mut response = BlockFifoResponse::default();
2007                reader.read_entries(&mut response).await.unwrap();
2008                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2009                assert_eq!(response.reqid, 1);
2010                assert_eq!(response.group, 1);
2011            }
2012        );
2013    }
2014
2015    #[fuchsia::test(allow_stalls = false)]
    // Checks that a stalled `get_volume_info` call on the volume channel does not prevent a
    // response from being delivered on the session fifo once the blocked read is released.
    // NOTE(review): `allow_stalls = false` is presumably what permits the use of
    // `TestExecutor::poll_until_stalled` below -- confirm against the fuchsia test macro docs.
2016    async fn test_requests_dont_block_sessions() {
2017        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2018
        // `tx` is fired at the end of the test to let the blocked read finish.
2019        let (tx, rx) = oneshot::channel();
2020
        // The server runs as a detached local task rather than being joined with the client, so
        // the test body can poll the client futures step by step.
2021        fasync::Task::local(async move {
2022            let rx = Mutex::new(Some(rx));
2023            let block_server = BlockServer::new(
2024                BLOCK_SIZE,
2025                Arc::new(MockInterface {
                    // The receiver is taken out of the `Option`, so this hook can only run once.
2026                    read_hook: Some(Box::new(move |_, _, _, _| {
2027                        let rx = rx.lock().take().unwrap();
2028                        Box::pin(async {
2029                            let _ = rx.await;
2030                            Ok(())
2031                        })
2032                    })),
2033                }),
2034            );
2035            block_server.handle_requests(stream).await.unwrap();
2036        })
2037        .detach();
2038
        // Client future: opens a session, issues one read and waits for its response.
2039        let mut fut = pin!(async {
2040            let (session_proxy, server) = fidl::endpoints::create_proxy();
2041
2042            proxy.open_session(server).unwrap();
2043
2044            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2045            let vmo_id = session_proxy
2046                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2047                .await
2048                .unwrap()
2049                .unwrap();
2050
2051            let mut fifo =
2052                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2053            let (mut reader, mut writer) = fifo.async_io();
2054
2055            writer
2056                .write_entries(&BlockFifoRequest {
2057                    command: BlockFifoCommand {
2058                        opcode: BlockOpcode::Read.into_primitive(),
2059                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2060                        ..Default::default()
2061                    },
2062                    reqid: 1,
2063                    group: 1,
2064                    vmoid: vmo_id.id,
2065                    length: 1,
2066                    ..Default::default()
2067                })
2068                .await
2069                .unwrap();
2070
2071            let mut response = BlockFifoResponse::default();
2072            reader.read_entries(&mut response).await.unwrap();
2073            assert_eq!(response.status, zx::sys::ZX_OK);
2074        });
2075
2076        // The response won't come back until we send on `tx`.
2077        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2078
2079        let mut fut2 = pin!(proxy.get_volume_info());
2080
2081        // get_volume_info is set up to stall forever.
2082        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2083
2084        // If we now free up the first future, it should resolve; the stalled call to
2085        // get_volume_info should not block the fifo response.
2086        let _ = tx.send(());
2087
2088        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2089    }
2090
2091    #[fuchsia::test]
    // Exercises back-pressure on the request fifo: MAX_REQUESTS reads are queued while the server
    // hook is blocked, one further write is expected to be unable to complete, and after the
    // hook is unblocked every response read makes room for one more request.  In total
    // 2 * MAX_REQUESTS requests are sent and each must be answered exactly once with ZX_OK.
2092    async fn test_request_flow_control() {
2093        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2094
2095        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2096        // server will block until that happens.
2097        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
        // `.0` is the event hooks wait on; `.1` records that the event has already fired so hooks
        // that start afterwards do not wait.
2098        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2099        let event_clone = event.clone();
2100        futures::join!(
            // Server side.
2101            async move {
2102                let block_server = BlockServer::new(
2103                    BLOCK_SIZE,
2104                    Arc::new(MockInterface {
                        // Each read waits for the event unless the flag says it already fired.
2105                        read_hook: Some(Box::new(move |_, _, _, _| {
2106                            let event_clone = event_clone.clone();
2107                            Box::pin(async move {
2108                                if !event_clone.1.load(Ordering::SeqCst) {
2109                                    event_clone.0.listen().await;
2110                                }
2111                                Ok(())
2112                            })
2113                        })),
2114                    }),
2115                );
2116                block_server.handle_requests(stream).await.unwrap();
2117            },
            // Client side.
2118            async move {
2119                let (session_proxy, server) = fidl::endpoints::create_proxy();
2120
2121                proxy.open_session(server).unwrap();
2122
2123                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2124                let vmo_id = session_proxy
2125                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2126                    .await
2127                    .unwrap()
2128                    .unwrap();
2129
2130                let mut fifo =
2131                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2132                let (mut reader, mut writer) = fifo.async_io();
2133
                // Queue the first MAX_REQUESTS reads, with request ids 1..=MAX_REQUESTS.
2134                for i in 0..MAX_REQUESTS {
2135                    writer
2136                        .write_entries(&BlockFifoRequest {
2137                            command: BlockFifoCommand {
2138                                opcode: BlockOpcode::Read.into_primitive(),
2139                                ..Default::default()
2140                            },
2141                            reqid: (i + 1) as u32,
2142                            dev_offset: i,
2143                            vmoid: vmo_id.id,
2144                            length: 1,
2145                            ..Default::default()
2146                        })
2147                        .await
2148                        .unwrap();
2149                }
                // One more write is expected to be unable to complete immediately.  It is polled
                // a single time only; its request id (u32::MAX) is not among the ids verified at
                // the end of the test.
2150                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2151                    command: BlockFifoCommand {
2152                        opcode: BlockOpcode::Read.into_primitive(),
2153                        ..Default::default()
2154                    },
2155                    reqid: u32::MAX,
2156                    dev_offset: MAX_REQUESTS,
2157                    vmoid: vmo_id.id,
2158                    length: 1,
2159                    ..Default::default()
2160                })))
2161                .is_pending());
2162                // OK, let the server start to process.
                // The flag is stored before notifying so that hooks starting later skip the wait.
2163                event.1.store(true, Ordering::SeqCst);
2164                event.0.notify(usize::MAX);
2165                // For each entry we read, make sure we can write a new one in.
2166                let mut finished_reqids = vec![];
2167                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2168                    let mut response = BlockFifoResponse::default();
2169                    reader.read_entries(&mut response).await.unwrap();
2170                    assert_eq!(response.status, zx::sys::ZX_OK);
2171                    finished_reqids.push(response.reqid);
2172                    writer
2173                        .write_entries(&BlockFifoRequest {
2174                            command: BlockFifoCommand {
2175                                opcode: BlockOpcode::Read.into_primitive(),
2176                                ..Default::default()
2177                            },
2178                            reqid: (i + 1) as u32,
2179                            dev_offset: i,
2180                            vmoid: vmo_id.id,
2181                            length: 1,
2182                            ..Default::default()
2183                        })
2184                        .await
2185                        .unwrap();
2186                }
                // Drain the responses for the requests that are still outstanding.
2187                let mut response = BlockFifoResponse::default();
2188                for _ in 0..MAX_REQUESTS {
2189                    reader.read_entries(&mut response).await.unwrap();
2190                    assert_eq!(response.status, zx::sys::ZX_OK);
2191                    finished_reqids.push(response.reqid);
2192                }
2193                // Verify that we got a response for each request.  Note that we can't assume FIFO
2194                // ordering.
2195                finished_reqids.sort();
2196                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
                // After sorting, the ids must be exactly 1, 2, ..., 2 * MAX_REQUESTS.
2197                let mut i = 1;
2198                for reqid in finished_reqids {
2199                    assert_eq!(reqid, i);
2200                    i += 1;
2201                }
2202            }
2203        );
2204    }
2205
2206    #[fuchsia::test]
    // Opens a session with a fixed offset map (source block 0 -> target block 10, 20 blocks long)
    // and checks that READ, WRITE and TRIM requests reach the interface with their device offset
    // shifted by 10, that FLUSH is passed through, and that a read extending past the mapped
    // window fails with ZX_ERR_OUT_OF_RANGE.
    // NOTE(review): `IoMockInterface` is defined elsewhere in this file; with `do_checks: true`
    // it presumably compares each incoming operation against `expected_op` -- confirm there.
2207    async fn test_passthrough_io_with_fixed_map() {
2208        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2209
        // The client sets the operation it expects next; the mock interface holds the other handle.
2210        let expected_op = Arc::new(Mutex::new(None));
2211        let expected_op_clone = expected_op.clone();
2212        futures::join!(
            // Server side.
2213            async {
2214                let block_server = BlockServer::new(
2215                    BLOCK_SIZE,
2216                    Arc::new(IoMockInterface {
2217                        return_errors: false,
2218                        do_checks: true,
2219                        expected_op: expected_op_clone,
2220                    }),
2221                );
2222                block_server.handle_requests(stream).await.unwrap();
2223            },
            // Client side.
2224            async move {
2225                let (session_proxy, server) = fidl::endpoints::create_proxy();
2226
                // Session blocks 0..20 are mapped onto device blocks 10..30.
2227                let mappings = [fblock::BlockOffsetMapping {
2228                    source_block_offset: 0,
2229                    target_block_offset: 10,
2230                    length: 20,
2231                }];
2232                proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2233
2234                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2235                let vmo_id = session_proxy
2236                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2237                    .await
2238                    .unwrap()
2239                    .unwrap();
2240
2241                let mut fifo =
2242                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2243                let (mut reader, mut writer) = fifo.async_io();
2244
2245                // READ
                // dev_offset 1 is expected to arrive as 11 (1 + 10); length and vmo_offset are
                // expected unchanged.
2246                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2247                writer
2248                    .write_entries(&BlockFifoRequest {
2249                        command: BlockFifoCommand {
2250                            opcode: BlockOpcode::Read.into_primitive(),
2251                            ..Default::default()
2252                        },
2253                        vmoid: vmo_id.id,
2254                        dev_offset: 1,
2255                        length: 2,
2256                        vmo_offset: 3,
2257                        ..Default::default()
2258                    })
2259                    .await
2260                    .unwrap();
2261
2262                let mut response = BlockFifoResponse::default();
2263                reader.read_entries(&mut response).await.unwrap();
2264                assert_eq!(response.status, zx::sys::ZX_OK);
2265
2266                // WRITE
                // dev_offset 4 is expected to arrive as 14 (4 + 10).
2267                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2268                writer
2269                    .write_entries(&BlockFifoRequest {
2270                        command: BlockFifoCommand {
2271                            opcode: BlockOpcode::Write.into_primitive(),
2272                            ..Default::default()
2273                        },
2274                        vmoid: vmo_id.id,
2275                        dev_offset: 4,
2276                        length: 5,
2277                        vmo_offset: 6,
2278                        ..Default::default()
2279                    })
2280                    .await
2281                    .unwrap();
2282
2283                reader.read_entries(&mut response).await.unwrap();
2284                assert_eq!(response.status, zx::sys::ZX_OK);
2285
2286                // FLUSH
2287                *expected_op.lock() = Some(ExpectedOp::Flush);
2288                writer
2289                    .write_entries(&BlockFifoRequest {
2290                        command: BlockFifoCommand {
2291                            opcode: BlockOpcode::Flush.into_primitive(),
2292                            ..Default::default()
2293                        },
2294                        ..Default::default()
2295                    })
2296                    .await
2297                    .unwrap();
2298
2299                reader.read_entries(&mut response).await.unwrap();
2300                assert_eq!(response.status, zx::sys::ZX_OK);
2301
2302                // TRIM
                // dev_offset 7 is expected to arrive as 17 (7 + 10).
2303                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2304                writer
2305                    .write_entries(&BlockFifoRequest {
2306                        command: BlockFifoCommand {
2307                            opcode: BlockOpcode::Trim.into_primitive(),
2308                            ..Default::default()
2309                        },
2310                        dev_offset: 7,
2311                        length: 3,
2312                        ..Default::default()
2313                    })
2314                    .await
2315                    .unwrap();
2316
2317                reader.read_entries(&mut response).await.unwrap();
2318                assert_eq!(response.status, zx::sys::ZX_OK);
2319
2320                // READ past window
                // Blocks 19..21 extend beyond the 20-block mapping; no operation is expected.
2321                *expected_op.lock() = None;
2322                writer
2323                    .write_entries(&BlockFifoRequest {
2324                        command: BlockFifoCommand {
2325                            opcode: BlockOpcode::Read.into_primitive(),
2326                            ..Default::default()
2327                        },
2328                        vmoid: vmo_id.id,
2329                        dev_offset: 19,
2330                        length: 2,
2331                        vmo_offset: 3,
2332                        ..Default::default()
2333                    })
2334                    .await
2335                    .unwrap();
2336
2337                reader.read_entries(&mut response).await.unwrap();
2338                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2339
2340                std::mem::drop(proxy);
2341            }
2342        );
2343    }
2344}