block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use async_trait::async_trait;
13use fidl::endpoints::ServerEnd;
14use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
15use fidl_fuchsia_hardware_block_partition::PartitionProxy;
16use fidl_fuchsia_hardware_block_volume::VolumeProxy;
17use fuchsia_sync::Mutex;
18use futures::channel::oneshot;
19use futures::executor::block_on;
20use lazy_static::lazy_static;
21use std::collections::HashMap;
22use std::future::Future;
23use std::hash::{Hash, Hasher};
24use std::mem::MaybeUninit;
25use std::num::NonZero;
26use std::ops::{DerefMut, Range};
27use std::pin::Pin;
28use std::sync::atomic::{AtomicU16, Ordering};
29use std::sync::Arc;
30use std::task::{Context, Poll, Waker};
31use zx::sys::zx_handle_t;
32use zx::{self as zx, HandleBased as _};
33use {
34    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
35    fuchsia_async as fasync, storage_trace as trace,
36};
37
38pub use cache::Cache;
39
40pub use block::Flag as BlockFlags;
41
42pub use block_protocol::*;
43
44pub mod cache;
45
46const TEMP_VMO_SIZE: usize = 65536;
47
48/// If a trace flow ID isn't specified for requests, one will be generated.
49pub const NO_TRACE_ID: u64 = 0;
50
51pub use block_driver::{BlockIoFlag, BlockOpcode};
52
53fn fidl_to_status(error: fidl::Error) -> zx::Status {
54    match error {
55        fidl::Error::ClientChannelClosed { status, .. } => status,
56        _ => zx::Status::INTERNAL,
57    }
58}
59
60fn opcode_str(opcode: u8) -> &'static str {
61    match BlockOpcode::from_primitive(opcode) {
62        Some(BlockOpcode::Read) => "read",
63        Some(BlockOpcode::Write) => "write",
64        Some(BlockOpcode::Flush) => "flush",
65        Some(BlockOpcode::Trim) => "trim",
66        Some(BlockOpcode::CloseVmo) => "close_vmo",
67        None => "unknown",
68    }
69}
70
71// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
72// reused within this process).
73fn generate_trace_flow_id(request_id: u32) -> u64 {
74    lazy_static! {
75        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
76    };
77    *SELF_HANDLE as u64 + (request_id as u64) << 32
78}
79
80pub enum BufferSlice<'a> {
81    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
82    Memory(&'a [u8]),
83}
84
85impl<'a> BufferSlice<'a> {
86    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
87        BufferSlice::VmoId { vmo_id, offset, length }
88    }
89}
90
91impl<'a> From<&'a [u8]> for BufferSlice<'a> {
92    fn from(buf: &'a [u8]) -> Self {
93        BufferSlice::Memory(buf)
94    }
95}
96
97pub enum MutableBufferSlice<'a> {
98    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
99    Memory(&'a mut [u8]),
100}
101
102impl<'a> MutableBufferSlice<'a> {
103    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
104        MutableBufferSlice::VmoId { vmo_id, offset, length }
105    }
106}
107
108impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
109    fn from(buf: &'a mut [u8]) -> Self {
110        MutableBufferSlice::Memory(buf)
111    }
112}
113
// Bookkeeping for a single in-flight request, stored in FifoState::map under its request ID.
#[derive(Default)]
struct RequestState {
    // The outcome of the request; None until an outcome has been recorded.
    result: Option<zx::Status>,
    // The waker left behind by the most recent poll of the ResponseFuture for this request.
    waker: Option<Waker>,
}
119
// State shared (behind a mutex) between request senders, their ResponseFutures and the
// FifoPoller.
#[derive(Default)]
struct FifoState {
    // The fifo.  None once the connection has been terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,

