block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
//! This crate provides a low-level client for interacting with block devices over the
//! `fuchsia.storage.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
13use fuchsia_sync::Mutex;
14use futures::channel::oneshot;
15use futures::executor::block_on;
16use std::borrow::Borrow;
17use std::collections::HashMap;
18use std::future::Future;
19use std::hash::{Hash, Hasher};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::ops::{DerefMut, Range};
23use std::pin::Pin;
24use std::sync::atomic::{AtomicU16, Ordering};
25use std::sync::{Arc, LazyLock};
26use std::task::{Context, Poll, Waker};
27use zx::sys::zx_handle_t;
28use zx::{self as zx, HandleBased as _};
29use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync, storage_trace as trace};
30
31pub use cache::Cache;
32
33pub use block::DeviceFlag as BlockDeviceFlag;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
// Size, in bytes, of the scratch VMO used to stage transfers for in-memory buffer slices. Memory
// reads and writes are split into chunks of at most this many bytes.
const TEMP_VMO_SIZE: usize = 65536;
40
41/// If a trace flow ID isn't specified for requests, one will be generated.
42pub const NO_TRACE_ID: u64 = 0;
43
44pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47    match error {
48        fidl::Error::ClientChannelClosed { status, .. } => status,
49        _ => zx::Status::INTERNAL,
50    }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54    match BlockOpcode::from_primitive(opcode) {
55        Some(BlockOpcode::Read) => "read",
56        Some(BlockOpcode::Write) => "write",
57        Some(BlockOpcode::Flush) => "flush",
58        Some(BlockOpcode::Trim) => "trim",
59        Some(BlockOpcode::CloseVmo) => "close_vmo",
60        None => "unknown",
61    }
62}
63
64// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
65// reused within this process).
66fn generate_trace_flow_id(request_id: u32) -> u64 {
67    static SELF_HANDLE: LazyLock<zx_handle_t> =
68        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
69    *SELF_HANDLE as u64 + (request_id as u64) << 32
70}
71
/// The source of the data for a write: either a region of an attached VMO or a byte slice.
pub enum BufferSlice<'a> {
    /// `length` bytes starting `offset` bytes into the VMO identified by `vmo_id`.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A borrowed byte slice.
    Memory(&'a [u8]),
}
76
77impl<'a> BufferSlice<'a> {
78    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
79        BufferSlice::VmoId { vmo_id, offset, length }
80    }
81}
82
83impl<'a> From<&'a [u8]> for BufferSlice<'a> {
84    fn from(buf: &'a [u8]) -> Self {
85        BufferSlice::Memory(buf)
86    }
87}
88
/// The destination for the data of a read: either a region of an attached VMO or a mutable byte
/// slice.
pub enum MutableBufferSlice<'a> {
    /// `length` bytes starting `offset` bytes into the VMO identified by `vmo_id`.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A mutably borrowed byte slice.
    Memory(&'a mut [u8]),
}
93
94impl<'a> MutableBufferSlice<'a> {
95    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
96        MutableBufferSlice::VmoId { vmo_id, offset, length }
97    }
98}
99
100impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
101    fn from(buf: &'a mut [u8]) -> Self {
102        MutableBufferSlice::Memory(buf)
103    }
104}
105
// Per-request bookkeeping, stored in `FifoState::map` while a request is outstanding.
#[derive(Default)]
struct RequestState {
    // The outcome of the request; `None` until a response arrives or the fifo is terminated.
    result: Option<zx::Status>,
    // The waker of the task waiting on this request, if it has been polled while still pending.
    waker: Option<Waker>,
}
111
// State shared (behind a mutex) between the request senders and the FifoPoller.
#[derive(Default)]
struct FifoState {
    // The fifo.  `None` once the connection has been terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,

