block_client/lib.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use async_trait::async_trait;
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_hardware_block::BlockProxy;
use fidl_fuchsia_hardware_block_partition::PartitionProxy;
use fidl_fuchsia_hardware_block_volume::VolumeProxy;
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::executor::block_on;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use zx::sys::zx_handle_t;
use zx::{self as zx, HandleBased as _};
use {
    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
    fuchsia_async as fasync, storage_trace as trace,
};

pub use cache::Cache;

pub use block::Flag as BlockFlags;

pub use block_protocol::*;

pub mod cache;

const TEMP_VMO_SIZE: usize = 65536;

/// If a trace flow ID isn't specified for requests, one will be generated.
pub const NO_TRACE_ID: u64 = 0;

pub use block_driver::{BlockIoFlag, BlockOpcode};

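// Maps a FIDL transport error to a zx::Status: a closed channel yields its epitaph status, and
// anything else is reported as INTERNAL.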
fn fidl_to_status(error: fidl::Error) -> zx::Status {
    match error {
        fidl::Error::ClientChannelClosed { status, .. } => status,
        _ => zx::Status::INTERNAL,
    }
}

fn opcode_str(opcode: u8) -> &'static str {
    match BlockOpcode::from_primitive(opcode) {
        Some(BlockOpcode::Read) => "read",
        Some(BlockOpcode::Write) => "write",
        Some(BlockOpcode::Flush) => "flush",
        Some(BlockOpcode::Trim) => "trim",
        Some(BlockOpcode::CloseVmo) => "close_vmo",
        None => "unknown",
    }
}

// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
// reused within this process): the process handle occupies the low 32 bits and the request ID
// the high 32 bits.
fn generate_trace_flow_id(request_id: u32) -> u64 {
    lazy_static! {
        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
    };
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}

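/// The source buffer for a write: either a byte range of a VMO that was previously registered
/// with [`BlockClient::attach_vmo`], or a plain memory slice, which is staged through an
/// internal transfer VMO.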
pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

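/// The destination buffer for a read: either a byte range of a VMO that was previously
/// registered with [`BlockClient::attach_vmo`], or a plain mutable memory slice, which is
/// staged through an internal transfer VMO.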
pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}

#[derive(Default)]
struct RequestState {
    result: Option<zx::Status>,
    waker: Option<Waker>,
}

#[derive(Default)]
struct FifoState {
    // The fifo.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,
}

impl FifoState {
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    // Returns true if polling should be terminated.
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}

type FifoStateRef = Arc<Mutex<FifoState>>;

// A future used for fifo responses.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}

/// Wraps a vmo-id. Will panic if you forget to detach.
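///
/// A sketch of the expected lifecycle (it assumes `client` is some [`BlockClient`]
/// implementation, `vmo` is a `zx::Vmo`, and this crate is available as `block_client`):
///
/// ```no_run
/// # async fn example(client: &impl block_client::BlockClient, vmo: &zx::Vmo) -> Result<(), zx::Status> {
/// let vmo_id = client.attach_vmo(vmo).await?;
/// // ... issue reads and writes against `vmo_id` ...
/// client.detach_vmo(vmo_id).await?; // Consumes the id; dropping it while still valid panics.
/// # Ok(())
/// # }
/// ```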
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);

impl VmoId {
    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Invalidates self and returns a new VmoId with the same underlying ID.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
    }

    pub fn is_valid(&self) -> bool {
        self.id() != block_driver::BLOCK_VMOID_INVALID
    }

    /// Takes the ID.  The caller assumes responsibility for detaching.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
    }

    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        assert_eq!(
            self.0.load(Ordering::Relaxed),
            block_driver::BLOCK_VMOID_INVALID,
            "Did you forget to detach?"
        );
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}

/// Represents a client connection to a block device. This is a simplified version of the
/// block.fidl interface. Most users will use the [`RemoteBlockClient`] implementation of this
/// trait.
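///
/// All device offsets, VMO offsets and lengths are expressed in bytes and must be multiples of
/// the device's block size.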
#[async_trait]
pub trait BlockClient: Send + Sync {
    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;

    /// Detaches the given vmo-id from the device.
    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;

    /// Reads from the device at |device_offset| into the given buffer slice.
    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        self.read_at_traced(buffer_slice, device_offset, NO_TRACE_ID).await
    }

    async fn read_at_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Writes the data in |buffer_slice| to the device.
    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        self.write_at_with_opts_traced(
            buffer_slice,
            device_offset,
            WriteOptions::empty(),
            NO_TRACE_ID,
        )
        .await
    }

    async fn write_at_with_opts(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
    ) -> Result<(), zx::Status> {
        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Trims the given range on the block device.
    async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
        self.trim_traced(device_range, NO_TRACE_ID).await
    }

    async fn trim_traced(
        &self,
        device_range: Range<u64>,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    async fn flush(&self) -> Result<(), zx::Status> {
        self.flush_traced(NO_TRACE_ID).await
    }

    /// Sends a flush request to the underlying block device.
    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;

    /// Closes the fifo.
    async fn close(&self) -> Result<(), zx::Status>;

    /// Returns the block size of the device.
    fn block_size(&self) -> u32;

    /// Returns the size, in blocks, of the device.
    fn block_count(&self) -> u64;

    /// Returns the block flags reported by the device.
    fn block_flags(&self) -> BlockFlags;

    /// Returns true if the remote fifo is still connected.
    fn is_connected(&self) -> bool;
}

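// State shared between RemoteBlockClient and RemoteBlockClientSync: the device geometry, the
// fifo plumbing, and an internal VMO used to stage transfers to and from plain memory buffers.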
struct Common {
    block_size: u32,
    block_count: u64,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}

impl Common {
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    // Sends the request and waits for the response.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                let mut state = self.fifo_state.lock();
                if state.fifo.is_none() {
                    // Fifo has been closed.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.wrapping_add(1);
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let flags = if opts.contains(WriteOptions::FORCE_ACCESS) {
            BlockIoFlag::FORCE_ACCESS.bits()
        } else {
            0
        };
        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}

impl Drop for Common {
    fn drop(&mut self) {
        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
        // down.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}

/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
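///
/// # Example
///
/// A minimal sketch of connecting and reading the first block into memory; it assumes `proxy`
/// is an already-connected [`BlockProxy`] and that this crate is available as `block_client`.
///
/// ```no_run
/// # async fn example(proxy: fidl_fuchsia_hardware_block::BlockProxy) -> Result<(), zx::Status> {
/// use block_client::{BlockClient as _, MutableBufferSlice, RemoteBlockClient};
///
/// let client = RemoteBlockClient::new(proxy).await?;
/// let mut buf = vec![0u8; client.block_size() as usize];
/// client.read_at(MutableBufferSlice::Memory(&mut buf), 0).await?;
/// # Ok(())
/// # }
/// ```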
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

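/// Abstracts over the FIDL proxy types that expose the fuchsia.hardware.block.Block protocol
/// (e.g. [`BlockProxy`], [`PartitionProxy`] and [`VolumeProxy`]), so that
/// [`RemoteBlockClient::new`] can accept any of them.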
pub trait AsBlockProxy {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}

impl<T: AsBlockProxy> AsBlockProxy for &T {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
        AsBlockProxy::get_info(*self)
    }
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
        AsBlockProxy::open_session(*self, session)
    }
}

macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);

impl RemoteBlockClient {
    /// Returns a connection to a remote block device via the given channel.
    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}

#[async_trait]
impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn block_flags(&self) -> BlockFlags {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

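/// A synchronous version of [`RemoteBlockClient`]. Requests are run to completion with
/// `block_on` against a fifo that is polled on a dedicated thread, so it can be used from
/// threads that have no async executor.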
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    /// Returns a connection to a remote block device via the given channel. A separate thread
    /// is spawned to poll the fifo, so this works even when no executor is configured for the
    /// calling thread.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The fifo needs to be instantiated from the thread that has the executor, as that is
        // where it registers for notifications to be delivered.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::new();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
    }

    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::empty(),
            NO_TRACE_ID,
        ))
    }

    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Ignore errors here as there is not much we can do about them.
        let _ = self.close();
    }
}

873
874// FifoPoller is a future responsible for sending and receiving from the fifo.
875struct FifoPoller {
876    fifo_state: FifoStateRef,
877}
878
879impl Future for FifoPoller {
880    type Output = ();
881
882    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
883        let mut state_lock = self.fifo_state.lock();
884        let state = state_lock.deref_mut(); // So that we can split the borrow.
885
886        // Send requests.
887        if state.poll_send_requests(context) {
888            return Poll::Ready(());
889        }
890
891        // Receive responses.
892        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
893        loop {
894            let mut response = MaybeUninit::uninit();
895            match fifo.try_read(context, &mut response) {
896                Poll::Pending => {
897                    state.poller_waker = Some(context.waker().clone());
898                    return Poll::Pending;
899                }
900                Poll::Ready(Ok(_)) => {
901                    let response = unsafe { response.assume_init() };
902                    let request_id = response.reqid;
                    // If the request isn't in the map, assume its future was dropped (e.g. a
                    // cancelled read).
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{
        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
    };
    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
    use fidl::endpoints::RequestStream as _;
    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
    use futures::join;
    use futures::stream::futures_unordered::FuturesUnordered;
    use futures::stream::StreamExt as _;
    use ramdevice_client::RamdiskClient;
    use std::borrow::Cow;
    use std::num::NonZero;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;

    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }

    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;

        let stats_before = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");

        // check that the stats are what we expect them to be
        let stats_after = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        // write stats
        assert_eq!(
            stats_before.write.success.total_calls + 1,
            stats_after.write.success.total_calls
        );
        assert_eq!(
            stats_before.write.success.bytes_transferred + 1024,
            stats_after.write.success.bytes_transferred
        );
        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
        // read stats
        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
        assert_eq!(
            stats_before.read.success.bytes_transferred + 2048,
            stats_after.read.success.bytes_transferred
        );
        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
    }

    #[fuchsia::test]
    async fn test_against_ram_disk_with_flush() {
        let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;

        let stats_before = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client.flush().await.expect("flush failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");

        // check that the stats are what we expect them to be
        let stats_after = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        // write stats
        assert_eq!(
            stats_before.write.success.total_calls + 1,
            stats_after.write.success.total_calls
        );
        assert_eq!(
            stats_before.write.success.bytes_transferred + 1024,
            stats_after.write.success.bytes_transferred
        );
        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
        // flush stats
        assert_eq!(
            stats_before.flush.success.total_calls + 1,
            stats_after.flush.success.total_calls
        );
        assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
        // read stats
        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
        assert_eq!(
            stats_before.read.success.bytes_transferred + 2048,
            stats_after.read.success.bytes_transferred
        );
        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
    }

    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Sometimes the FIFO will start rejecting requests before the FIFO is actually closed,
        // so we can get false positives from is_connected.
        while block_client.is_connected() {
            // Sleep for a bit to minimize lock contention.
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        // But once is_connected goes negative, it should stay negative.
        assert_eq!(block_client.is_connected(), false);
        let _ = block_client.detach_vmo(vmo_id).await;
    }

    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Read the first 500 results and then dump the rest.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back an extra block either side.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }

    // Implements a dummy server which test cases can use to verify that channel messages and
    // fifo operations are received, via the `channel_handler` and `fifo_handler` closures
    // passed to `FakeBlockServer::new`.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        // Creates a new FakeBlockServer given a channel to listen on.
        //
        // The 'channel_handler' and 'fifo_handler' closures allow tests to customize how the
        // server handles requests received over the channel or the fifo, respectively.
        //
        // 'channel_handler' receives a message before it is handled by the default implementation
        // and can return 'true' to indicate all processing is done and no further processing of
        // that message is required
        //
        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
        // FakeBlockServer will send over the fifo.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        // Runs the server
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test a chance to register and potentially
                                        // handle the event.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
            // _result can be Err(Aborted) since the Close handler aborts the fifo future, but
            // that's expected.
        }
    }

    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = fuchsia_sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();

        std::thread::spawn(move || {
            let _block_client =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
            // The drop here should cause Close to be sent.
        });

        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock() = true;
            }
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;

        // After the server has finished running, we can check to see that close was called.
        assert!(*close_called.lock());
    }

    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                })))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy();

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
                block_client.flush().await.expect("flush failed");
            },
            async {
                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                // After the server has finished running, verify the trace flow ID was set to
                // some value.
                assert!(flow_id.lock().is_some());
            }
        );
    }
}