    // If set, attach a barrier to the next write request
    attach_barrier: bool,
}
140
141impl FifoState {
142    fn terminate(&mut self) {
143        self.fifo.take();
144        for (_, request_state) in self.map.iter_mut() {
145            request_state.result.get_or_insert(zx::Status::CANCELED);
146            if let Some(waker) = request_state.waker.take() {
147                waker.wake();
148            }
149        }
150        if let Some(waker) = self.poller_waker.take() {
151            waker.wake();
152        }
153    }
154
155    // Returns true if polling should be terminated.
156    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
157        let fifo = if let Some(fifo) = self.fifo.as_ref() {
158            fifo
159        } else {
160            return true;
161        };
162
163        loop {
164            let slice = self.queue.as_slices().0;
165            if slice.is_empty() {
166                return false;
167            }
168            match fifo.try_write(context, slice) {
169                Poll::Ready(Ok(sent)) => {
170                    self.queue.drain(0..sent);
171                }
172                Poll::Ready(Err(_)) => {
173                    self.terminate();
174                    return true;
175                }
176                Poll::Pending => {
177                    return false;
178                }
179            }
180        }
181    }
182}
183
// Shared handle to the mutex-protected fifo state.
type FifoStateRef = Arc<Mutex<FifoState>>;
185
186// A future used for fifo responses.
187struct ResponseFuture {
188    request_id: u32,
189    fifo_state: FifoStateRef,
190}
191
192impl ResponseFuture {
193    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
194        ResponseFuture { request_id, fifo_state }
195    }
196}
197
198impl Future for ResponseFuture {
199    type Output = Result<(), zx::Status>;
200
201    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
202        let mut state = self.fifo_state.lock();
203        let request_state = state.map.get_mut(&self.request_id).unwrap();
204        if let Some(result) = request_state.result {
205            Poll::Ready(result.into())
206        } else {
207            request_state.waker.replace(context.waker().clone());
208            Poll::Pending
209        }
210    }
211}
212
213impl Drop for ResponseFuture {
214    fn drop(&mut self) {
215        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
216    }
217}
218
219/// Wraps a vmo-id. Will panic if you forget to detach.
220#[derive(Debug)]
221#[must_use]
222pub struct VmoId(AtomicU16);
223
224impl VmoId {
225    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
226    pub fn new(id: u16) -> Self {
227        Self(AtomicU16::new(id))
228    }
229
230    /// Invalidates self and returns a new VmoId with the same underlying ID.
231    pub fn take(&self) -> Self {
232        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
233    }
234
235    pub fn is_valid(&self) -> bool {
236        self.id() != block_driver::BLOCK_VMOID_INVALID
237    }
238
239    /// Takes the ID.  The caller assumes responsibility for detaching.
240    #[must_use]
241    pub fn into_id(self) -> u16 {
242        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
243    }
244
245    pub fn id(&self) -> u16 {
246        self.0.load(Ordering::Relaxed)
247    }
248}
249
250impl PartialEq for VmoId {
251    fn eq(&self, other: &Self) -> bool {
252        self.id() == other.id()
253    }
254}
255
256impl Eq for VmoId {}
257
258impl Drop for VmoId {
259    fn drop(&mut self) {
260        assert_eq!(
261            self.0.load(Ordering::Relaxed),
262            block_driver::BLOCK_VMOID_INVALID,
263            "Did you forget to detach?"
264        );
265    }
266}
267
268impl Hash for VmoId {
269    fn hash<H: Hasher>(&self, state: &mut H) {
270        self.id().hash(state);
271    }
272}
273
274/// Represents a client connection to a block device. This is a simplified version of the block.fidl
275/// interface.
276/// Most users will use the RemoteBlockClient instantiation of this trait.
277#[async_trait]
278pub trait BlockClient: Send + Sync {
279    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
280    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
281
282    /// Detaches the given vmo-id from the device.
283    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
284
285    /// Reads from the device at |device_offset| into the given buffer slice.
286    async fn read_at(
287        &self,
288        buffer_slice: MutableBufferSlice<'_>,
289        device_offset: u64,
290    ) -> Result<(), zx::Status> {
291        self.read_at_traced(buffer_slice, device_offset, 0).await
292    }
293
294    async fn read_at_traced(
295        &self,
296        buffer_slice: MutableBufferSlice<'_>,
297        device_offset: u64,
298        trace_flow_id: u64,
299    ) -> Result<(), zx::Status>;
300
301    /// Writes the data in |buffer_slice| to the device.
302    async fn write_at(
303        &self,
304        buffer_slice: BufferSlice<'_>,
305        device_offset: u64,
306    ) -> Result<(), zx::Status> {
307        self.write_at_with_opts_traced(
308            buffer_slice,
309            device_offset,
310            WriteOptions::empty(),
311            NO_TRACE_ID,
312        )
313        .await
314    }
315
316    async fn write_at_with_opts(
317        &self,
318        buffer_slice: BufferSlice<'_>,
319        device_offset: u64,
320        opts: WriteOptions,
321    ) -> Result<(), zx::Status> {
322        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
323    }
324
325    async fn write_at_with_opts_traced(
326        &self,
327        buffer_slice: BufferSlice<'_>,
328        device_offset: u64,
329        opts: WriteOptions,
330        trace_flow_id: u64,
331    ) -> Result<(), zx::Status>;
332
333    /// Trims the given range on the block device.
334    async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
335        self.trim_traced(device_range, NO_TRACE_ID).await
336    }
337
338    async fn trim_traced(
339        &self,
340        device_range: Range<u64>,
341        trace_flow_id: u64,
342    ) -> Result<(), zx::Status>;
343
344    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
345    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
346    /// This method makes it easier to guarantee that the barrier is attached to the correct
347    /// write operation when subsequent write operations can get reordered.
348    fn barrier(&self);
349
350    async fn flush(&self) -> Result<(), zx::Status> {
351        self.flush_traced(NO_TRACE_ID).await
352    }
353
354    /// Sends a flush request to the underlying block device.
355    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
356
357    /// Closes the fifo.
358    async fn close(&self) -> Result<(), zx::Status>;
359
360    /// Returns the block size of the device.
361    fn block_size(&self) -> u32;
362
363    /// Returns the size, in blocks, of the device.
364    fn block_count(&self) -> u64;
365
366    /// Returns the maximum number of blocks which can be transferred in a single request.
367    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
368
369    /// Returns the block flags reported by the device.
370    fn block_flags(&self) -> BlockFlags;
371
372    /// Returns true if the remote fifo is still connected.
373    fn is_connected(&self) -> bool;
374}
375
// State and request plumbing shared by RemoteBlockClient and RemoteBlockClientSync.
struct Common {
    // Device geometry and flags, copied from the BlockInfo supplied at construction.
    block_size: u32,
    block_count: u64,
    // None when the device reports MAX_TRANSFER_UNBOUNDED.
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockFlags,
    // State shared with the FifoPoller and with outstanding ResponseFutures.
    fifo_state: FifoStateRef,
    // Scratch VMO used to stage transfers to and from plain memory slices.  The async mutex is
    // held for the duration of such a transfer.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    // The ID under which temp_vmo is attached to the device.
    temp_vmo_id: VmoId,
}
385
386impl Common {
387    fn new(
388        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
389        info: &block::BlockInfo,
390        temp_vmo: zx::Vmo,
391        temp_vmo_id: VmoId,
392    ) -> Self {
393        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
394        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
395        Self {
396            block_size: info.block_size,
397            block_count: info.block_count,
398            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
399                NonZero::new(info.max_transfer_size / info.block_size)
400            } else {
401                None
402            },
403            block_flags: info.flags,
404            fifo_state,
405            temp_vmo: futures::lock::Mutex::new(temp_vmo),
406            temp_vmo_id,
407        }
408    }
409
410    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
411        if bytes % self.block_size as u64 != 0 {
412            Err(zx::Status::INVALID_ARGS)
413        } else {
414            Ok(bytes / self.block_size as u64)
415        }
416    }
417
418    // Sends the request and waits for the response.
419    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
420        async move {
421            let (request_id, trace_flow_id) = {
422                let mut state = self.fifo_state.lock();
423
424                let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
425                if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
426                    && state.attach_barrier
427                {
428                    flags |= BlockIoFlag::PRE_BARRIER;
429                    request.command.flags = flags.bits();
430                    state.attach_barrier = false;
431                }
432
433                if state.fifo.is_none() {
434                    // Fifo has been closed.
435                    return Err(zx::Status::CANCELED);
436                }
437                trace::duration!(
438                    c"storage",
439                    c"block_client::send::start",
440                    "op" => opcode_str(request.command.opcode)
441                );
442                let request_id = state.next_request_id;
443                state.next_request_id = state.next_request_id.overflowing_add(1).0;
444                assert!(
445                    state.map.insert(request_id, RequestState::default()).is_none(),
446                    "request id in use!"
447                );
448                request.reqid = request_id;
449                if request.trace_flow_id == NO_TRACE_ID {
450                    request.trace_flow_id = generate_trace_flow_id(request_id);
451                }
452                let trace_flow_id = request.trace_flow_id;
453                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
454                state.queue.push_back(request);
455                if let Some(waker) = state.poller_waker.clone() {
456                    state.poll_send_requests(&mut Context::from_waker(&waker));
457                }
458                (request_id, trace_flow_id)
459            };
460            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
461            trace::duration!(c"storage", c"block_client::send::end");
462            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
463            Ok(())
464        }
465        .await
466    }
467
468    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
469        self.send(BlockFifoRequest {
470            command: BlockFifoCommand {
471                opcode: BlockOpcode::CloseVmo.into_primitive(),
472                flags: 0,
473                ..Default::default()
474            },
475            vmoid: vmo_id.into_id(),
476            ..Default::default()
477        })
478        .await
479    }
480
481    async fn read_at(
482        &self,
483        buffer_slice: MutableBufferSlice<'_>,
484        device_offset: u64,
485        trace_flow_id: u64,
486    ) -> Result<(), zx::Status> {
487        match buffer_slice {
488            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
489                self.send(BlockFifoRequest {
490                    command: BlockFifoCommand {
491                        opcode: BlockOpcode::Read.into_primitive(),
492                        flags: 0,
493                        ..Default::default()
494                    },
495                    vmoid: vmo_id.id(),
496                    length: self
497                        .to_blocks(length)?
498                        .try_into()
499                        .map_err(|_| zx::Status::INVALID_ARGS)?,
500                    vmo_offset: self.to_blocks(offset)?,
501                    dev_offset: self.to_blocks(device_offset)?,
502                    trace_flow_id,
503                    ..Default::default()
504                })
505                .await?
506            }
507            MutableBufferSlice::Memory(mut slice) => {
508                let temp_vmo = self.temp_vmo.lock().await;
509                let mut device_block = self.to_blocks(device_offset)?;
510                loop {
511                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
512                    let block_count = self.to_blocks(to_do as u64)? as u32;
513                    self.send(BlockFifoRequest {
514                        command: BlockFifoCommand {
515                            opcode: BlockOpcode::Read.into_primitive(),
516                            flags: 0,
517                            ..Default::default()
518                        },
519                        vmoid: self.temp_vmo_id.id(),
520                        length: block_count,
521                        vmo_offset: 0,
522                        dev_offset: device_block,
523                        trace_flow_id,
524                        ..Default::default()
525                    })
526                    .await?;
527                    temp_vmo.read(&mut slice[..to_do], 0)?;
528                    if to_do == slice.len() {
529                        break;
530                    }
531                    device_block += block_count as u64;
532                    slice = &mut slice[to_do..];
533                }
534            }
535        }
536        Ok(())
537    }
538
539    async fn write_at(
540        &self,
541        buffer_slice: BufferSlice<'_>,
542        device_offset: u64,
543        opts: WriteOptions,
544        trace_flow_id: u64,
545    ) -> Result<(), zx::Status> {
546        let mut flags = BlockIoFlag::empty();
547
548        if opts.contains(WriteOptions::FORCE_ACCESS) {
549            flags |= BlockIoFlag::FORCE_ACCESS;
550        }
551
552        if opts.contains(WriteOptions::PRE_BARRIER) {
553            flags |= BlockIoFlag::PRE_BARRIER;
554        }
555
556        match buffer_slice {
557            BufferSlice::VmoId { vmo_id, offset, length } => {
558                self.send(BlockFifoRequest {
559                    command: BlockFifoCommand {
560                        opcode: BlockOpcode::Write.into_primitive(),
561                        flags: flags.bits(),
562                        ..Default::default()
563                    },
564                    vmoid: vmo_id.id(),
565                    length: self
566                        .to_blocks(length)?
567                        .try_into()
568                        .map_err(|_| zx::Status::INVALID_ARGS)?,
569                    vmo_offset: self.to_blocks(offset)?,
570                    dev_offset: self.to_blocks(device_offset)?,
571                    trace_flow_id,
572                    ..Default::default()
573                })
574                .await?;
575            }
576            BufferSlice::Memory(mut slice) => {
577                let temp_vmo = self.temp_vmo.lock().await;
578                let mut device_block = self.to_blocks(device_offset)?;
579                loop {
580                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
581                    let block_count = self.to_blocks(to_do as u64)? as u32;
582                    temp_vmo.write(&slice[..to_do], 0)?;
583                    self.send(BlockFifoRequest {
584                        command: BlockFifoCommand {
585                            opcode: BlockOpcode::Write.into_primitive(),
586                            flags: flags.bits(),
587                            ..Default::default()
588                        },
589                        vmoid: self.temp_vmo_id.id(),
590                        length: block_count,
591                        vmo_offset: 0,
592                        dev_offset: device_block,
593                        trace_flow_id,
594                        ..Default::default()
595                    })
596                    .await?;
597                    if to_do == slice.len() {
598                        break;
599                    }
600                    device_block += block_count as u64;
601                    slice = &slice[to_do..];
602                }
603            }
604        }
605        Ok(())
606    }
607
608    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
609        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
610        let dev_offset = self.to_blocks(device_range.start)?;
611        self.send(BlockFifoRequest {
612            command: BlockFifoCommand {
613                opcode: BlockOpcode::Trim.into_primitive(),
614                flags: 0,
615                ..Default::default()
616            },
617            vmoid: block_driver::BLOCK_VMOID_INVALID,
618            length,
619            dev_offset,
620            trace_flow_id,
621            ..Default::default()
622        })
623        .await
624    }
625
626    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
627        self.send(BlockFifoRequest {
628            command: BlockFifoCommand {
629                opcode: BlockOpcode::Flush.into_primitive(),
630                flags: 0,
631                ..Default::default()
632            },
633            vmoid: block_driver::BLOCK_VMOID_INVALID,
634            trace_flow_id,
635            ..Default::default()
636        })
637        .await
638    }
639
640    fn barrier(&self) {
641        self.fifo_state.lock().attach_barrier = true;
642    }
643
644    fn block_size(&self) -> u32 {
645        self.block_size
646    }
647
648    fn block_count(&self) -> u64 {
649        self.block_count
650    }
651
652    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
653        self.max_transfer_blocks.clone()
654    }
655
656    fn block_flags(&self) -> BlockFlags {
657        self.block_flags
658    }
659
660    fn is_connected(&self) -> bool {
661        self.fifo_state.lock().fifo.is_some()
662    }
663}
664
665impl Drop for Common {
666    fn drop(&mut self) {
667        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
668        // down.
669        let _ = self.temp_vmo_id.take().into_id();
670        self.fifo_state.lock().terminate();
671    }
672}
673
/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
pub struct RemoteBlockClient {
    // The session used for the FIDL calls (attaching VMOs, closing).
    session: block::SessionProxy,
    // Fifo-based request handling shared with RemoteBlockClientSync.
    common: Common,
}
679
/// Abstracts over the proxy types that a [`RemoteBlockClient`] can be created from.
///
/// Implemented in this file for `BlockProxy`, `PartitionProxy` and `VolumeProxy`, and for
/// references to any implementor.
pub trait AsBlockProxy {
    /// Fetches the device's block info.
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    /// Opens a new session on the device, served over `session`.
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}
685
686impl<T: AsBlockProxy> AsBlockProxy for &T {
687    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
688        AsBlockProxy::get_info(*self)
689    }
690    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
691        AsBlockProxy::open_session(*self, session)
692    }
693}
694
695macro_rules! impl_as_block_proxy {
696    ($name:ident) => {
697        impl AsBlockProxy for $name {
698            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
699                $name::get_info(self).await
700            }
701
702            fn open_session(
703                &self,
704                session: ServerEnd<block::SessionMarker>,
705            ) -> Result<(), fidl::Error> {
706                $name::open_session(self, session)
707            }
708        }
709    };
710}
711
712impl_as_block_proxy!(BlockProxy);
713impl_as_block_proxy!(PartitionProxy);
714impl_as_block_proxy!(VolumeProxy);
715
716impl RemoteBlockClient {
717    /// Returns a connection to a remote block device via the given channel.
718    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
719        let info =
720            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
721        let (session, server) = fidl::endpoints::create_proxy();
722        let () = remote.open_session(server).map_err(fidl_to_status)?;
723        Self::from_session(info, session).await
724    }
725
726    pub async fn from_session(
727        info: block::BlockInfo,
728        session: block::SessionProxy,
729    ) -> Result<Self, zx::Status> {
730        let fifo =
731            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
732        let fifo = fasync::Fifo::from_fifo(fifo);
733        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
734        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
735        let vmo_id =
736            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
737        let vmo_id = VmoId::new(vmo_id.id);
738        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
739    }
740}
741
742#[async_trait]
743impl BlockClient for RemoteBlockClient {
744    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
745        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
746        let vmo_id = self
747            .session
748            .attach_vmo(dup)
749            .await
750            .map_err(fidl_to_status)?
751            .map_err(zx::Status::from_raw)?;
752        Ok(VmoId::new(vmo_id.id))
753    }
754
755    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
756        self.common.detach_vmo(vmo_id).await
757    }
758
759    async fn read_at_traced(
760        &self,
761        buffer_slice: MutableBufferSlice<'_>,
762        device_offset: u64,
763        trace_flow_id: u64,
764    ) -> Result<(), zx::Status> {
765        self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
766    }
767
768    async fn write_at_with_opts_traced(
769        &self,
770        buffer_slice: BufferSlice<'_>,
771        device_offset: u64,
772        opts: WriteOptions,
773        trace_flow_id: u64,
774    ) -> Result<(), zx::Status> {
775        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
776    }
777
778    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
779        self.common.trim(range, trace_flow_id).await
780    }
781
782    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
783        self.common.flush(trace_flow_id).await
784    }
785
786    fn barrier(&self) {
787        self.common.barrier()
788    }
789
790    async fn close(&self) -> Result<(), zx::Status> {
791        let () =
792            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
793        Ok(())
794    }
795
796    fn block_size(&self) -> u32 {
797        self.common.block_size()
798    }
799
800    fn block_count(&self) -> u64 {
801        self.common.block_count()
802    }
803
804    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
805        self.common.max_transfer_blocks()
806    }
807
808    fn block_flags(&self) -> BlockFlags {
809        self.common.block_flags()
810    }
811
812    fn is_connected(&self) -> bool {
813        self.common.is_connected()
814    }
815}
816
/// A blocking counterpart to [`RemoteBlockClient`]: its methods block the calling thread until
/// the operation has completed.
pub struct RemoteBlockClientSync {
    // The synchronous session proxy used for the FIDL calls (attaching VMOs, closing).
    session: block::SessionSynchronousProxy,
    // Fifo-based request handling shared with RemoteBlockClient.
    common: Common,
}
821
822impl RemoteBlockClientSync {
823    /// Returns a connection to a remote block device via the given channel, but spawns a separate
824    /// thread for polling the fifo which makes it work in cases where no executor is configured for
825    /// the calling thread.
826    pub fn new(
827        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
828    ) -> Result<Self, zx::Status> {
829        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
830        let info = remote
831            .get_info(zx::MonotonicInstant::INFINITE)
832            .map_err(fidl_to_status)?
833            .map_err(zx::Status::from_raw)?;
834        let (client, server) = fidl::endpoints::create_endpoints();
835        let () = remote.open_session(server).map_err(fidl_to_status)?;
836        let session = block::SessionSynchronousProxy::new(client.into_channel());
837        let fifo = session
838            .get_fifo(zx::MonotonicInstant::INFINITE)
839            .map_err(fidl_to_status)?
840            .map_err(zx::Status::from_raw)?;
841        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
842        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
843        let vmo_id = session
844            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
845            .map_err(fidl_to_status)?
846            .map_err(zx::Status::from_raw)?;
847        let vmo_id = VmoId::new(vmo_id.id);
848
849        // The fifo needs to be instantiated from the thread that has the executor as that's where
850        // the fifo registers for notifications to be delivered.
851        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
852        std::thread::spawn(move || {
853            let mut executor = fasync::LocalExecutor::new();
854            let fifo = fasync::Fifo::from_fifo(fifo);
855            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
856            let fifo_state = common.fifo_state.clone();
857            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
858            executor.run_singlethreaded(FifoPoller { fifo_state });
859        });
860        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
861    }
862
863    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
864        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
865        let vmo_id = self
866            .session
867            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
868            .map_err(fidl_to_status)?
869            .map_err(zx::Status::from_raw)?;
870        Ok(VmoId::new(vmo_id.id))
871    }
872
873    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
874        block_on(self.common.detach_vmo(vmo_id))
875    }
876
877    pub fn read_at(
878        &self,
879        buffer_slice: MutableBufferSlice<'_>,
880        device_offset: u64,
881    ) -> Result<(), zx::Status> {
882        block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
883    }
884
885    pub fn write_at(
886        &self,
887        buffer_slice: BufferSlice<'_>,
888        device_offset: u64,
889    ) -> Result<(), zx::Status> {
890        block_on(self.common.write_at(
891            buffer_slice,
892            device_offset,
893            WriteOptions::empty(),
894            NO_TRACE_ID,
895        ))
896    }
897
898    pub fn flush(&self) -> Result<(), zx::Status> {
899        block_on(self.common.flush(NO_TRACE_ID))
900    }
901
902    pub fn close(&self) -> Result<(), zx::Status> {
903        let () = self
904            .session
905            .close(zx::MonotonicInstant::INFINITE)
906            .map_err(fidl_to_status)?
907            .map_err(zx::Status::from_raw)?;
908        Ok(())
909    }
910
911    pub fn block_size(&self) -> u32 {
912        self.common.block_size()
913    }
914
915    pub fn block_count(&self) -> u64 {
916        self.common.block_count()
917    }
918
919    pub fn is_connected(&self) -> bool {
920        self.common.is_connected()
921    }
922}
923
924impl Drop for RemoteBlockClientSync {
925    fn drop(&mut self) {
926        // Ignore errors here as there is not much we can do about it.
927        let _ = self.close();
928    }
929}
930
// FifoPoller is a future responsible for sending and receiving from the fifo.
//
// It resolves (to `()`) either when `poll_send_requests` reports that polling should stop or
// when reading from the fifo fails; otherwise it stays pending.
struct FifoPoller {
    // Shared fifo state; locked for the duration of every poll.
    fifo_state: FifoStateRef,
}
935
936impl Future for FifoPoller {
937    type Output = ();
938
939    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
940        let mut state_lock = self.fifo_state.lock();
941        let state = state_lock.deref_mut(); // So that we can split the borrow.
942
943        // Send requests.
944        if state.poll_send_requests(context) {
945            return Poll::Ready(());
946        }
947
948        // Receive responses.
949        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
950        loop {
951            let mut response = MaybeUninit::uninit();
952            match fifo.try_read(context, &mut response) {
953                Poll::Pending => {
954                    state.poller_waker = Some(context.waker().clone());
955                    return Poll::Pending;
956                }
957                Poll::Ready(Ok(_)) => {
958                    let response = unsafe { response.assume_init() };
959                    let request_id = response.reqid;
960                    // If the request isn't in the map, assume that it's a cancelled read.
961                    if let Some(request_state) = state.map.get_mut(&request_id) {
962                        request_state.result.replace(zx::Status::from_raw(response.status));
963                        if let Some(waker) = request_state.waker.take() {
964                            waker.wake();
965                        }
966                    }
967                }
968                Poll::Ready(Err(_)) => {
969                    state.terminate();
970                    return Poll::Ready(());
971                }
972            }
973        }
974    }
975}
976
977#[cfg(test)]
978mod tests {
979    use super::{
980        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
981        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
982    };
983    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
984    use fidl::endpoints::RequestStream as _;
985    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
986    use futures::join;
987    use futures::stream::futures_unordered::FuturesUnordered;
988    use futures::stream::StreamExt as _;
989    use ramdevice_client::RamdiskClient;
990    use std::borrow::Cow;
991    use std::num::NonZero;
992    use std::sync::atomic::{AtomicBool, Ordering};
993    use std::sync::Arc;
994    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
995
    // Block size handed to `RamdiskClient::create` by `make_ramdisk`, which also asserts that
    // the resulting client reports a block size of 1024.
    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    // Block count handed to `RamdiskClient::create` by `make_ramdisk`.
    const RAMDISK_BLOCK_COUNT: u64 = 1024;
998
999    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1000        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1001            .await
1002            .expect("RamdiskClient::create failed");
1003        let client_end = ramdisk.open().expect("ramdisk.open failed");
1004        let proxy = client_end.into_proxy();
1005        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1006        assert_eq!(block_client.block_size(), 1024);
1007        let client_end = ramdisk.open().expect("ramdisk.open failed");
1008        let proxy = client_end.into_proxy();
1009        (ramdisk, proxy, block_client)
1010    }
1011
1012    #[fuchsia::test]
1013    async fn test_against_ram_disk() {
1014        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1015
1016        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1017        vmo.write(b"hello", 5).expect("vmo.write failed");
1018        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1019        block_client
1020            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1021            .await
1022            .expect("write_at failed");
1023        block_client
1024            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1025            .await
1026            .expect("read_at failed");
1027        let mut buf: [u8; 5] = Default::default();
1028        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1029        assert_eq!(&buf, b"hello");
1030        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1031    }
1032
1033    #[fuchsia::test]
1034    async fn test_alignment() {
1035        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1036        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1037        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1038        block_client
1039            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1040            .await
1041            .expect_err("expected failure due to bad alignment");
1042        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1043    }
1044
1045    #[fuchsia::test]
1046    async fn test_parallel_io() {
1047        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1048        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1049        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1050        let mut reads = Vec::new();
1051        for _ in 0..1024 {
1052            reads.push(
1053                block_client
1054                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1055                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1056            );
1057        }
1058        futures::future::join_all(reads).await;
1059        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1060    }
1061
1062    #[fuchsia::test]
1063    async fn test_closed_device() {
1064        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1065        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1066        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1067        let mut reads = Vec::new();
1068        for _ in 0..1024 {
1069            reads.push(
1070                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1071            );
1072        }
1073        assert!(block_client.is_connected());
1074        let _ = futures::join!(futures::future::join_all(reads), async {
1075            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1076        });
1077        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1078        while block_client
1079            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1080            .await
1081            .is_ok()
1082        {}
1083
1084        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1085        // get false-positives from is_connected.
1086        while block_client.is_connected() {
1087            // Sleep for a bit to minimise lock contention.
1088            fasync::Timer::new(fasync::MonotonicInstant::after(
1089                zx::MonotonicDuration::from_millis(500),
1090            ))
1091            .await;
1092        }
1093
1094        // But once is_connected goes negative, it should stay negative.
1095        assert_eq!(block_client.is_connected(), false);
1096        let _ = block_client.detach_vmo(vmo_id).await;
1097    }
1098
1099    #[fuchsia::test]
1100    async fn test_cancelled_reads() {
1101        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1102        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1103        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1104        {
1105            let mut reads = FuturesUnordered::new();
1106            for _ in 0..1024 {
1107                reads.push(
1108                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1109                );
1110            }
1111            // Read the first 500 results and then dump the rest.
1112            for _ in 0..500 {
1113                reads.next().await;
1114            }
1115        }
1116        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1117    }
1118
1119    #[fuchsia::test]
1120    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1121        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1122        let block_client_ref = &block_client;
1123        let test_one = |offset, len, fill| async move {
1124            let buf = vec![fill; len];
1125            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1126            // Read back an extra block either side.
1127            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1128            block_client_ref
1129                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1130                .await
1131                .expect("read_at failed");
1132            assert_eq!(
1133                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1134                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1135            );
1136            assert_eq!(
1137                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1138                &buf[..]
1139            );
1140            assert_eq!(
1141                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1142                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1143            );
1144        };
1145        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1146        join!(
1147            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1148            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1149        );
1150    }
1151
    // A dummy block server which test cases can use to verify whether channel messages and
    // fifo operations are being received, by supplying the `channel_handler` and `fifo_handler`
    // callbacks respectively.
    struct FakeBlockServer<'a> {
        // Channel the server listens on; `run` takes it, leaving `None` behind.
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        // Invoked with each session request before the default handling; returning `true`
        // skips the default handling for that request.
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        // Invoked with each request read from the fifo; its return value is written back.
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1160
1161    impl<'a> FakeBlockServer<'a> {
1162        // Creates a new FakeBlockServer given a channel to listen on.
1163        //
1164        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1165        // handles requests received from channel or the fifo respectfully.
1166        //
1167        // 'channel_handler' receives a message before it is handled by the default implementation
1168        // and can return 'true' to indicate all processing is done and no further processing of
1169        // that message is required
1170        //
1171        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1172        // FakeBlockServer will send over the fifo.
1173        fn new(
1174            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1175            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1176            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1177        ) -> FakeBlockServer<'a> {
1178            FakeBlockServer {
1179                server_channel: Some(server_channel),
1180                channel_handler: Box::new(channel_handler),
1181                fifo_handler: Box::new(fifo_handler),
1182            }
1183        }
1184
1185        // Runs the server
1186        async fn run(&mut self) {
1187            let server = self.server_channel.take().unwrap();
1188
1189            // Set up a mock server.
1190            let (server_fifo, client_fifo) =
1191                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1192                    .expect("Fifo::create failed");
1193            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1194
1195            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1196            let fifo_future = Abortable::new(
1197                async {
1198                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1199                    let (mut reader, mut writer) = fifo.async_io();
1200                    let mut request = BlockFifoRequest::default();
1201                    loop {
1202                        match reader.read_entries(&mut request).await {
1203                            Ok(1) => {}
1204                            Err(zx::Status::PEER_CLOSED) => break,
1205                            Err(e) => panic!("read_entry failed {:?}", e),
1206                            _ => unreachable!(),
1207                        };
1208
1209                        let response = self.fifo_handler.as_ref()(request);
1210                        writer
1211                            .write_entries(std::slice::from_ref(&response))
1212                            .await
1213                            .expect("write_entries failed");
1214                    }
1215                },
1216                fifo_future_abort_registration,
1217            );
1218
1219            let channel_future = async {
1220                server
1221                    .into_stream()
1222                    .for_each_concurrent(None, |request| async {
1223                        let request = request.expect("unexpected fidl error");
1224
1225                        match request {
1226                            block::BlockRequest::GetInfo { responder } => {
1227                                responder
1228                                    .send(Ok(&block::BlockInfo {
1229                                        block_count: 1024,
1230                                        block_size: 512,
1231                                        max_transfer_size: 1024 * 1024,
1232                                        flags: block::Flag::empty(),
1233                                    }))
1234                                    .expect("send failed");
1235                            }
1236                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1237                                let stream = session.into_stream();
1238                                stream
1239                                    .for_each(|request| async {
1240                                        let request = request.expect("unexpected fidl error");
1241                                        // Give a chance for the test to register and potentially
1242                                        // handle the event
1243                                        if self.channel_handler.as_ref()(&request) {
1244                                            return;
1245                                        }
1246                                        match request {
1247                                            block::SessionRequest::GetFifo { responder } => {
1248                                                match maybe_server_fifo.lock().take() {
1249                                                    Some(fifo) => {
1250                                                        responder.send(Ok(fifo.downcast()))
1251                                                    }
1252                                                    None => responder.send(Err(
1253                                                        zx::Status::NO_RESOURCES.into_raw(),
1254                                                    )),
1255                                                }
1256                                                .expect("send failed")
1257                                            }
1258                                            block::SessionRequest::AttachVmo {
1259                                                vmo: _,
1260                                                responder,
1261                                            } => responder
1262                                                .send(Ok(&block::VmoId { id: 1 }))
1263                                                .expect("send failed"),
1264                                            block::SessionRequest::Close { responder } => {
1265                                                fifo_future_abort.abort();
1266                                                responder.send(Ok(())).expect("send failed")
1267                                            }
1268                                        }
1269                                    })
1270                                    .await
1271                            }
1272                            _ => panic!("Unexpected message"),
1273                        }
1274                    })
1275                    .await;
1276            };
1277
1278            let _result = join!(fifo_future, channel_future);
1279            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1280        }
1281    }
1282
1283    #[fuchsia::test]
1284    async fn test_block_close_is_called() {
1285        let close_called = fuchsia_sync::Mutex::new(false);
1286        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1287
1288        std::thread::spawn(move || {
1289            let _block_client =
1290                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1291            // The drop here should cause Close to be sent.
1292        });
1293
1294        let channel_handler = |request: &block::SessionRequest| -> bool {
1295            if let block::SessionRequest::Close { .. } = request {
1296                *close_called.lock() = true;
1297            }
1298            false
1299        };
1300        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1301
1302        // After the server has finished running, we can check to see that close was called.
1303        assert!(*close_called.lock());
1304    }
1305
1306    #[fuchsia::test]
1307    async fn test_block_flush_is_called() {
1308        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1309
1310        struct Interface {
1311            flush_called: Arc<AtomicBool>,
1312        }
1313        impl block_server::async_interface::Interface for Interface {
1314            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1315                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1316                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1317                    max_transfer_blocks: None,
1318                    block_range: Some(0..1000),
1319                    type_guid: [0; 16],
1320                    instance_guid: [0; 16],
1321                    name: "foo".to_string(),
1322                    flags: 0,
1323                })))
1324            }
1325
1326            async fn read(
1327                &self,
1328                _device_block_offset: u64,
1329                _block_count: u32,
1330                _vmo: &Arc<zx::Vmo>,
1331                _vmo_offset: u64,
1332                _trace_flow_id: Option<NonZero<u64>>,
1333            ) -> Result<(), zx::Status> {
1334                unreachable!();
1335            }
1336
1337            fn barrier(&self) -> Result<(), zx::Status> {
1338                unreachable!()
1339            }
1340
1341            async fn write(
1342                &self,
1343                _device_block_offset: u64,
1344                _block_count: u32,
1345                _vmo: &Arc<zx::Vmo>,
1346                _vmo_offset: u64,
1347                _opts: WriteOptions,
1348                _trace_flow_id: Option<NonZero<u64>>,
1349            ) -> Result<(), zx::Status> {
1350                unreachable!();
1351            }
1352
1353            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1354                self.flush_called.store(true, Ordering::Relaxed);
1355                Ok(())
1356            }
1357
1358            async fn trim(
1359                &self,
1360                _device_block_offset: u64,
1361                _block_count: u32,
1362                _trace_flow_id: Option<NonZero<u64>>,
1363            ) -> Result<(), zx::Status> {
1364                unreachable!();
1365            }
1366        }
1367
1368        let flush_called = Arc::new(AtomicBool::new(false));
1369
1370        futures::join!(
1371            async {
1372                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1373
1374                block_client.flush().await.expect("flush failed");
1375            },
1376            async {
1377                let block_server = BlockServer::new(
1378                    512,
1379                    Arc::new(Interface { flush_called: flush_called.clone() }),
1380                );
1381                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1382            }
1383        );
1384
1385        assert!(flush_called.load(Ordering::Relaxed));
1386    }
1387
1388    #[fuchsia::test]
1389    async fn test_trace_flow_ids_set() {
1390        let (proxy, server) = fidl::endpoints::create_proxy();
1391
1392        futures::join!(
1393            async {
1394                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1395                block_client.flush().await.expect("flush failed");
1396            },
1397            async {
1398                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1399                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1400                    if request.trace_flow_id > 0 {
1401                        *flow_id.lock() = Some(request.trace_flow_id);
1402                    }
1403                    BlockFifoResponse {
1404                        status: zx::Status::OK.into_raw(),
1405                        reqid: request.reqid,
1406                        ..Default::default()
1407                    }
1408                };
1409                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1410                // After the server has finished running, verify the trace flow ID was set to some value.
1411                assert!(flow_id.lock().is_some());
1412            }
1413        );
1414    }
1415}