    // If set, attach a barrier to the next write request
    attach_barrier: bool,
}
132
133impl FifoState {
134    fn terminate(&mut self) {
135        self.fifo.take();
136        for (_, request_state) in self.map.iter_mut() {
137            request_state.result.get_or_insert(zx::Status::CANCELED);
138            if let Some(waker) = request_state.waker.take() {
139                waker.wake();
140            }
141        }
142        if let Some(waker) = self.poller_waker.take() {
143            waker.wake();
144        }
145    }
146
147    // Returns true if polling should be terminated.
148    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
149        let fifo = if let Some(fifo) = self.fifo.as_ref() {
150            fifo
151        } else {
152            return true;
153        };
154
155        loop {
156            let slice = self.queue.as_slices().0;
157            if slice.is_empty() {
158                return false;
159            }
160            match fifo.try_write(context, slice) {
161                Poll::Ready(Ok(sent)) => {
162                    self.queue.drain(0..sent);
163                }
164                Poll::Ready(Err(_)) => {
165                    self.terminate();
166                    return true;
167                }
168                Poll::Pending => {
169                    return false;
170                }
171            }
172        }
173    }
174}
175
// Shared, mutex-protected handle to the fifo state.
type FifoStateRef = Arc<Mutex<FifoState>>;
177
// A future used for fifo responses.  It resolves once a result has been recorded for
// `request_id` in the shared state's map, and removes that map entry when dropped.
struct ResponseFuture {
    // The ID of the request whose response is awaited.
    request_id: u32,
    // The shared state holding the request map.
    fifo_state: FifoStateRef,
}
183
184impl ResponseFuture {
185    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
186        ResponseFuture { request_id, fifo_state }
187    }
188}
189
190impl Future for ResponseFuture {
191    type Output = Result<(), zx::Status>;
192
193    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
194        let mut state = self.fifo_state.lock();
195        let request_state = state.map.get_mut(&self.request_id).unwrap();
196        if let Some(result) = request_state.result {
197            Poll::Ready(result.into())
198        } else {
199            request_state.waker.replace(context.waker().clone());
200            Poll::Pending
201        }
202    }
203}
204
205impl Drop for ResponseFuture {
206    fn drop(&mut self) {
207        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
208    }
209}
210
/// Wraps a vmo-id. Will panic if you forget to detach.
///
/// The ID is held in an atomic so that it can be taken (and replaced with `VMOID_INVALID`) through
/// a shared reference.  Dropping a `VmoId` that still holds a valid ID panics.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
215
216impl VmoId {
217    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
218    pub fn new(id: u16) -> Self {
219        Self(AtomicU16::new(id))
220    }
221
222    /// Invalidates self and returns a new VmoId with the same underlying ID.
223    pub fn take(&self) -> Self {
224        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
225    }
226
227    pub fn is_valid(&self) -> bool {
228        self.id() != VMOID_INVALID
229    }
230
231    /// Takes the ID.  The caller assumes responsibility for detaching.
232    #[must_use]
233    pub fn into_id(self) -> u16 {
234        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
235    }
236
237    pub fn id(&self) -> u16 {
238        self.0.load(Ordering::Relaxed)
239    }
240}
241
242impl PartialEq for VmoId {
243    fn eq(&self, other: &Self) -> bool {
244        self.id() == other.id()
245    }
246}
247
248impl Eq for VmoId {}
249
250impl Drop for VmoId {
251    fn drop(&mut self) {
252        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
253    }
254}
255
256impl Hash for VmoId {
257    fn hash<H: Hasher>(&self, state: &mut H) {
258        self.id().hash(state);
259    }
260}
261
262/// Represents a client connection to a block device. This is a simplified version of the block.fidl
263/// interface.
264/// Most users will use the RemoteBlockClient instantiation of this trait.
265pub trait BlockClient: Send + Sync {
266    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
267    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
268
269    /// Detaches the given vmo-id from the device.
270    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
271
272    /// Reads from the device at |device_offset| into the given buffer slice.
273    fn read_at(
274        &self,
275        buffer_slice: MutableBufferSlice<'_>,
276        device_offset: u64,
277    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
278        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
279    }
280
281    fn read_at_with_opts(
282        &self,
283        buffer_slice: MutableBufferSlice<'_>,
284        device_offset: u64,
285        opts: ReadOptions,
286    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
287        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
288    }
289
290    fn read_at_with_opts_traced(
291        &self,
292        buffer_slice: MutableBufferSlice<'_>,
293        device_offset: u64,
294        opts: ReadOptions,
295        trace_flow_id: u64,
296    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
297
298    /// Writes the data in |buffer_slice| to the device.
299    fn write_at(
300        &self,
301        buffer_slice: BufferSlice<'_>,
302        device_offset: u64,
303    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
304        self.write_at_with_opts_traced(
305            buffer_slice,
306            device_offset,
307            WriteOptions::default(),
308            NO_TRACE_ID,
309        )
310    }
311
312    fn write_at_with_opts(
313        &self,
314        buffer_slice: BufferSlice<'_>,
315        device_offset: u64,
316        opts: WriteOptions,
317    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
318        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
319    }
320
321    fn write_at_with_opts_traced(
322        &self,
323        buffer_slice: BufferSlice<'_>,
324        device_offset: u64,
325        opts: WriteOptions,
326        trace_flow_id: u64,
327    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
328
329    /// Trims the given range on the block device.
330    fn trim(
331        &self,
332        device_range: Range<u64>,
333    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
334        self.trim_traced(device_range, NO_TRACE_ID)
335    }
336
337    fn trim_traced(
338        &self,
339        device_range: Range<u64>,
340        trace_flow_id: u64,
341    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
342
343    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
344    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
345    /// This method makes it easier to guarantee that the barrier is attached to the correct
346    /// write operation when subsequent write operations can get reordered.
347    fn barrier(&self);
348
349    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
350        self.flush_traced(NO_TRACE_ID)
351    }
352
353    /// Sends a flush request to the underlying block device.
354    fn flush_traced(
355        &self,
356        trace_flow_id: u64,
357    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
358
359    /// Closes the fifo.
360    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
361
362    /// Returns the block size of the device.
363    fn block_size(&self) -> u32;
364
365    /// Returns the size, in blocks, of the device.
366    fn block_count(&self) -> u64;
367
368    /// Returns the maximum number of blocks which can be transferred in a single request.
369    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
370
371    /// Returns the block flags reported by the device.
372    fn block_flags(&self) -> BlockDeviceFlag;
373
374    /// Returns true if the remote fifo is still connected.
375    fn is_connected(&self) -> bool;
376}
377
// State and logic shared by the asynchronous and synchronous clients.
struct Common {
    // Block size in bytes, as reported in the device's `BlockInfo`.
    block_size: u32,
    // Number of blocks, as reported in the device's `BlockInfo`.
    block_count: u64,
    // Maximum blocks per request; `None` if the device reports an unbounded transfer size (or the
    // reported size is smaller than one block).
    max_transfer_blocks: Option<NonZero<u32>>,
    // Flags reported in the device's `BlockInfo`.
    block_flags: BlockDeviceFlag,
    // State shared with the FifoPoller and with outstanding ResponseFutures.
    fifo_state: FifoStateRef,
    // Scratch VMO used to stage transfers for `Memory` buffer slices.  The async mutex is held for
    // the duration of such a transfer.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    // The ID under which `temp_vmo` is attached to the device.
    temp_vmo_id: VmoId,
}
387
388impl Common {
389    fn new(
390        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
391        info: &block::BlockInfo,
392        temp_vmo: zx::Vmo,
393        temp_vmo_id: VmoId,
394    ) -> Self {
395        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
396        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
397        Self {
398            block_size: info.block_size,
399            block_count: info.block_count,
400            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
401                NonZero::new(info.max_transfer_size / info.block_size)
402            } else {
403                None
404            },
405            block_flags: info.flags,
406            fifo_state,
407            temp_vmo: futures::lock::Mutex::new(temp_vmo),
408            temp_vmo_id,
409        }
410    }
411
412    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
413        if bytes % self.block_size as u64 != 0 {
414            Err(zx::Status::INVALID_ARGS)
415        } else {
416            Ok(bytes / self.block_size as u64)
417        }
418    }
419
420    // Sends the request and waits for the response.
421    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
422        let (request_id, trace_flow_id) = {
423            let mut state = self.fifo_state.lock();
424
425            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
426            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
427                && state.attach_barrier
428            {
429                flags |= BlockIoFlag::PRE_BARRIER;
430                request.command.flags = flags.bits();
431                state.attach_barrier = false;
432            }
433
434            if state.fifo.is_none() {
435                // Fifo has been closed.
436                return Err(zx::Status::CANCELED);
437            }
438            trace::duration!(
439                "storage",
440                "block_client::send::start",
441                "op" => opcode_str(request.command.opcode),
442                "len" => request.length * self.block_size
443            );
444            let request_id = state.next_request_id;
445            state.next_request_id = state.next_request_id.overflowing_add(1).0;
446            assert!(
447                state.map.insert(request_id, RequestState::default()).is_none(),
448                "request id in use!"
449            );
450            request.reqid = request_id;
451            if request.trace_flow_id == NO_TRACE_ID {
452                request.trace_flow_id = generate_trace_flow_id(request_id);
453            }
454            let trace_flow_id = request.trace_flow_id;
455            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
456            state.queue.push_back(request);
457            if let Some(waker) = state.poller_waker.clone() {
458                state.poll_send_requests(&mut Context::from_waker(&waker));
459            }
460            (request_id, trace_flow_id)
461        };
462        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
463        trace::duration!("storage", "block_client::send::end");
464        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
465        Ok(())
466    }
467
468    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
469        self.send(BlockFifoRequest {
470            command: BlockFifoCommand {
471                opcode: BlockOpcode::CloseVmo.into_primitive(),
472                flags: 0,
473                ..Default::default()
474            },
475            vmoid: vmo_id.into_id(),
476            ..Default::default()
477        })
478        .await
479    }
480
481    async fn read_at(
482        &self,
483        buffer_slice: MutableBufferSlice<'_>,
484        device_offset: u64,
485        opts: ReadOptions,
486        trace_flow_id: u64,
487    ) -> Result<(), zx::Status> {
488        match buffer_slice {
489            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
490                self.send(BlockFifoRequest {
491                    command: BlockFifoCommand {
492                        opcode: BlockOpcode::Read.into_primitive(),
493                        flags: 0,
494                        ..Default::default()
495                    },
496                    vmoid: vmo_id.id(),
497                    length: self
498                        .to_blocks(length)?
499                        .try_into()
500                        .map_err(|_| zx::Status::INVALID_ARGS)?,
501                    vmo_offset: self.to_blocks(offset)?,
502                    dev_offset: self.to_blocks(device_offset)?,
503                    trace_flow_id,
504                    dun: opts.inline_crypto.dun,
505                    slot: opts.inline_crypto.slot,
506                    ..Default::default()
507                })
508                .await?
509            }
510            MutableBufferSlice::Memory(mut slice) => {
511                let temp_vmo = self.temp_vmo.lock().await;
512                let mut device_block = self.to_blocks(device_offset)?;
513                loop {
514                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
515                    let block_count = self.to_blocks(to_do as u64)? as u32;
516                    self.send(BlockFifoRequest {
517                        command: BlockFifoCommand {
518                            opcode: BlockOpcode::Read.into_primitive(),
519                            flags: 0,
520                            ..Default::default()
521                        },
522                        vmoid: self.temp_vmo_id.id(),
523                        length: block_count,
524                        vmo_offset: 0,
525                        dev_offset: device_block,
526                        trace_flow_id,
527                        dun: opts.inline_crypto.dun,
528                        slot: opts.inline_crypto.slot,
529                        ..Default::default()
530                    })
531                    .await?;
532                    temp_vmo.read(&mut slice[..to_do], 0)?;
533                    if to_do == slice.len() {
534                        break;
535                    }
536                    device_block += block_count as u64;
537                    slice = &mut slice[to_do..];
538                }
539            }
540        }
541        Ok(())
542    }
543
544    async fn write_at(
545        &self,
546        buffer_slice: BufferSlice<'_>,
547        device_offset: u64,
548        opts: WriteOptions,
549        trace_flow_id: u64,
550    ) -> Result<(), zx::Status> {
551        let mut flags = BlockIoFlag::empty();
552
553        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
554            flags |= BlockIoFlag::FORCE_ACCESS;
555        }
556
557        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
558            flags |= BlockIoFlag::PRE_BARRIER;
559        }
560
561        match buffer_slice {
562            BufferSlice::VmoId { vmo_id, offset, length } => {
563                self.send(BlockFifoRequest {
564                    command: BlockFifoCommand {
565                        opcode: BlockOpcode::Write.into_primitive(),
566                        flags: flags.bits(),
567                        ..Default::default()
568                    },
569                    vmoid: vmo_id.id(),
570                    length: self
571                        .to_blocks(length)?
572                        .try_into()
573                        .map_err(|_| zx::Status::INVALID_ARGS)?,
574                    vmo_offset: self.to_blocks(offset)?,
575                    dev_offset: self.to_blocks(device_offset)?,
576                    trace_flow_id,
577                    dun: opts.inline_crypto.dun,
578                    slot: opts.inline_crypto.slot,
579                    ..Default::default()
580                })
581                .await?;
582            }
583            BufferSlice::Memory(mut slice) => {
584                let temp_vmo = self.temp_vmo.lock().await;
585                let mut device_block = self.to_blocks(device_offset)?;
586                loop {
587                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
588                    let block_count = self.to_blocks(to_do as u64)? as u32;
589                    temp_vmo.write(&slice[..to_do], 0)?;
590                    self.send(BlockFifoRequest {
591                        command: BlockFifoCommand {
592                            opcode: BlockOpcode::Write.into_primitive(),
593                            flags: flags.bits(),
594                            ..Default::default()
595                        },
596                        vmoid: self.temp_vmo_id.id(),
597                        length: block_count,
598                        vmo_offset: 0,
599                        dev_offset: device_block,
600                        trace_flow_id,
601                        dun: opts.inline_crypto.dun,
602                        slot: opts.inline_crypto.slot,
603                        ..Default::default()
604                    })
605                    .await?;
606                    if to_do == slice.len() {
607                        break;
608                    }
609                    device_block += block_count as u64;
610                    slice = &slice[to_do..];
611                }
612            }
613        }
614        Ok(())
615    }
616
617    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
618        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
619        let dev_offset = self.to_blocks(device_range.start)?;
620        self.send(BlockFifoRequest {
621            command: BlockFifoCommand {
622                opcode: BlockOpcode::Trim.into_primitive(),
623                flags: 0,
624                ..Default::default()
625            },
626            vmoid: VMOID_INVALID,
627            length,
628            dev_offset,
629            trace_flow_id,
630            ..Default::default()
631        })
632        .await
633    }
634
635    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
636        self.send(BlockFifoRequest {
637            command: BlockFifoCommand {
638                opcode: BlockOpcode::Flush.into_primitive(),
639                flags: 0,
640                ..Default::default()
641            },
642            vmoid: VMOID_INVALID,
643            trace_flow_id,
644            ..Default::default()
645        })
646        .await
647    }
648
    // Requests that a barrier be attached to the next write request that is sent.
    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }
652
    // Returns the block size, in bytes, recorded at construction.
    fn block_size(&self) -> u32 {
        self.block_size
    }
656
    // Returns the number of blocks recorded at construction.
    fn block_count(&self) -> u64 {
        self.block_count
    }
660
661    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
662        self.max_transfer_blocks.clone()
663    }
664
    // Returns the device flags recorded at construction.
    fn block_flags(&self) -> BlockDeviceFlag {
        self.block_flags
    }
668
    // Returns true while the fifo is still held, i.e. until the state has been terminated.
    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
672}
673
674impl Drop for Common {
675    fn drop(&mut self) {
676        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
677        // down.
678        let _ = self.temp_vmo_id.take().into_id();
679        self.fifo_state.lock().terminate();
680    }
681}
682
/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
pub struct RemoteBlockClient {
    // The session used for attaching VMOs and for closing the connection.
    session: block::SessionProxy,
    // Fifo handling and device information shared with the synchronous client.
    common: Common,
}
688
689impl RemoteBlockClient {
690    /// Returns a connection to a remote block device via the given channel.
691    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
692        let remote = remote.borrow();
693        let info =
694            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
695        let (session, server) = fidl::endpoints::create_proxy();
696        let () = remote.open_session(server).map_err(fidl_to_status)?;
697        Self::from_session(info, session).await
698    }
699
700    pub async fn from_session(
701        info: block::BlockInfo,
702        session: block::SessionProxy,
703    ) -> Result<Self, zx::Status> {
704        let fifo =
705            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
706        let fifo = fasync::Fifo::from_fifo(fifo);
707        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
708        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
709        let vmo_id =
710            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
711        let vmo_id = VmoId::new(vmo_id.id);
712        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
713    }
714}
715
716impl BlockClient for RemoteBlockClient {
717    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
718        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
719        let vmo_id = self
720            .session
721            .attach_vmo(dup)
722            .await
723            .map_err(fidl_to_status)?
724            .map_err(zx::Status::from_raw)?;
725        Ok(VmoId::new(vmo_id.id))
726    }
727
728    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
729        self.common.detach_vmo(vmo_id).await
730    }
731
732    async fn read_at_with_opts_traced(
733        &self,
734        buffer_slice: MutableBufferSlice<'_>,
735        device_offset: u64,
736        opts: ReadOptions,
737        trace_flow_id: u64,
738    ) -> Result<(), zx::Status> {
739        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
740    }
741
742    async fn write_at_with_opts_traced(
743        &self,
744        buffer_slice: BufferSlice<'_>,
745        device_offset: u64,
746        opts: WriteOptions,
747        trace_flow_id: u64,
748    ) -> Result<(), zx::Status> {
749        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
750    }
751
752    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
753        self.common.trim(range, trace_flow_id).await
754    }
755
756    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
757        self.common.flush(trace_flow_id).await
758    }
759
760    fn barrier(&self) {
761        self.common.barrier()
762    }
763
764    async fn close(&self) -> Result<(), zx::Status> {
765        let () =
766            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
767        Ok(())
768    }
769
770    fn block_size(&self) -> u32 {
771        self.common.block_size()
772    }
773
774    fn block_count(&self) -> u64 {
775        self.common.block_count()
776    }
777
778    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
779        self.common.max_transfer_blocks()
780    }
781
782    fn block_flags(&self) -> BlockDeviceFlag {
783        self.common.block_flags()
784    }
785
786    fn is_connected(&self) -> bool {
787        self.common.is_connected()
788    }
789}
790
/// A blocking counterpart to [`RemoteBlockClient`]: FIDL calls go through a synchronous session
/// proxy, and fifo requests are driven to completion with `block_on`.
pub struct RemoteBlockClientSync {
    // The synchronous session used for attaching VMOs and for closing the connection.
    session: block::SessionSynchronousProxy,
    // Fifo handling and device information shared with the asynchronous client.
    common: Common,
}
795
796impl RemoteBlockClientSync {
    /// Returns a connection to a remote block device via the given channel, but spawns a separate
    /// thread for polling the fifo which makes it work in cases where no executor is configured for
    /// the calling thread.
    ///
    /// All FIDL calls made here block without a deadline.  Returns `CANCELED` if the polling thread
    /// goes away before handing the client back.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        // Create the scratch VMO for in-memory transfers and attach a duplicate to the session.
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The fifo needs to be instantiated from the thread that has the executor as that's where
        // the fifo registers for notifications to be delivered.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::default();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            // Hand the finished client back to the constructing thread; a failed send (receiver
            // gone) is ignored.
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            // Keep this thread alive servicing the fifo until the poller completes.
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }
836
837    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
838        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
839        let vmo_id = self
840            .session
841            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
842            .map_err(fidl_to_status)?
843            .map_err(zx::Status::from_raw)?;
844        Ok(VmoId::new(vmo_id.id))
845    }
846
847    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
848        block_on(self.common.detach_vmo(vmo_id))
849    }
850
851    pub fn read_at(
852        &self,
853        buffer_slice: MutableBufferSlice<'_>,
854        device_offset: u64,
855    ) -> Result<(), zx::Status> {
856        block_on(self.common.read_at(
857            buffer_slice,
858            device_offset,
859            ReadOptions::default(),
860            NO_TRACE_ID,
861        ))
862    }
863
864    pub fn write_at(
865        &self,
866        buffer_slice: BufferSlice<'_>,
867        device_offset: u64,
868    ) -> Result<(), zx::Status> {
869        block_on(self.common.write_at(
870            buffer_slice,
871            device_offset,
872            WriteOptions::default(),
873            NO_TRACE_ID,
874        ))
875    }
876
877    pub fn flush(&self) -> Result<(), zx::Status> {
878        block_on(self.common.flush(NO_TRACE_ID))
879    }
880
881    pub fn close(&self) -> Result<(), zx::Status> {
882        let () = self
883            .session
884            .close(zx::MonotonicInstant::INFINITE)
885            .map_err(fidl_to_status)?
886            .map_err(zx::Status::from_raw)?;
887        Ok(())
888    }
889
890    pub fn block_size(&self) -> u32 {
891        self.common.block_size()
892    }
893
894    pub fn block_count(&self) -> u64 {
895        self.common.block_count()
896    }
897
898    pub fn is_connected(&self) -> bool {
899        self.common.is_connected()
900    }
901}
902
903impl Drop for RemoteBlockClientSync {
904    fn drop(&mut self) {
905        // Ignore errors here as there is not much we can do about it.
906        let _ = self.close();
907    }
908}
909
/// Future that services the block FIFO: each poll first sends queued requests and then
/// reads responses, recording each result against the request that is waiting for it.
///
/// It resolves to `()` once `poll_send_requests` reports `true` or a FIFO read fails.
struct FifoPoller {
    /// Shared FIFO state; it is locked for the duration of every poll.
    fifo_state: FifoStateRef,
}
914
impl Future for FifoPoller {
    type Output = ();

