block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl::endpoints::ServerEnd;
13use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
14use fidl_fuchsia_hardware_block_partition::PartitionProxy;
15use fidl_fuchsia_hardware_block_volume::VolumeProxy;
16use fuchsia_sync::Mutex;
17use futures::channel::oneshot;
18use futures::executor::block_on;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use zx::sys::zx_handle_t;
30use zx::{self as zx, HandleBased as _};
31use {
32    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
33    fuchsia_async as fasync, storage_trace as trace,
34};
35
36pub use cache::Cache;
37
38pub use block::Flag as BlockFlags;
39
40pub use block_protocol::*;
41
42pub mod cache;
43
44const TEMP_VMO_SIZE: usize = 65536;
45
46/// If a trace flow ID isn't specified for requests, one will be generated.
47pub const NO_TRACE_ID: u64 = 0;
48
49pub use block_driver::{BlockIoFlag, BlockOpcode};
50
51fn fidl_to_status(error: fidl::Error) -> zx::Status {
52    match error {
53        fidl::Error::ClientChannelClosed { status, .. } => status,
54        _ => zx::Status::INTERNAL,
55    }
56}
57
58fn opcode_str(opcode: u8) -> &'static str {
59    match BlockOpcode::from_primitive(opcode) {
60        Some(BlockOpcode::Read) => "read",
61        Some(BlockOpcode::Write) => "write",
62        Some(BlockOpcode::Flush) => "flush",
63        Some(BlockOpcode::Trim) => "trim",
64        Some(BlockOpcode::CloseVmo) => "close_vmo",
65        None => "unknown",
66    }
67}
68
69// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
70// reused within this process).
71fn generate_trace_flow_id(request_id: u32) -> u64 {
72    static SELF_HANDLE: LazyLock<zx_handle_t> =
73        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
74    *SELF_HANDLE as u64 + (request_id as u64) << 32
75}
76
77pub enum BufferSlice<'a> {
78    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
79    Memory(&'a [u8]),
80}
81
82impl<'a> BufferSlice<'a> {
83    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
84        BufferSlice::VmoId { vmo_id, offset, length }
85    }
86}
87
88impl<'a> From<&'a [u8]> for BufferSlice<'a> {
89    fn from(buf: &'a [u8]) -> Self {
90        BufferSlice::Memory(buf)
91    }
92}
93
94pub enum MutableBufferSlice<'a> {
95    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
96    Memory(&'a mut [u8]),
97}
98
99impl<'a> MutableBufferSlice<'a> {
100    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
101        MutableBufferSlice::VmoId { vmo_id, offset, length }
102    }
103}
104
105impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
106    fn from(buf: &'a mut [u8]) -> Self {
107        MutableBufferSlice::Memory(buf)
108    }
109}
110
111#[derive(Default)]
112struct RequestState {
113    result: Option<zx::Status>,
114    waker: Option<Waker>,
115}
116
117#[derive(Default)]
118struct FifoState {
119    // The fifo.
120    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
121
122    // The next request ID to be used.
123    next_request_id: u32,
124
125    // A queue of messages to be sent on the fifo.
126    queue: std::collections::VecDeque<BlockFifoRequest>,
127
128    // Map from request ID to RequestState.
129    map: HashMap<u32, RequestState>,
130
131    // The waker for the FifoPoller.
132    poller_waker: Option<Waker>,
133
134    // If set, attach a barrier to the next write request
135    attach_barrier: bool,
136}
137
138impl FifoState {
139    fn terminate(&mut self) {
140        self.fifo.take();
141        for (_, request_state) in self.map.iter_mut() {
142            request_state.result.get_or_insert(zx::Status::CANCELED);
143            if let Some(waker) = request_state.waker.take() {
144                waker.wake();
145            }
146        }
147        if let Some(waker) = self.poller_waker.take() {
148            waker.wake();
149        }
150    }
151
152    // Returns true if polling should be terminated.
153    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
154        let fifo = if let Some(fifo) = self.fifo.as_ref() {
155            fifo
156        } else {
157            return true;
158        };
159
160        loop {
161            let slice = self.queue.as_slices().0;
162            if slice.is_empty() {
163                return false;
164            }
165            match fifo.try_write(context, slice) {
166                Poll::Ready(Ok(sent)) => {
167                    self.queue.drain(0..sent);
168                }
169                Poll::Ready(Err(_)) => {
170                    self.terminate();
171                    return true;
172                }
173                Poll::Pending => {
174                    return false;
175                }
176            }
177        }
178    }
179}
180
181type FifoStateRef = Arc<Mutex<FifoState>>;
182
183// A future used for fifo responses.
184struct ResponseFuture {
185    request_id: u32,
186    fifo_state: FifoStateRef,
187}
188
189impl ResponseFuture {
190    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
191        ResponseFuture { request_id, fifo_state }
192    }
193}
194
195impl Future for ResponseFuture {
196    type Output = Result<(), zx::Status>;
197
198    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
199        let mut state = self.fifo_state.lock();
200        let request_state = state.map.get_mut(&self.request_id).unwrap();
201        if let Some(result) = request_state.result {
202            Poll::Ready(result.into())
203        } else {
204            request_state.waker.replace(context.waker().clone());
205            Poll::Pending
206        }
207    }
208}
209
210impl Drop for ResponseFuture {
211    fn drop(&mut self) {
212        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
213    }
214}
215
216/// Wraps a vmo-id. Will panic if you forget to detach.
217#[derive(Debug)]
218#[must_use]
219pub struct VmoId(AtomicU16);
220
221impl VmoId {
222    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
223    pub fn new(id: u16) -> Self {
224        Self(AtomicU16::new(id))
225    }
226
227    /// Invalidates self and returns a new VmoId with the same underlying ID.
228    pub fn take(&self) -> Self {
229        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
230    }
231
232    pub fn is_valid(&self) -> bool {
233        self.id() != block_driver::BLOCK_VMOID_INVALID
234    }
235
236    /// Takes the ID.  The caller assumes responsibility for detaching.
237    #[must_use]
238    pub fn into_id(self) -> u16 {
239        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
240    }
241
242    pub fn id(&self) -> u16 {
243        self.0.load(Ordering::Relaxed)
244    }
245}
246
247impl PartialEq for VmoId {
248    fn eq(&self, other: &Self) -> bool {
249        self.id() == other.id()
250    }
251}
252
253impl Eq for VmoId {}
254
255impl Drop for VmoId {
256    fn drop(&mut self) {
257        assert_eq!(
258            self.0.load(Ordering::Relaxed),
259            block_driver::BLOCK_VMOID_INVALID,
260            "Did you forget to detach?"
261        );
262    }
263}
264
265impl Hash for VmoId {
266    fn hash<H: Hasher>(&self, state: &mut H) {
267        self.id().hash(state);
268    }
269}
270
271/// Represents a client connection to a block device. This is a simplified version of the block.fidl
272/// interface.
273/// Most users will use the RemoteBlockClient instantiation of this trait.
274pub trait BlockClient: Send + Sync {
275    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
276    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
277
278    /// Detaches the given vmo-id from the device.
279    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
280
281    /// Reads from the device at |device_offset| into the given buffer slice.
282    fn read_at(
283        &self,
284        buffer_slice: MutableBufferSlice<'_>,
285        device_offset: u64,
286    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
287        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
288    }
289
290    fn read_at_with_opts(
291        &self,
292        buffer_slice: MutableBufferSlice<'_>,
293        device_offset: u64,
294        opts: ReadOptions,
295    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
296        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
297    }
298
299    fn read_at_with_opts_traced(
300        &self,
301        buffer_slice: MutableBufferSlice<'_>,
302        device_offset: u64,
303        opts: ReadOptions,
304        trace_flow_id: u64,
305    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
306
307    /// Writes the data in |buffer_slice| to the device.
308    fn write_at(
309        &self,
310        buffer_slice: BufferSlice<'_>,
311        device_offset: u64,
312    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
313        self.write_at_with_opts_traced(
314            buffer_slice,
315            device_offset,
316            WriteOptions::default(),
317            NO_TRACE_ID,
318        )
319    }
320
321    fn write_at_with_opts(
322        &self,
323        buffer_slice: BufferSlice<'_>,
324        device_offset: u64,
325        opts: WriteOptions,
326    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
327        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
328    }
329
330    fn write_at_with_opts_traced(
331        &self,
332        buffer_slice: BufferSlice<'_>,
333        device_offset: u64,
334        opts: WriteOptions,
335        trace_flow_id: u64,
336    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
337
338    /// Trims the given range on the block device.
339    fn trim(
340        &self,
341        device_range: Range<u64>,
342    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
343        self.trim_traced(device_range, NO_TRACE_ID)
344    }
345
346    fn trim_traced(
347        &self,
348        device_range: Range<u64>,
349        trace_flow_id: u64,
350    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
351
352    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
353    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
354    /// This method makes it easier to guarantee that the barrier is attached to the correct
355    /// write operation when subsequent write operations can get reordered.
356    fn barrier(&self);
357
358    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
359        self.flush_traced(NO_TRACE_ID)
360    }
361
362    /// Sends a flush request to the underlying block device.
363    fn flush_traced(
364        &self,
365        trace_flow_id: u64,
366    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
367
368    /// Closes the fifo.
369    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
370
371    /// Returns the block size of the device.
372    fn block_size(&self) -> u32;
373
374    /// Returns the size, in blocks, of the device.
375    fn block_count(&self) -> u64;
376
377    /// Returns the maximum number of blocks which can be transferred in a single request.
378    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
379
380    /// Returns the block flags reported by the device.
381    fn block_flags(&self) -> BlockFlags;
382
383    /// Returns true if the remote fifo is still connected.
384    fn is_connected(&self) -> bool;
385}
386
387struct Common {
388    block_size: u32,
389    block_count: u64,
390    max_transfer_blocks: Option<NonZero<u32>>,
391    block_flags: BlockFlags,
392    fifo_state: FifoStateRef,
393    temp_vmo: futures::lock::Mutex<zx::Vmo>,
394    temp_vmo_id: VmoId,
395}
396
397impl Common {
398    fn new(
399        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
400        info: &block::BlockInfo,
401        temp_vmo: zx::Vmo,
402        temp_vmo_id: VmoId,
403    ) -> Self {
404        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
405        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
406        Self {
407            block_size: info.block_size,
408            block_count: info.block_count,
409            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
410                NonZero::new(info.max_transfer_size / info.block_size)
411            } else {
412                None
413            },
414            block_flags: info.flags,
415            fifo_state,
416            temp_vmo: futures::lock::Mutex::new(temp_vmo),
417            temp_vmo_id,
418        }
419    }
420
421    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
422        if bytes % self.block_size as u64 != 0 {
423            Err(zx::Status::INVALID_ARGS)
424        } else {
425            Ok(bytes / self.block_size as u64)
426        }
427    }
428
429    // Sends the request and waits for the response.
430    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
431        let (request_id, trace_flow_id) = {
432            let mut state = self.fifo_state.lock();
433
434            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
435            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
436                && state.attach_barrier
437            {
438                flags |= BlockIoFlag::PRE_BARRIER;
439                request.command.flags = flags.bits();
440                state.attach_barrier = false;
441            }
442
443            if state.fifo.is_none() {
444                // Fifo has been closed.
445                return Err(zx::Status::CANCELED);
446            }
447            trace::duration!(
448                c"storage",
449                c"block_client::send::start",
450                "op" => opcode_str(request.command.opcode),
451                "len" => request.length * self.block_size
452            );
453            let request_id = state.next_request_id;
454            state.next_request_id = state.next_request_id.overflowing_add(1).0;
455            assert!(
456                state.map.insert(request_id, RequestState::default()).is_none(),
457                "request id in use!"
458            );
459            request.reqid = request_id;
460            if request.trace_flow_id == NO_TRACE_ID {
461                request.trace_flow_id = generate_trace_flow_id(request_id);
462            }
463            let trace_flow_id = request.trace_flow_id;
464            trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
465            state.queue.push_back(request);
466            if let Some(waker) = state.poller_waker.clone() {
467                state.poll_send_requests(&mut Context::from_waker(&waker));
468            }
469            (request_id, trace_flow_id)
470        };
471        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
472        trace::duration!(c"storage", c"block_client::send::end");
473        trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
474        Ok(())
475    }
476
477    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
478        self.send(BlockFifoRequest {
479            command: BlockFifoCommand {
480                opcode: BlockOpcode::CloseVmo.into_primitive(),
481                flags: 0,
482                ..Default::default()
483            },
484            vmoid: vmo_id.into_id(),
485            ..Default::default()
486        })
487        .await
488    }
489
490    async fn read_at(
491        &self,
492        buffer_slice: MutableBufferSlice<'_>,
493        device_offset: u64,
494        opts: ReadOptions,
495        trace_flow_id: u64,
496    ) -> Result<(), zx::Status> {
497        match buffer_slice {
498            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
499                self.send(BlockFifoRequest {
500                    command: BlockFifoCommand {
501                        opcode: BlockOpcode::Read.into_primitive(),
502                        flags: 0,
503                        ..Default::default()
504                    },
505                    vmoid: vmo_id.id(),
506                    length: self
507                        .to_blocks(length)?
508                        .try_into()
509                        .map_err(|_| zx::Status::INVALID_ARGS)?,
510                    vmo_offset: self.to_blocks(offset)?,
511                    dev_offset: self.to_blocks(device_offset)?,
512                    trace_flow_id,
513                    dun: opts.inline_crypto_options.dun,
514                    slot: opts.inline_crypto_options.slot,
515                    ..Default::default()
516                })
517                .await?
518            }
519            MutableBufferSlice::Memory(mut slice) => {
520                let temp_vmo = self.temp_vmo.lock().await;
521                let mut device_block = self.to_blocks(device_offset)?;
522                loop {
523                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
524                    let block_count = self.to_blocks(to_do as u64)? as u32;
525                    self.send(BlockFifoRequest {
526                        command: BlockFifoCommand {
527                            opcode: BlockOpcode::Read.into_primitive(),
528                            flags: 0,
529                            ..Default::default()
530                        },
531                        vmoid: self.temp_vmo_id.id(),
532                        length: block_count,
533                        vmo_offset: 0,
534                        dev_offset: device_block,
535                        trace_flow_id,
536                        dun: opts.inline_crypto_options.dun,
537                        slot: opts.inline_crypto_options.slot,
538                        ..Default::default()
539                    })
540                    .await?;
541                    temp_vmo.read(&mut slice[..to_do], 0)?;
542                    if to_do == slice.len() {
543                        break;
544                    }
545                    device_block += block_count as u64;
546                    slice = &mut slice[to_do..];
547                }
548            }
549        }
550        Ok(())
551    }
552
553    async fn write_at(
554        &self,
555        buffer_slice: BufferSlice<'_>,
556        device_offset: u64,
557        opts: WriteOptions,
558        trace_flow_id: u64,
559    ) -> Result<(), zx::Status> {
560        let mut flags = BlockIoFlag::empty();
561
562        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
563            flags |= BlockIoFlag::FORCE_ACCESS;
564        }
565
566        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
567            flags |= BlockIoFlag::PRE_BARRIER;
568        }
569
570        match buffer_slice {
571            BufferSlice::VmoId { vmo_id, offset, length } => {
572                self.send(BlockFifoRequest {
573                    command: BlockFifoCommand {
574                        opcode: BlockOpcode::Write.into_primitive(),
575                        flags: flags.bits(),
576                        ..Default::default()
577                    },
578                    vmoid: vmo_id.id(),
579                    length: self
580                        .to_blocks(length)?
581                        .try_into()
582                        .map_err(|_| zx::Status::INVALID_ARGS)?,
583                    vmo_offset: self.to_blocks(offset)?,
584                    dev_offset: self.to_blocks(device_offset)?,
585                    trace_flow_id,
586                    dun: opts.inline_crypto_options.dun,
587                    slot: opts.inline_crypto_options.slot,
588                    ..Default::default()
589                })
590                .await?;
591            }
592            BufferSlice::Memory(mut slice) => {
593                let temp_vmo = self.temp_vmo.lock().await;
594                let mut device_block = self.to_blocks(device_offset)?;
595                loop {
596                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
597                    let block_count = self.to_blocks(to_do as u64)? as u32;
598                    temp_vmo.write(&slice[..to_do], 0)?;
599                    self.send(BlockFifoRequest {
600                        command: BlockFifoCommand {
601                            opcode: BlockOpcode::Write.into_primitive(),
602                            flags: flags.bits(),
603                            ..Default::default()
604                        },
605                        vmoid: self.temp_vmo_id.id(),
606                        length: block_count,
607                        vmo_offset: 0,
608                        dev_offset: device_block,
609                        trace_flow_id,
610                        dun: opts.inline_crypto_options.dun,
611                        slot: opts.inline_crypto_options.slot,
612                        ..Default::default()
613                    })
614                    .await?;
615                    if to_do == slice.len() {
616                        break;
617                    }
618                    device_block += block_count as u64;
619                    slice = &slice[to_do..];
620                }
621            }
622        }
623        Ok(())
624    }
625
626    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
627        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
628        let dev_offset = self.to_blocks(device_range.start)?;
629        self.send(BlockFifoRequest {
630            command: BlockFifoCommand {
631                opcode: BlockOpcode::Trim.into_primitive(),
632                flags: 0,
633                ..Default::default()
634            },
635            vmoid: block_driver::BLOCK_VMOID_INVALID,
636            length,
637            dev_offset,
638            trace_flow_id,
639            ..Default::default()
640        })
641        .await
642    }
643
644    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
645        self.send(BlockFifoRequest {
646            command: BlockFifoCommand {
647                opcode: BlockOpcode::Flush.into_primitive(),
648                flags: 0,
649                ..Default::default()
650            },
651            vmoid: block_driver::BLOCK_VMOID_INVALID,
652            trace_flow_id,
653            ..Default::default()
654        })
655        .await
656    }
657
658    fn barrier(&self) {
659        self.fifo_state.lock().attach_barrier = true;
660    }
661
662    fn block_size(&self) -> u32 {
663        self.block_size
664    }
665
666    fn block_count(&self) -> u64 {
667        self.block_count
668    }
669
670    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
671        self.max_transfer_blocks.clone()
672    }
673
674    fn block_flags(&self) -> BlockFlags {
675        self.block_flags
676    }
677
678    fn is_connected(&self) -> bool {
679        self.fifo_state.lock().fifo.is_some()
680    }
681}
682
683impl Drop for Common {
684    fn drop(&mut self) {
685        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
686        // down.
687        let _ = self.temp_vmo_id.take().into_id();
688        self.fifo_state.lock().terminate();
689    }
690}
691
692/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
693pub struct RemoteBlockClient {
694    session: block::SessionProxy,
695    common: Common,
696}
697
698pub trait AsBlockProxy {
699    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;
700
701    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
702}
703
704impl<T: AsBlockProxy> AsBlockProxy for &T {
705    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
706        AsBlockProxy::get_info(*self)
707    }
708    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
709        AsBlockProxy::open_session(*self, session)
710    }
711}
712
713macro_rules! impl_as_block_proxy {
714    ($name:ident) => {
715        impl AsBlockProxy for $name {
716            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
717                $name::get_info(self).await
718            }
719
720            fn open_session(
721                &self,
722                session: ServerEnd<block::SessionMarker>,
723            ) -> Result<(), fidl::Error> {
724                $name::open_session(self, session)
725            }
726        }
727    };
728}
729
730impl_as_block_proxy!(BlockProxy);
731impl_as_block_proxy!(PartitionProxy);
732impl_as_block_proxy!(VolumeProxy);
733
734impl RemoteBlockClient {
735    /// Returns a connection to a remote block device via the given channel.
736    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
737        let info =
738            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
739        let (session, server) = fidl::endpoints::create_proxy();
740        let () = remote.open_session(server).map_err(fidl_to_status)?;
741        Self::from_session(info, session).await
742    }
743
744    pub async fn from_session(
745        info: block::BlockInfo,
746        session: block::SessionProxy,
747    ) -> Result<Self, zx::Status> {
748        let fifo =
749            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
750        let fifo = fasync::Fifo::from_fifo(fifo);
751        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
752        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
753        let vmo_id =
754            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
755        let vmo_id = VmoId::new(vmo_id.id);
756        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
757    }
758}
759
760impl BlockClient for RemoteBlockClient {
761    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
762        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
763        let vmo_id = self
764            .session
765            .attach_vmo(dup)
766            .await
767            .map_err(fidl_to_status)?
768            .map_err(zx::Status::from_raw)?;
769        Ok(VmoId::new(vmo_id.id))
770    }
771
772    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
773        self.common.detach_vmo(vmo_id).await
774    }
775
776    async fn read_at_with_opts_traced(
777        &self,
778        buffer_slice: MutableBufferSlice<'_>,
779        device_offset: u64,
780        opts: ReadOptions,
781        trace_flow_id: u64,
782    ) -> Result<(), zx::Status> {
783        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
784    }
785
786    async fn write_at_with_opts_traced(
787        &self,
788        buffer_slice: BufferSlice<'_>,
789        device_offset: u64,
790        opts: WriteOptions,
791        trace_flow_id: u64,
792    ) -> Result<(), zx::Status> {
793        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
794    }
795
796    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
797        self.common.trim(range, trace_flow_id).await
798    }
799
800    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
801        self.common.flush(trace_flow_id).await
802    }
803
804    fn barrier(&self) {
805        self.common.barrier()
806    }
807
808    async fn close(&self) -> Result<(), zx::Status> {
809        let () =
810            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
811        Ok(())
812    }
813
814    fn block_size(&self) -> u32 {
815        self.common.block_size()
816    }
817
818    fn block_count(&self) -> u64 {
819        self.common.block_count()
820    }
821
822    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
823        self.common.max_transfer_blocks()
824    }
825
826    fn block_flags(&self) -> BlockFlags {
827        self.common.block_flags()
828    }
829
830    fn is_connected(&self) -> bool {
831        self.common.is_connected()
832    }
833}
834
835pub struct RemoteBlockClientSync {
836    session: block::SessionSynchronousProxy,
837    common: Common,
838}
839
840impl RemoteBlockClientSync {
841    /// Returns a connection to a remote block device via the given channel, but spawns a separate
842    /// thread for polling the fifo which makes it work in cases where no executor is configured for
843    /// the calling thread.
844    pub fn new(
845        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
846    ) -> Result<Self, zx::Status> {
847        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
848        let info = remote
849            .get_info(zx::MonotonicInstant::INFINITE)
850            .map_err(fidl_to_status)?
851            .map_err(zx::Status::from_raw)?;
852        let (client, server) = fidl::endpoints::create_endpoints();
853        let () = remote.open_session(server).map_err(fidl_to_status)?;
854        let session = block::SessionSynchronousProxy::new(client.into_channel());
855        let fifo = session
856            .get_fifo(zx::MonotonicInstant::INFINITE)
857            .map_err(fidl_to_status)?
858            .map_err(zx::Status::from_raw)?;
859        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
860        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
861        let vmo_id = session
862            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
863            .map_err(fidl_to_status)?
864            .map_err(zx::Status::from_raw)?;
865        let vmo_id = VmoId::new(vmo_id.id);
866
867        // The fifo needs to be instantiated from the thread that has the executor as that's where
868        // the fifo registers for notifications to be delivered.
869        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
870        std::thread::spawn(move || {
871            let mut executor = fasync::LocalExecutor::default();
872            let fifo = fasync::Fifo::from_fifo(fifo);
873            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
874            let fifo_state = common.fifo_state.clone();
875            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
876            executor.run_singlethreaded(FifoPoller { fifo_state });
877        });
878        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
879    }
880
881    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
882        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
883        let vmo_id = self
884            .session
885            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
886            .map_err(fidl_to_status)?
887            .map_err(zx::Status::from_raw)?;
888        Ok(VmoId::new(vmo_id.id))
889    }
890
891    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
892        block_on(self.common.detach_vmo(vmo_id))
893    }
894
895    pub fn read_at(
896        &self,
897        buffer_slice: MutableBufferSlice<'_>,
898        device_offset: u64,
899    ) -> Result<(), zx::Status> {
900        block_on(self.common.read_at(
901            buffer_slice,
902            device_offset,
903            ReadOptions::default(),
904            NO_TRACE_ID,
905        ))
906    }
907
908    pub fn write_at(
909        &self,
910        buffer_slice: BufferSlice<'_>,
911        device_offset: u64,
912    ) -> Result<(), zx::Status> {
913        block_on(self.common.write_at(
914            buffer_slice,
915            device_offset,
916            WriteOptions::default(),
917            NO_TRACE_ID,
918        ))
919    }
920
921    pub fn flush(&self) -> Result<(), zx::Status> {
922        block_on(self.common.flush(NO_TRACE_ID))
923    }
924
925    pub fn close(&self) -> Result<(), zx::Status> {
926        let () = self
927            .session
928            .close(zx::MonotonicInstant::INFINITE)
929            .map_err(fidl_to_status)?
930            .map_err(zx::Status::from_raw)?;
931        Ok(())
932    }
933
934    pub fn block_size(&self) -> u32 {
935        self.common.block_size()
936    }
937
938    pub fn block_count(&self) -> u64 {
939        self.common.block_count()
940    }
941
942    pub fn is_connected(&self) -> bool {
943        self.common.is_connected()
944    }
945}
946
947impl Drop for RemoteBlockClientSync {
948    fn drop(&mut self) {
949        // Ignore errors here as there is not much we can do about it.
950        let _ = self.close();
951    }
952}
953
954// FifoPoller is a future responsible for sending and receiving from the fifo.
955struct FifoPoller {
956    fifo_state: FifoStateRef,
957}
958
959impl Future for FifoPoller {
960    type Output = ();
961
962    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
963        let mut state_lock = self.fifo_state.lock();
964        let state = state_lock.deref_mut(); // So that we can split the borrow.
965
966        // Send requests.
967        if state.poll_send_requests(context) {
968            return Poll::Ready(());
969        }
970
971        // Receive responses.
972        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
973        loop {
974            let mut response = MaybeUninit::uninit();
975            match fifo.try_read(context, &mut response) {
976                Poll::Pending => {
977                    state.poller_waker = Some(context.waker().clone());
978                    return Poll::Pending;
979                }
980                Poll::Ready(Ok(_)) => {
981                    let response = unsafe { response.assume_init() };
982                    let request_id = response.reqid;
983                    // If the request isn't in the map, assume that it's a cancelled read.
984                    if let Some(request_state) = state.map.get_mut(&request_id) {
985                        request_state.result.replace(zx::Status::from_raw(response.status));
986                        if let Some(waker) = request_state.waker.take() {
987                            waker.wake();
988                        }
989                    }
990                }
991                Poll::Ready(Err(_)) => {
992                    state.terminate();
993                    return Poll::Ready(());
994                }
995            }
996        }
997    }
998}
999
1000#[cfg(test)]
1001mod tests {
1002    use super::{
1003        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
1004        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
1005    };
1006    use block_protocol::ReadOptions;
1007    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
1008    use fidl::endpoints::RequestStream as _;
1009    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
1010    use futures::join;
1011    use futures::stream::StreamExt as _;
1012    use futures::stream::futures_unordered::FuturesUnordered;
1013    use ramdevice_client::RamdiskClient;
1014    use std::borrow::Cow;
1015    use std::num::NonZero;
1016    use std::sync::Arc;
1017    use std::sync::atomic::{AtomicBool, Ordering};
1018    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
1019
1020    const RAMDISK_BLOCK_SIZE: u64 = 1024;
1021    const RAMDISK_BLOCK_COUNT: u64 = 1024;
1022
1023    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1024        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1025            .await
1026            .expect("RamdiskClient::create failed");
1027        let client_end = ramdisk.open().expect("ramdisk.open failed");
1028        let proxy = client_end.into_proxy();
1029        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1030        assert_eq!(block_client.block_size(), 1024);
1031        let client_end = ramdisk.open().expect("ramdisk.open failed");
1032        let proxy = client_end.into_proxy();
1033        (ramdisk, proxy, block_client)
1034    }
1035
1036    #[fuchsia::test]
1037    async fn test_against_ram_disk() {
1038        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1039
1040        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1041        vmo.write(b"hello", 5).expect("vmo.write failed");
1042        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1043        block_client
1044            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1045            .await
1046            .expect("write_at failed");
1047        block_client
1048            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1049            .await
1050            .expect("read_at failed");
1051        let mut buf: [u8; 5] = Default::default();
1052        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1053        assert_eq!(&buf, b"hello");
1054        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1055    }
1056
1057    #[fuchsia::test]
1058    async fn test_alignment() {
1059        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1060        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1061        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1062        block_client
1063            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1064            .await
1065            .expect_err("expected failure due to bad alignment");
1066        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1067    }
1068
1069    #[fuchsia::test]
1070    async fn test_parallel_io() {
1071        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1072        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1073        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1074        let mut reads = Vec::new();
1075        for _ in 0..1024 {
1076            reads.push(
1077                block_client
1078                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1079                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1080            );
1081        }
1082        futures::future::join_all(reads).await;
1083        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1084    }
1085
1086    #[fuchsia::test]
1087    async fn test_closed_device() {
1088        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1089        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1090        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1091        let mut reads = Vec::new();
1092        for _ in 0..1024 {
1093            reads.push(
1094                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1095            );
1096        }
1097        assert!(block_client.is_connected());
1098        let _ = futures::join!(futures::future::join_all(reads), async {
1099            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1100        });
1101        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1102        while block_client
1103            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1104            .await
1105            .is_ok()
1106        {}
1107
1108        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1109        // get false-positives from is_connected.
1110        while block_client.is_connected() {
1111            // Sleep for a bit to minimise lock contention.
1112            fasync::Timer::new(fasync::MonotonicInstant::after(
1113                zx::MonotonicDuration::from_millis(500),
1114            ))
1115            .await;
1116        }
1117
1118        // But once is_connected goes negative, it should stay negative.
1119        assert_eq!(block_client.is_connected(), false);
1120        let _ = block_client.detach_vmo(vmo_id).await;
1121    }
1122
1123    #[fuchsia::test]
1124    async fn test_cancelled_reads() {
1125        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1126        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1127        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1128        {
1129            let mut reads = FuturesUnordered::new();
1130            for _ in 0..1024 {
1131                reads.push(
1132                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1133                );
1134            }
1135            // Read the first 500 results and then dump the rest.
1136            for _ in 0..500 {
1137                reads.next().await;
1138            }
1139        }
1140        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1141    }
1142
1143    #[fuchsia::test]
1144    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1145        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1146        let block_client_ref = &block_client;
1147        let test_one = |offset, len, fill| async move {
1148            let buf = vec![fill; len];
1149            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1150            // Read back an extra block either side.
1151            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1152            block_client_ref
1153                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1154                .await
1155                .expect("read_at failed");
1156            assert_eq!(
1157                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1158                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1159            );
1160            assert_eq!(
1161                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1162                &buf[..]
1163            );
1164            assert_eq!(
1165                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1166                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1167            );
1168        };
1169        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1170        join!(
1171            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1172            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1173        );
1174    }
1175
1176    // Implements dummy server which can be used by test cases to verify whether
1177    // channel messages and fifo operations are being received - by using set_channel_handler or
1178    // set_fifo_hander respectively
1179    struct FakeBlockServer<'a> {
1180        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1181        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1182        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1183    }
1184
1185    impl<'a> FakeBlockServer<'a> {
1186        // Creates a new FakeBlockServer given a channel to listen on.
1187        //
1188        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1189        // handles requests received from channel or the fifo respectfully.
1190        //
1191        // 'channel_handler' receives a message before it is handled by the default implementation
1192        // and can return 'true' to indicate all processing is done and no further processing of
1193        // that message is required
1194        //
1195        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1196        // FakeBlockServer will send over the fifo.
1197        fn new(
1198            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1199            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1200            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1201        ) -> FakeBlockServer<'a> {
1202            FakeBlockServer {
1203                server_channel: Some(server_channel),
1204                channel_handler: Box::new(channel_handler),
1205                fifo_handler: Box::new(fifo_handler),
1206            }
1207        }
1208
1209        // Runs the server
1210        async fn run(&mut self) {
1211            let server = self.server_channel.take().unwrap();
1212
1213            // Set up a mock server.
1214            let (server_fifo, client_fifo) =
1215                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1216                    .expect("Fifo::create failed");
1217            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1218
1219            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1220            let fifo_future = Abortable::new(
1221                async {
1222                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1223                    let (mut reader, mut writer) = fifo.async_io();
1224                    let mut request = BlockFifoRequest::default();
1225                    loop {
1226                        match reader.read_entries(&mut request).await {
1227                            Ok(1) => {}
1228                            Err(zx::Status::PEER_CLOSED) => break,
1229                            Err(e) => panic!("read_entry failed {:?}", e),
1230                            _ => unreachable!(),
1231                        };
1232
1233                        let response = self.fifo_handler.as_ref()(request);
1234                        writer
1235                            .write_entries(std::slice::from_ref(&response))
1236                            .await
1237                            .expect("write_entries failed");
1238                    }
1239                },
1240                fifo_future_abort_registration,
1241            );
1242
1243            let channel_future = async {
1244                server
1245                    .into_stream()
1246                    .for_each_concurrent(None, |request| async {
1247                        let request = request.expect("unexpected fidl error");
1248
1249                        match request {
1250                            block::BlockRequest::GetInfo { responder } => {
1251                                responder
1252                                    .send(Ok(&block::BlockInfo {
1253                                        block_count: 1024,
1254                                        block_size: 512,
1255                                        max_transfer_size: 1024 * 1024,
1256                                        flags: block::Flag::empty(),
1257                                    }))
1258                                    .expect("send failed");
1259                            }
1260                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1261                                let stream = session.into_stream();
1262                                stream
1263                                    .for_each(|request| async {
1264                                        let request = request.expect("unexpected fidl error");
1265                                        // Give a chance for the test to register and potentially
1266                                        // handle the event
1267                                        if self.channel_handler.as_ref()(&request) {
1268                                            return;
1269                                        }
1270                                        match request {
1271                                            block::SessionRequest::GetFifo { responder } => {
1272                                                match maybe_server_fifo.lock().take() {
1273                                                    Some(fifo) => {
1274                                                        responder.send(Ok(fifo.downcast()))
1275                                                    }
1276                                                    None => responder.send(Err(
1277                                                        zx::Status::NO_RESOURCES.into_raw(),
1278                                                    )),
1279                                                }
1280                                                .expect("send failed")
1281                                            }
1282                                            block::SessionRequest::AttachVmo {
1283                                                vmo: _,
1284                                                responder,
1285                                            } => responder
1286                                                .send(Ok(&block::VmoId { id: 1 }))
1287                                                .expect("send failed"),
1288                                            block::SessionRequest::Close { responder } => {
1289                                                fifo_future_abort.abort();
1290                                                responder.send(Ok(())).expect("send failed")
1291                                            }
1292                                        }
1293                                    })
1294                                    .await
1295                            }
1296                            _ => panic!("Unexpected message"),
1297                        }
1298                    })
1299                    .await;
1300            };
1301
1302            let _result = join!(fifo_future, channel_future);
1303            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1304        }
1305    }
1306
1307    #[fuchsia::test]
1308    async fn test_block_close_is_called() {
1309        let close_called = fuchsia_sync::Mutex::new(false);
1310        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1311
1312        std::thread::spawn(move || {
1313            let _block_client =
1314                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1315            // The drop here should cause Close to be sent.
1316        });
1317
1318        let channel_handler = |request: &block::SessionRequest| -> bool {
1319            if let block::SessionRequest::Close { .. } = request {
1320                *close_called.lock() = true;
1321            }
1322            false
1323        };
1324        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1325
1326        // After the server has finished running, we can check to see that close was called.
1327        assert!(*close_called.lock());
1328    }
1329
1330    #[fuchsia::test]
1331    async fn test_block_flush_is_called() {
1332        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1333
1334        struct Interface {
1335            flush_called: Arc<AtomicBool>,
1336        }
1337        impl block_server::async_interface::Interface for Interface {
1338            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1339                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1340                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1341                    max_transfer_blocks: None,
1342                    block_range: Some(0..1000),
1343                    type_guid: [0; 16],
1344                    instance_guid: [0; 16],
1345                    name: "foo".to_string(),
1346                    flags: 0,
1347                })))
1348            }
1349
1350            async fn read(
1351                &self,
1352                _device_block_offset: u64,
1353                _block_count: u32,
1354                _vmo: &Arc<zx::Vmo>,
1355                _vmo_offset: u64,
1356                _opts: ReadOptions,
1357                _trace_flow_id: Option<NonZero<u64>>,
1358            ) -> Result<(), zx::Status> {
1359                unreachable!();
1360            }
1361
1362            fn barrier(&self) -> Result<(), zx::Status> {
1363                unreachable!()
1364            }
1365
1366            async fn write(
1367                &self,
1368                _device_block_offset: u64,
1369                _block_count: u32,
1370                _vmo: &Arc<zx::Vmo>,
1371                _vmo_offset: u64,
1372                _opts: WriteOptions,
1373                _trace_flow_id: Option<NonZero<u64>>,
1374            ) -> Result<(), zx::Status> {
1375                unreachable!();
1376            }
1377
1378            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1379                self.flush_called.store(true, Ordering::Relaxed);
1380                Ok(())
1381            }
1382
1383            async fn trim(
1384                &self,
1385                _device_block_offset: u64,
1386                _block_count: u32,
1387                _trace_flow_id: Option<NonZero<u64>>,
1388            ) -> Result<(), zx::Status> {
1389                unreachable!();
1390            }
1391        }
1392
1393        let flush_called = Arc::new(AtomicBool::new(false));
1394
1395        futures::join!(
1396            async {
1397                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1398
1399                block_client.flush().await.expect("flush failed");
1400            },
1401            async {
1402                let block_server = BlockServer::new(
1403                    512,
1404                    Arc::new(Interface { flush_called: flush_called.clone() }),
1405                );
1406                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1407            }
1408        );
1409
1410        assert!(flush_called.load(Ordering::Relaxed));
1411    }
1412
1413    #[fuchsia::test]
1414    async fn test_trace_flow_ids_set() {
1415        let (proxy, server) = fidl::endpoints::create_proxy();
1416
1417        futures::join!(
1418            async {
1419                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1420                block_client.flush().await.expect("flush failed");
1421            },
1422            async {
1423                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1424                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1425                    if request.trace_flow_id > 0 {
1426                        *flow_id.lock() = Some(request.trace_flow_id);
1427                    }
1428                    BlockFifoResponse {
1429                        status: zx::Status::OK.into_raw(),
1430                        reqid: request.reqid,
1431                        ..Default::default()
1432                    }
1433                };
1434                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1435                // After the server has finished running, verify the trace flow ID was set to some value.
1436                assert!(flow_id.lock().is_some());
1437            }
1438        );
1439    }
1440}