    /// Sends pending requests, then drains every response currently readable from the FIFO.
    ///
    /// Returns `Poll::Pending` (after storing this task's waker in `poller_waker`) when the
    /// FIFO has nothing more to read, and `Poll::Ready(())` when `poll_send_requests` returns
    /// `true` or when reading from the FIFO fails.
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut(); // So that we can split the borrow.

        // Send requests.
        // NOTE(review): a `true` result ends the poller; presumably it means the FIFO is gone
        // (that is what the `unwrap` below relies on) -- confirm against poll_send_requests.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // Receive responses.
        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Remember the waker so that the poller can be woken explicitly later.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY (assumed): a successful `try_read` is taken to have written a
                    // complete entry into `response` -- TODO confirm against
                    // `fasync::Fifo::try_read`.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // If the request isn't in the map, assume that it's a cancelled read.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        // Record the status and wake whoever is waiting on this request.
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    // Any read error is treated as fatal for the whole FIFO.
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
955
956#[cfg(test)]
957mod tests {
958    use super::{
959        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
960        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
961    };
962    use block_protocol::ReadOptions;
963    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
964    use fidl::endpoints::RequestStream as _;
965    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
966    use futures::join;
967    use futures::stream::StreamExt as _;
968    use futures::stream::futures_unordered::FuturesUnordered;
969    use ramdevice_client::RamdiskClient;
970    use std::borrow::Cow;
971    use std::num::NonZero;
972    use std::sync::Arc;
973    use std::sync::atomic::{AtomicBool, Ordering};
974    use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync};
975
    // Geometry handed to `RamdiskClient::create` by `make_ramdisk`; the large read/write test
    // also uses the block size to compute offsets and buffer lengths.
    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;
978
979    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
980        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
981            .await
982            .expect("RamdiskClient::create failed");
983        let client_end = ramdisk.open().expect("ramdisk.open failed");
984        let proxy = client_end.into_proxy();
985        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
986        assert_eq!(block_client.block_size(), 1024);
987        let client_end = ramdisk.open().expect("ramdisk.open failed");
988        let proxy = client_end.into_proxy();
989        (ramdisk, proxy, block_client)
990    }
991
992    #[fuchsia::test]
993    async fn test_against_ram_disk() {
994        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
995
996        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
997        vmo.write(b"hello", 5).expect("vmo.write failed");
998        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
999        block_client
1000            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1001            .await
1002            .expect("write_at failed");
1003        block_client
1004            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1005            .await
1006            .expect("read_at failed");
1007        let mut buf: [u8; 5] = Default::default();
1008        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1009        assert_eq!(&buf, b"hello");
1010        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1011    }
1012
1013    #[fuchsia::test]
1014    async fn test_alignment() {
1015        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1016        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1017        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1018        block_client
1019            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1020            .await
1021            .expect_err("expected failure due to bad alignment");
1022        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1023    }
1024
1025    #[fuchsia::test]
1026    async fn test_parallel_io() {
1027        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1028        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1029        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1030        let mut reads = Vec::new();
1031        for _ in 0..1024 {
1032            reads.push(
1033                block_client
1034                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1035                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1036            );
1037        }
1038        futures::future::join_all(reads).await;
1039        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1040    }
1041
1042    #[fuchsia::test]
1043    async fn test_closed_device() {
1044        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1045        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1046        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1047        let mut reads = Vec::new();
1048        for _ in 0..1024 {
1049            reads.push(
1050                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1051            );
1052        }
1053        assert!(block_client.is_connected());
1054        let _ = futures::join!(futures::future::join_all(reads), async {
1055            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1056        });
1057        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1058        while block_client
1059            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1060            .await
1061            .is_ok()
1062        {}
1063
1064        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1065        // get false-positives from is_connected.
1066        while block_client.is_connected() {
1067            // Sleep for a bit to minimise lock contention.
1068            fasync::Timer::new(fasync::MonotonicInstant::after(
1069                zx::MonotonicDuration::from_millis(500),
1070            ))
1071            .await;
1072        }
1073
1074        // But once is_connected goes negative, it should stay negative.
1075        assert_eq!(block_client.is_connected(), false);
1076        let _ = block_client.detach_vmo(vmo_id).await;
1077    }
1078
1079    #[fuchsia::test]
1080    async fn test_cancelled_reads() {
1081        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1082        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1083        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1084        {
1085            let mut reads = FuturesUnordered::new();
1086            for _ in 0..1024 {
1087                reads.push(
1088                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1089                );
1090            }
1091            // Read the first 500 results and then dump the rest.
1092            for _ in 0..500 {
1093                reads.next().await;
1094            }
1095        }
1096        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1097    }
1098
1099    #[fuchsia::test]
1100    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1101        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1102        let block_client_ref = &block_client;
1103        let test_one = |offset, len, fill| async move {
1104            let buf = vec![fill; len];
1105            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1106            // Read back an extra block either side.
1107            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1108            block_client_ref
1109                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1110                .await
1111                .expect("read_at failed");
1112            assert_eq!(
1113                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1114                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1115            );
1116            assert_eq!(
1117                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1118                &buf[..]
1119            );
1120            assert_eq!(
1121                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1122                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1123            );
1124        };
1125        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1126        join!(
1127            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1128            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1129        );
1130    }
1131
    /// Minimal fake block server that lets a test observe the session-channel messages and
    /// FIFO requests a client sends, through the `channel_handler` and `fifo_handler`
    /// callbacks passed to `new`.
    struct FakeBlockServer<'a> {
        /// Server end to serve; taken (leaving `None`) when `run` starts.
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        /// Called with every session request first; returning `true` skips default handling.
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        /// Produces the response that is written back for each request read from the FIFO.
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1140
    impl<'a> FakeBlockServer<'a> {
        /// Creates a new FakeBlockServer given a channel to listen on.
        ///
        /// The 'channel_handler' and 'fifo_handler' closures customize how the server handles
        /// requests received from the channel or the fifo respectively.
        ///
        /// 'channel_handler' receives a session message before it is handled by the default
        /// implementation and can return 'true' to indicate that all processing is done and
        /// no further processing of that message is required.
        ///
        /// 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
        /// FakeBlockServer will send over the fifo.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        /// Runs the server: serves the FIFO and the FIDL channel concurrently and returns
        /// once both futures have finished.
        ///
        /// Panics if called a second time, because the server channel is taken on the first
        /// call.
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            // Despite its name, this holds the *client* end of the FIFO; it is handed out to
            // the first `GetFifo` caller, and later callers get NO_RESOURCES.
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            // The FIFO loop is wrapped in `Abortable` so that handling `Close` can stop it.
            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        // Read one request at a time; a closed peer ends the loop.
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        // Let the test-supplied handler decide the response.
                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            // Reply with fixed, made-up device geometry.
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::DeviceFlag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            // Serve the session's requests one at a time until its stream ends.
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test a chance to observe the request and
                                        // potentially handle it itself.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            // Every attached VMO is given the same fixed id.
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            // Stop the FIFO loop before acknowledging the close.
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
            // The FIFO future's result may be Err(Aborted) because handling `Close` calls
            // `.abort()` on it; that is expected.
        }
    }
1262
1263    #[fuchsia::test]
1264    async fn test_block_close_is_called() {
1265        let close_called = fuchsia_sync::Mutex::new(false);
1266        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1267
1268        std::thread::spawn(move || {
1269            let _block_client =
1270                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1271            // The drop here should cause Close to be sent.
1272        });
1273
1274        let channel_handler = |request: &block::SessionRequest| -> bool {
1275            if let block::SessionRequest::Close { .. } = request {
1276                *close_called.lock() = true;
1277            }
1278            false
1279        };
1280        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1281
1282        // After the server has finished running, we can check to see that close was called.
1283        assert!(*close_called.lock());
1284    }
1285
1286    #[fuchsia::test]
1287    async fn test_block_flush_is_called() {
1288        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1289
1290        struct Interface {
1291            flush_called: Arc<AtomicBool>,
1292        }
1293        impl block_server::async_interface::Interface for Interface {
1294            fn get_info(&self) -> Cow<'_, DeviceInfo> {
1295                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1296                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1297                    max_transfer_blocks: None,
1298                    block_range: Some(0..1000),
1299                    type_guid: [0; 16],
1300                    instance_guid: [0; 16],
1301                    name: "foo".to_string(),
1302                    flags: 0,
1303                }))
1304            }
1305
1306            async fn read(
1307                &self,
1308                _device_block_offset: u64,
1309                _block_count: u32,
1310                _vmo: &Arc<zx::Vmo>,
1311                _vmo_offset: u64,
1312                _opts: ReadOptions,
1313                _trace_flow_id: Option<NonZero<u64>>,
1314            ) -> Result<(), zx::Status> {
1315                unreachable!();
1316            }
1317
1318            async fn write(
1319                &self,
1320                _device_block_offset: u64,
1321                _block_count: u32,
1322                _vmo: &Arc<zx::Vmo>,
1323                _vmo_offset: u64,
1324                _opts: WriteOptions,
1325                _trace_flow_id: Option<NonZero<u64>>,
1326            ) -> Result<(), zx::Status> {
1327                unreachable!();
1328            }
1329
1330            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1331                self.flush_called.store(true, Ordering::Relaxed);
1332                Ok(())
1333            }
1334
1335            async fn trim(
1336                &self,
1337                _device_block_offset: u64,
1338                _block_count: u32,
1339                _trace_flow_id: Option<NonZero<u64>>,
1340            ) -> Result<(), zx::Status> {
1341                unreachable!();
1342            }
1343        }
1344
1345        let flush_called = Arc::new(AtomicBool::new(false));
1346
1347        futures::join!(
1348            async {
1349                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1350
1351                block_client.flush().await.expect("flush failed");
1352            },
1353            async {
1354                let block_server = BlockServer::new(
1355                    512,
1356                    Arc::new(Interface { flush_called: flush_called.clone() }),
1357                );
1358                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1359            }
1360        );
1361
1362        assert!(flush_called.load(Ordering::Relaxed));
1363    }
1364
1365    #[fuchsia::test]
1366    async fn test_trace_flow_ids_set() {
1367        let (proxy, server) = fidl::endpoints::create_proxy();
1368
1369        futures::join!(
1370            async {
1371                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1372                block_client.flush().await.expect("flush failed");
1373            },
1374            async {
1375                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1376                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1377                    if request.trace_flow_id > 0 {
1378                        *flow_id.lock() = Some(request.trace_flow_id);
1379                    }
1380                    BlockFifoResponse {
1381                        status: zx::Status::OK.into_raw(),
1382                        reqid: request.reqid,
1383                        ..Default::default()
1384                    }
1385                };
1386                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1387                // After the server has finished running, verify the trace flow ID was set to some value.
1388                assert!(flow_id.lock().is_some());
1389            }
1390        );
1391    }
1392}