1use fidl::endpoints::ServerEnd;
13use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
14use fidl_fuchsia_hardware_block_partition::PartitionProxy;
15use fidl_fuchsia_hardware_block_volume::VolumeProxy;
16use fuchsia_sync::Mutex;
17use futures::channel::oneshot;
18use futures::executor::block_on;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use zx::sys::zx_handle_t;
30use zx::{self as zx, HandleBased as _};
31use {
32 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
33 fuchsia_async as fasync, storage_trace as trace,
34};
35
36pub use cache::Cache;
37
38pub use block::Flag as BlockFlags;
39
40pub use block_protocol::*;
41
42pub mod cache;
43
44const TEMP_VMO_SIZE: usize = 65536;
45
46pub const NO_TRACE_ID: u64 = 0;
48
49pub use block_driver::{BlockIoFlag, BlockOpcode};
50
51fn fidl_to_status(error: fidl::Error) -> zx::Status {
52 match error {
53 fidl::Error::ClientChannelClosed { status, .. } => status,
54 _ => zx::Status::INTERNAL,
55 }
56}
57
58fn opcode_str(opcode: u8) -> &'static str {
59 match BlockOpcode::from_primitive(opcode) {
60 Some(BlockOpcode::Read) => "read",
61 Some(BlockOpcode::Write) => "write",
62 Some(BlockOpcode::Flush) => "flush",
63 Some(BlockOpcode::Trim) => "trim",
64 Some(BlockOpcode::CloseVmo) => "close_vmo",
65 None => "unknown",
66 }
67}
68
69fn generate_trace_flow_id(request_id: u32) -> u64 {
72 static SELF_HANDLE: LazyLock<zx_handle_t> =
73 LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
74 *SELF_HANDLE as u64 + (request_id as u64) << 32
75}
76
77pub enum BufferSlice<'a> {
78 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
79 Memory(&'a [u8]),
80}
81
82impl<'a> BufferSlice<'a> {
83 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
84 BufferSlice::VmoId { vmo_id, offset, length }
85 }
86}
87
88impl<'a> From<&'a [u8]> for BufferSlice<'a> {
89 fn from(buf: &'a [u8]) -> Self {
90 BufferSlice::Memory(buf)
91 }
92}
93
94pub enum MutableBufferSlice<'a> {
95 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
96 Memory(&'a mut [u8]),
97}
98
99impl<'a> MutableBufferSlice<'a> {
100 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
101 MutableBufferSlice::VmoId { vmo_id, offset, length }
102 }
103}
104
105impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
106 fn from(buf: &'a mut [u8]) -> Self {
107 MutableBufferSlice::Memory(buf)
108 }
109}
110
111#[derive(Default)]
112struct RequestState {
113 result: Option<zx::Status>,
114 waker: Option<Waker>,
115}
116
117#[derive(Default)]
118struct FifoState {
119 fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
121
122 next_request_id: u32,
124
125 queue: std::collections::VecDeque<BlockFifoRequest>,
127
128 map: HashMap<u32, RequestState>,
130
131 poller_waker: Option<Waker>,
133
134 attach_barrier: bool,
136}
137
138impl FifoState {
139 fn terminate(&mut self) {
140 self.fifo.take();
141 for (_, request_state) in self.map.iter_mut() {
142 request_state.result.get_or_insert(zx::Status::CANCELED);
143 if let Some(waker) = request_state.waker.take() {
144 waker.wake();
145 }
146 }
147 if let Some(waker) = self.poller_waker.take() {
148 waker.wake();
149 }
150 }
151
152 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
154 let fifo = if let Some(fifo) = self.fifo.as_ref() {
155 fifo
156 } else {
157 return true;
158 };
159
160 loop {
161 let slice = self.queue.as_slices().0;
162 if slice.is_empty() {
163 return false;
164 }
165 match fifo.try_write(context, slice) {
166 Poll::Ready(Ok(sent)) => {
167 self.queue.drain(0..sent);
168 }
169 Poll::Ready(Err(_)) => {
170 self.terminate();
171 return true;
172 }
173 Poll::Pending => {
174 return false;
175 }
176 }
177 }
178 }
179}
180
181type FifoStateRef = Arc<Mutex<FifoState>>;
182
183struct ResponseFuture {
185 request_id: u32,
186 fifo_state: FifoStateRef,
187}
188
189impl ResponseFuture {
190 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
191 ResponseFuture { request_id, fifo_state }
192 }
193}
194
195impl Future for ResponseFuture {
196 type Output = Result<(), zx::Status>;
197
198 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
199 let mut state = self.fifo_state.lock();
200 let request_state = state.map.get_mut(&self.request_id).unwrap();
201 if let Some(result) = request_state.result {
202 Poll::Ready(result.into())
203 } else {
204 request_state.waker.replace(context.waker().clone());
205 Poll::Pending
206 }
207 }
208}
209
210impl Drop for ResponseFuture {
211 fn drop(&mut self) {
212 self.fifo_state.lock().map.remove(&self.request_id).unwrap();
213 }
214}
215
216#[derive(Debug)]
218#[must_use]
219pub struct VmoId(AtomicU16);
220
221impl VmoId {
222 pub fn new(id: u16) -> Self {
224 Self(AtomicU16::new(id))
225 }
226
227 pub fn take(&self) -> Self {
229 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
230 }
231
232 pub fn is_valid(&self) -> bool {
233 self.id() != block_driver::BLOCK_VMOID_INVALID
234 }
235
236 #[must_use]
238 pub fn into_id(self) -> u16 {
239 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
240 }
241
242 pub fn id(&self) -> u16 {
243 self.0.load(Ordering::Relaxed)
244 }
245}
246
247impl PartialEq for VmoId {
248 fn eq(&self, other: &Self) -> bool {
249 self.id() == other.id()
250 }
251}
252
253impl Eq for VmoId {}
254
255impl Drop for VmoId {
256 fn drop(&mut self) {
257 assert_eq!(
258 self.0.load(Ordering::Relaxed),
259 block_driver::BLOCK_VMOID_INVALID,
260 "Did you forget to detach?"
261 );
262 }
263}
264
265impl Hash for VmoId {
266 fn hash<H: Hasher>(&self, state: &mut H) {
267 self.id().hash(state);
268 }
269}
270
271pub trait BlockClient: Send + Sync {
275 fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
277
278 fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
280
281 fn read_at(
283 &self,
284 buffer_slice: MutableBufferSlice<'_>,
285 device_offset: u64,
286 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
287 self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
288 }
289
290 fn read_at_with_opts(
291 &self,
292 buffer_slice: MutableBufferSlice<'_>,
293 device_offset: u64,
294 opts: ReadOptions,
295 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
296 self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
297 }
298
299 fn read_at_with_opts_traced(
300 &self,
301 buffer_slice: MutableBufferSlice<'_>,
302 device_offset: u64,
303 opts: ReadOptions,
304 trace_flow_id: u64,
305 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
306
307 fn write_at(
309 &self,
310 buffer_slice: BufferSlice<'_>,
311 device_offset: u64,
312 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
313 self.write_at_with_opts_traced(
314 buffer_slice,
315 device_offset,
316 WriteOptions::default(),
317 NO_TRACE_ID,
318 )
319 }
320
321 fn write_at_with_opts(
322 &self,
323 buffer_slice: BufferSlice<'_>,
324 device_offset: u64,
325 opts: WriteOptions,
326 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
327 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
328 }
329
330 fn write_at_with_opts_traced(
331 &self,
332 buffer_slice: BufferSlice<'_>,
333 device_offset: u64,
334 opts: WriteOptions,
335 trace_flow_id: u64,
336 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
337
338 fn trim(
340 &self,
341 device_range: Range<u64>,
342 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
343 self.trim_traced(device_range, NO_TRACE_ID)
344 }
345
346 fn trim_traced(
347 &self,
348 device_range: Range<u64>,
349 trace_flow_id: u64,
350 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
351
352 fn barrier(&self);
357
358 fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
359 self.flush_traced(NO_TRACE_ID)
360 }
361
362 fn flush_traced(
364 &self,
365 trace_flow_id: u64,
366 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
367
368 fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
370
371 fn block_size(&self) -> u32;
373
374 fn block_count(&self) -> u64;
376
377 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
379
380 fn block_flags(&self) -> BlockFlags;
382
383 fn is_connected(&self) -> bool;
385}
386
387struct Common {
388 block_size: u32,
389 block_count: u64,
390 max_transfer_blocks: Option<NonZero<u32>>,
391 block_flags: BlockFlags,
392 fifo_state: FifoStateRef,
393 temp_vmo: futures::lock::Mutex<zx::Vmo>,
394 temp_vmo_id: VmoId,
395}
396
397impl Common {
398 fn new(
399 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
400 info: &block::BlockInfo,
401 temp_vmo: zx::Vmo,
402 temp_vmo_id: VmoId,
403 ) -> Self {
404 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
405 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
406 Self {
407 block_size: info.block_size,
408 block_count: info.block_count,
409 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
410 NonZero::new(info.max_transfer_size / info.block_size)
411 } else {
412 None
413 },
414 block_flags: info.flags,
415 fifo_state,
416 temp_vmo: futures::lock::Mutex::new(temp_vmo),
417 temp_vmo_id,
418 }
419 }
420
421 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
422 if bytes % self.block_size as u64 != 0 {
423 Err(zx::Status::INVALID_ARGS)
424 } else {
425 Ok(bytes / self.block_size as u64)
426 }
427 }
428
429 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
431 let (request_id, trace_flow_id) = {
432 let mut state = self.fifo_state.lock();
433
434 let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
435 if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
436 && state.attach_barrier
437 {
438 flags |= BlockIoFlag::PRE_BARRIER;
439 request.command.flags = flags.bits();
440 state.attach_barrier = false;
441 }
442
443 if state.fifo.is_none() {
444 return Err(zx::Status::CANCELED);
446 }
447 trace::duration!(
448 c"storage",
449 c"block_client::send::start",
450 "op" => opcode_str(request.command.opcode),
451 "len" => request.length * self.block_size
452 );
453 let request_id = state.next_request_id;
454 state.next_request_id = state.next_request_id.overflowing_add(1).0;
455 assert!(
456 state.map.insert(request_id, RequestState::default()).is_none(),
457 "request id in use!"
458 );
459 request.reqid = request_id;
460 if request.trace_flow_id == NO_TRACE_ID {
461 request.trace_flow_id = generate_trace_flow_id(request_id);
462 }
463 let trace_flow_id = request.trace_flow_id;
464 trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
465 state.queue.push_back(request);
466 if let Some(waker) = state.poller_waker.clone() {
467 state.poll_send_requests(&mut Context::from_waker(&waker));
468 }
469 (request_id, trace_flow_id)
470 };
471 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
472 trace::duration!(c"storage", c"block_client::send::end");
473 trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
474 Ok(())
475 }
476
477 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
478 self.send(BlockFifoRequest {
479 command: BlockFifoCommand {
480 opcode: BlockOpcode::CloseVmo.into_primitive(),
481 flags: 0,
482 ..Default::default()
483 },
484 vmoid: vmo_id.into_id(),
485 ..Default::default()
486 })
487 .await
488 }
489
490 async fn read_at(
491 &self,
492 buffer_slice: MutableBufferSlice<'_>,
493 device_offset: u64,
494 opts: ReadOptions,
495 trace_flow_id: u64,
496 ) -> Result<(), zx::Status> {
497 match buffer_slice {
498 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
499 self.send(BlockFifoRequest {
500 command: BlockFifoCommand {
501 opcode: BlockOpcode::Read.into_primitive(),
502 flags: 0,
503 ..Default::default()
504 },
505 vmoid: vmo_id.id(),
506 length: self
507 .to_blocks(length)?
508 .try_into()
509 .map_err(|_| zx::Status::INVALID_ARGS)?,
510 vmo_offset: self.to_blocks(offset)?,
511 dev_offset: self.to_blocks(device_offset)?,
512 trace_flow_id,
513 dun: opts.inline_crypto_options.dun,
514 slot: opts.inline_crypto_options.slot,
515 ..Default::default()
516 })
517 .await?
518 }
519 MutableBufferSlice::Memory(mut slice) => {
520 let temp_vmo = self.temp_vmo.lock().await;
521 let mut device_block = self.to_blocks(device_offset)?;
522 loop {
523 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
524 let block_count = self.to_blocks(to_do as u64)? as u32;
525 self.send(BlockFifoRequest {
526 command: BlockFifoCommand {
527 opcode: BlockOpcode::Read.into_primitive(),
528 flags: 0,
529 ..Default::default()
530 },
531 vmoid: self.temp_vmo_id.id(),
532 length: block_count,
533 vmo_offset: 0,
534 dev_offset: device_block,
535 trace_flow_id,
536 dun: opts.inline_crypto_options.dun,
537 slot: opts.inline_crypto_options.slot,
538 ..Default::default()
539 })
540 .await?;
541 temp_vmo.read(&mut slice[..to_do], 0)?;
542 if to_do == slice.len() {
543 break;
544 }
545 device_block += block_count as u64;
546 slice = &mut slice[to_do..];
547 }
548 }
549 }
550 Ok(())
551 }
552
553 async fn write_at(
554 &self,
555 buffer_slice: BufferSlice<'_>,
556 device_offset: u64,
557 opts: WriteOptions,
558 trace_flow_id: u64,
559 ) -> Result<(), zx::Status> {
560 let mut flags = BlockIoFlag::empty();
561
562 if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
563 flags |= BlockIoFlag::FORCE_ACCESS;
564 }
565
566 if opts.flags.contains(WriteFlags::PRE_BARRIER) {
567 flags |= BlockIoFlag::PRE_BARRIER;
568 }
569
570 match buffer_slice {
571 BufferSlice::VmoId { vmo_id, offset, length } => {
572 self.send(BlockFifoRequest {
573 command: BlockFifoCommand {
574 opcode: BlockOpcode::Write.into_primitive(),
575 flags: flags.bits(),
576 ..Default::default()
577 },
578 vmoid: vmo_id.id(),
579 length: self
580 .to_blocks(length)?
581 .try_into()
582 .map_err(|_| zx::Status::INVALID_ARGS)?,
583 vmo_offset: self.to_blocks(offset)?,
584 dev_offset: self.to_blocks(device_offset)?,
585 trace_flow_id,
586 dun: opts.inline_crypto_options.dun,
587 slot: opts.inline_crypto_options.slot,
588 ..Default::default()
589 })
590 .await?;
591 }
592 BufferSlice::Memory(mut slice) => {
593 let temp_vmo = self.temp_vmo.lock().await;
594 let mut device_block = self.to_blocks(device_offset)?;
595 loop {
596 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
597 let block_count = self.to_blocks(to_do as u64)? as u32;
598 temp_vmo.write(&slice[..to_do], 0)?;
599 self.send(BlockFifoRequest {
600 command: BlockFifoCommand {
601 opcode: BlockOpcode::Write.into_primitive(),
602 flags: flags.bits(),
603 ..Default::default()
604 },
605 vmoid: self.temp_vmo_id.id(),
606 length: block_count,
607 vmo_offset: 0,
608 dev_offset: device_block,
609 trace_flow_id,
610 dun: opts.inline_crypto_options.dun,
611 slot: opts.inline_crypto_options.slot,
612 ..Default::default()
613 })
614 .await?;
615 if to_do == slice.len() {
616 break;
617 }
618 device_block += block_count as u64;
619 slice = &slice[to_do..];
620 }
621 }
622 }
623 Ok(())
624 }
625
626 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
627 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
628 let dev_offset = self.to_blocks(device_range.start)?;
629 self.send(BlockFifoRequest {
630 command: BlockFifoCommand {
631 opcode: BlockOpcode::Trim.into_primitive(),
632 flags: 0,
633 ..Default::default()
634 },
635 vmoid: block_driver::BLOCK_VMOID_INVALID,
636 length,
637 dev_offset,
638 trace_flow_id,
639 ..Default::default()
640 })
641 .await
642 }
643
644 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
645 self.send(BlockFifoRequest {
646 command: BlockFifoCommand {
647 opcode: BlockOpcode::Flush.into_primitive(),
648 flags: 0,
649 ..Default::default()
650 },
651 vmoid: block_driver::BLOCK_VMOID_INVALID,
652 trace_flow_id,
653 ..Default::default()
654 })
655 .await
656 }
657
658 fn barrier(&self) {
659 self.fifo_state.lock().attach_barrier = true;
660 }
661
662 fn block_size(&self) -> u32 {
663 self.block_size
664 }
665
666 fn block_count(&self) -> u64 {
667 self.block_count
668 }
669
670 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
671 self.max_transfer_blocks.clone()
672 }
673
674 fn block_flags(&self) -> BlockFlags {
675 self.block_flags
676 }
677
678 fn is_connected(&self) -> bool {
679 self.fifo_state.lock().fifo.is_some()
680 }
681}
682
683impl Drop for Common {
684 fn drop(&mut self) {
685 let _ = self.temp_vmo_id.take().into_id();
688 self.fifo_state.lock().terminate();
689 }
690}
691
692pub struct RemoteBlockClient {
694 session: block::SessionProxy,
695 common: Common,
696}
697
698pub trait AsBlockProxy {
699 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;
700
701 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
702}
703
704impl<T: AsBlockProxy> AsBlockProxy for &T {
705 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
706 AsBlockProxy::get_info(*self)
707 }
708 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
709 AsBlockProxy::open_session(*self, session)
710 }
711}
712
713macro_rules! impl_as_block_proxy {
714 ($name:ident) => {
715 impl AsBlockProxy for $name {
716 async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
717 $name::get_info(self).await
718 }
719
720 fn open_session(
721 &self,
722 session: ServerEnd<block::SessionMarker>,
723 ) -> Result<(), fidl::Error> {
724 $name::open_session(self, session)
725 }
726 }
727 };
728}
729
730impl_as_block_proxy!(BlockProxy);
731impl_as_block_proxy!(PartitionProxy);
732impl_as_block_proxy!(VolumeProxy);
733
734impl RemoteBlockClient {
735 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
737 let info =
738 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
739 let (session, server) = fidl::endpoints::create_proxy();
740 let () = remote.open_session(server).map_err(fidl_to_status)?;
741 Self::from_session(info, session).await
742 }
743
744 pub async fn from_session(
745 info: block::BlockInfo,
746 session: block::SessionProxy,
747 ) -> Result<Self, zx::Status> {
748 let fifo =
749 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
750 let fifo = fasync::Fifo::from_fifo(fifo);
751 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
752 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
753 let vmo_id =
754 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
755 let vmo_id = VmoId::new(vmo_id.id);
756 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
757 }
758}
759
760impl BlockClient for RemoteBlockClient {
761 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
762 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
763 let vmo_id = self
764 .session
765 .attach_vmo(dup)
766 .await
767 .map_err(fidl_to_status)?
768 .map_err(zx::Status::from_raw)?;
769 Ok(VmoId::new(vmo_id.id))
770 }
771
772 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
773 self.common.detach_vmo(vmo_id).await
774 }
775
776 async fn read_at_with_opts_traced(
777 &self,
778 buffer_slice: MutableBufferSlice<'_>,
779 device_offset: u64,
780 opts: ReadOptions,
781 trace_flow_id: u64,
782 ) -> Result<(), zx::Status> {
783 self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
784 }
785
786 async fn write_at_with_opts_traced(
787 &self,
788 buffer_slice: BufferSlice<'_>,
789 device_offset: u64,
790 opts: WriteOptions,
791 trace_flow_id: u64,
792 ) -> Result<(), zx::Status> {
793 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
794 }
795
796 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
797 self.common.trim(range, trace_flow_id).await
798 }
799
800 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
801 self.common.flush(trace_flow_id).await
802 }
803
804 fn barrier(&self) {
805 self.common.barrier()
806 }
807
808 async fn close(&self) -> Result<(), zx::Status> {
809 let () =
810 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
811 Ok(())
812 }
813
814 fn block_size(&self) -> u32 {
815 self.common.block_size()
816 }
817
818 fn block_count(&self) -> u64 {
819 self.common.block_count()
820 }
821
822 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
823 self.common.max_transfer_blocks()
824 }
825
826 fn block_flags(&self) -> BlockFlags {
827 self.common.block_flags()
828 }
829
830 fn is_connected(&self) -> bool {
831 self.common.is_connected()
832 }
833}
834
835pub struct RemoteBlockClientSync {
836 session: block::SessionSynchronousProxy,
837 common: Common,
838}
839
840impl RemoteBlockClientSync {
841 pub fn new(
845 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
846 ) -> Result<Self, zx::Status> {
847 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
848 let info = remote
849 .get_info(zx::MonotonicInstant::INFINITE)
850 .map_err(fidl_to_status)?
851 .map_err(zx::Status::from_raw)?;
852 let (client, server) = fidl::endpoints::create_endpoints();
853 let () = remote.open_session(server).map_err(fidl_to_status)?;
854 let session = block::SessionSynchronousProxy::new(client.into_channel());
855 let fifo = session
856 .get_fifo(zx::MonotonicInstant::INFINITE)
857 .map_err(fidl_to_status)?
858 .map_err(zx::Status::from_raw)?;
859 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
860 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
861 let vmo_id = session
862 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
863 .map_err(fidl_to_status)?
864 .map_err(zx::Status::from_raw)?;
865 let vmo_id = VmoId::new(vmo_id.id);
866
867 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
870 std::thread::spawn(move || {
871 let mut executor = fasync::LocalExecutor::default();
872 let fifo = fasync::Fifo::from_fifo(fifo);
873 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
874 let fifo_state = common.fifo_state.clone();
875 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
876 executor.run_singlethreaded(FifoPoller { fifo_state });
877 });
878 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
879 }
880
881 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
882 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
883 let vmo_id = self
884 .session
885 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
886 .map_err(fidl_to_status)?
887 .map_err(zx::Status::from_raw)?;
888 Ok(VmoId::new(vmo_id.id))
889 }
890
891 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
892 block_on(self.common.detach_vmo(vmo_id))
893 }
894
895 pub fn read_at(
896 &self,
897 buffer_slice: MutableBufferSlice<'_>,
898 device_offset: u64,
899 ) -> Result<(), zx::Status> {
900 block_on(self.common.read_at(
901 buffer_slice,
902 device_offset,
903 ReadOptions::default(),
904 NO_TRACE_ID,
905 ))
906 }
907
908 pub fn write_at(
909 &self,
910 buffer_slice: BufferSlice<'_>,
911 device_offset: u64,
912 ) -> Result<(), zx::Status> {
913 block_on(self.common.write_at(
914 buffer_slice,
915 device_offset,
916 WriteOptions::default(),
917 NO_TRACE_ID,
918 ))
919 }
920
921 pub fn flush(&self) -> Result<(), zx::Status> {
922 block_on(self.common.flush(NO_TRACE_ID))
923 }
924
925 pub fn close(&self) -> Result<(), zx::Status> {
926 let () = self
927 .session
928 .close(zx::MonotonicInstant::INFINITE)
929 .map_err(fidl_to_status)?
930 .map_err(zx::Status::from_raw)?;
931 Ok(())
932 }
933
934 pub fn block_size(&self) -> u32 {
935 self.common.block_size()
936 }
937
938 pub fn block_count(&self) -> u64 {
939 self.common.block_count()
940 }
941
942 pub fn is_connected(&self) -> bool {
943 self.common.is_connected()
944 }
945}
946
947impl Drop for RemoteBlockClientSync {
948 fn drop(&mut self) {
949 let _ = self.close();
951 }
952}
953
954struct FifoPoller {
956 fifo_state: FifoStateRef,
957}
958
959impl Future for FifoPoller {
960 type Output = ();
961
962 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
963 let mut state_lock = self.fifo_state.lock();
964 let state = state_lock.deref_mut(); if state.poll_send_requests(context) {
968 return Poll::Ready(());
969 }
970
971 let fifo = state.fifo.as_ref().unwrap(); loop {
974 let mut response = MaybeUninit::uninit();
975 match fifo.try_read(context, &mut response) {
976 Poll::Pending => {
977 state.poller_waker = Some(context.waker().clone());
978 return Poll::Pending;
979 }
980 Poll::Ready(Ok(_)) => {
981 let response = unsafe { response.assume_init() };
982 let request_id = response.reqid;
983 if let Some(request_state) = state.map.get_mut(&request_id) {
985 request_state.result.replace(zx::Status::from_raw(response.status));
986 if let Some(waker) = request_state.waker.take() {
987 waker.wake();
988 }
989 }
990 }
991 Poll::Ready(Err(_)) => {
992 state.terminate();
993 return Poll::Ready(());
994 }
995 }
996 }
997 }
998}
999
1000#[cfg(test)]
1001mod tests {
1002 use super::{
1003 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
1004 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
1005 };
1006 use block_protocol::ReadOptions;
1007 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
1008 use fidl::endpoints::RequestStream as _;
1009 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
1010 use futures::join;
1011 use futures::stream::StreamExt as _;
1012 use futures::stream::futures_unordered::FuturesUnordered;
1013 use ramdevice_client::RamdiskClient;
1014 use std::borrow::Cow;
1015 use std::num::NonZero;
1016 use std::sync::Arc;
1017 use std::sync::atomic::{AtomicBool, Ordering};
1018 use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
1019
1020 const RAMDISK_BLOCK_SIZE: u64 = 1024;
1021 const RAMDISK_BLOCK_COUNT: u64 = 1024;
1022
1023 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1024 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1025 .await
1026 .expect("RamdiskClient::create failed");
1027 let client_end = ramdisk.open().expect("ramdisk.open failed");
1028 let proxy = client_end.into_proxy();
1029 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1030 assert_eq!(block_client.block_size(), 1024);
1031 let client_end = ramdisk.open().expect("ramdisk.open failed");
1032 let proxy = client_end.into_proxy();
1033 (ramdisk, proxy, block_client)
1034 }
1035
1036 #[fuchsia::test]
1037 async fn test_against_ram_disk() {
1038 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1039
1040 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1041 vmo.write(b"hello", 5).expect("vmo.write failed");
1042 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1043 block_client
1044 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1045 .await
1046 .expect("write_at failed");
1047 block_client
1048 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1049 .await
1050 .expect("read_at failed");
1051 let mut buf: [u8; 5] = Default::default();
1052 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1053 assert_eq!(&buf, b"hello");
1054 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1055 }
1056
1057 #[fuchsia::test]
1058 async fn test_alignment() {
1059 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1060 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1061 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1062 block_client
1063 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1064 .await
1065 .expect_err("expected failure due to bad alignment");
1066 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1067 }
1068
1069 #[fuchsia::test]
1070 async fn test_parallel_io() {
1071 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1072 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1073 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1074 let mut reads = Vec::new();
1075 for _ in 0..1024 {
1076 reads.push(
1077 block_client
1078 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1079 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1080 );
1081 }
1082 futures::future::join_all(reads).await;
1083 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1084 }
1085
1086 #[fuchsia::test]
1087 async fn test_closed_device() {
1088 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1089 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1090 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1091 let mut reads = Vec::new();
1092 for _ in 0..1024 {
1093 reads.push(
1094 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1095 );
1096 }
1097 assert!(block_client.is_connected());
1098 let _ = futures::join!(futures::future::join_all(reads), async {
1099 ramdisk.destroy().await.expect("ramdisk.destroy failed")
1100 });
1101 while block_client
1103 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1104 .await
1105 .is_ok()
1106 {}
1107
1108 while block_client.is_connected() {
1111 fasync::Timer::new(fasync::MonotonicInstant::after(
1113 zx::MonotonicDuration::from_millis(500),
1114 ))
1115 .await;
1116 }
1117
1118 assert_eq!(block_client.is_connected(), false);
1120 let _ = block_client.detach_vmo(vmo_id).await;
1121 }
1122
1123 #[fuchsia::test]
1124 async fn test_cancelled_reads() {
1125 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1126 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1127 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1128 {
1129 let mut reads = FuturesUnordered::new();
1130 for _ in 0..1024 {
1131 reads.push(
1132 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1133 );
1134 }
1135 for _ in 0..500 {
1137 reads.next().await;
1138 }
1139 }
1140 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1141 }
1142
1143 #[fuchsia::test]
1144 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1145 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1146 let block_client_ref = &block_client;
1147 let test_one = |offset, len, fill| async move {
1148 let buf = vec![fill; len];
1149 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1150 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1152 block_client_ref
1153 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1154 .await
1155 .expect("read_at failed");
1156 assert_eq!(
1157 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1158 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1159 );
1160 assert_eq!(
1161 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1162 &buf[..]
1163 );
1164 assert_eq!(
1165 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1166 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1167 );
1168 };
1169 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1170 join!(
1171 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1172 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1173 );
1174 }
1175
1176 struct FakeBlockServer<'a> {
1180 server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1181 channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1182 fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1183 }
1184
1185 impl<'a> FakeBlockServer<'a> {
1186 fn new(
1198 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1199 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1200 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1201 ) -> FakeBlockServer<'a> {
1202 FakeBlockServer {
1203 server_channel: Some(server_channel),
1204 channel_handler: Box::new(channel_handler),
1205 fifo_handler: Box::new(fifo_handler),
1206 }
1207 }
1208
1209 async fn run(&mut self) {
1211 let server = self.server_channel.take().unwrap();
1212
1213 let (server_fifo, client_fifo) =
1215 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1216 .expect("Fifo::create failed");
1217 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1218
1219 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1220 let fifo_future = Abortable::new(
1221 async {
1222 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1223 let (mut reader, mut writer) = fifo.async_io();
1224 let mut request = BlockFifoRequest::default();
1225 loop {
1226 match reader.read_entries(&mut request).await {
1227 Ok(1) => {}
1228 Err(zx::Status::PEER_CLOSED) => break,
1229 Err(e) => panic!("read_entry failed {:?}", e),
1230 _ => unreachable!(),
1231 };
1232
1233 let response = self.fifo_handler.as_ref()(request);
1234 writer
1235 .write_entries(std::slice::from_ref(&response))
1236 .await
1237 .expect("write_entries failed");
1238 }
1239 },
1240 fifo_future_abort_registration,
1241 );
1242
1243 let channel_future = async {
1244 server
1245 .into_stream()
1246 .for_each_concurrent(None, |request| async {
1247 let request = request.expect("unexpected fidl error");
1248
1249 match request {
1250 block::BlockRequest::GetInfo { responder } => {
1251 responder
1252 .send(Ok(&block::BlockInfo {
1253 block_count: 1024,
1254 block_size: 512,
1255 max_transfer_size: 1024 * 1024,
1256 flags: block::Flag::empty(),
1257 }))
1258 .expect("send failed");
1259 }
1260 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1261 let stream = session.into_stream();
1262 stream
1263 .for_each(|request| async {
1264 let request = request.expect("unexpected fidl error");
1265 if self.channel_handler.as_ref()(&request) {
1268 return;
1269 }
1270 match request {
1271 block::SessionRequest::GetFifo { responder } => {
1272 match maybe_server_fifo.lock().take() {
1273 Some(fifo) => {
1274 responder.send(Ok(fifo.downcast()))
1275 }
1276 None => responder.send(Err(
1277 zx::Status::NO_RESOURCES.into_raw(),
1278 )),
1279 }
1280 .expect("send failed")
1281 }
1282 block::SessionRequest::AttachVmo {
1283 vmo: _,
1284 responder,
1285 } => responder
1286 .send(Ok(&block::VmoId { id: 1 }))
1287 .expect("send failed"),
1288 block::SessionRequest::Close { responder } => {
1289 fifo_future_abort.abort();
1290 responder.send(Ok(())).expect("send failed")
1291 }
1292 }
1293 })
1294 .await
1295 }
1296 _ => panic!("Unexpected message"),
1297 }
1298 })
1299 .await;
1300 };
1301
1302 let _result = join!(fifo_future, channel_future);
1303 }
1305 }
1306
1307 #[fuchsia::test]
1308 async fn test_block_close_is_called() {
1309 let close_called = fuchsia_sync::Mutex::new(false);
1310 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1311
1312 std::thread::spawn(move || {
1313 let _block_client =
1314 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1315 });
1317
1318 let channel_handler = |request: &block::SessionRequest| -> bool {
1319 if let block::SessionRequest::Close { .. } = request {
1320 *close_called.lock() = true;
1321 }
1322 false
1323 };
1324 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1325
1326 assert!(*close_called.lock());
1328 }
1329
1330 #[fuchsia::test]
1331 async fn test_block_flush_is_called() {
1332 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1333
1334 struct Interface {
1335 flush_called: Arc<AtomicBool>,
1336 }
1337 impl block_server::async_interface::Interface for Interface {
1338 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1339 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1340 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1341 max_transfer_blocks: None,
1342 block_range: Some(0..1000),
1343 type_guid: [0; 16],
1344 instance_guid: [0; 16],
1345 name: "foo".to_string(),
1346 flags: 0,
1347 })))
1348 }
1349
1350 async fn read(
1351 &self,
1352 _device_block_offset: u64,
1353 _block_count: u32,
1354 _vmo: &Arc<zx::Vmo>,
1355 _vmo_offset: u64,
1356 _opts: ReadOptions,
1357 _trace_flow_id: Option<NonZero<u64>>,
1358 ) -> Result<(), zx::Status> {
1359 unreachable!();
1360 }
1361
1362 fn barrier(&self) -> Result<(), zx::Status> {
1363 unreachable!()
1364 }
1365
1366 async fn write(
1367 &self,
1368 _device_block_offset: u64,
1369 _block_count: u32,
1370 _vmo: &Arc<zx::Vmo>,
1371 _vmo_offset: u64,
1372 _opts: WriteOptions,
1373 _trace_flow_id: Option<NonZero<u64>>,
1374 ) -> Result<(), zx::Status> {
1375 unreachable!();
1376 }
1377
1378 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1379 self.flush_called.store(true, Ordering::Relaxed);
1380 Ok(())
1381 }
1382
1383 async fn trim(
1384 &self,
1385 _device_block_offset: u64,
1386 _block_count: u32,
1387 _trace_flow_id: Option<NonZero<u64>>,
1388 ) -> Result<(), zx::Status> {
1389 unreachable!();
1390 }
1391 }
1392
1393 let flush_called = Arc::new(AtomicBool::new(false));
1394
1395 futures::join!(
1396 async {
1397 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1398
1399 block_client.flush().await.expect("flush failed");
1400 },
1401 async {
1402 let block_server = BlockServer::new(
1403 512,
1404 Arc::new(Interface { flush_called: flush_called.clone() }),
1405 );
1406 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1407 }
1408 );
1409
1410 assert!(flush_called.load(Ordering::Relaxed));
1411 }
1412
1413 #[fuchsia::test]
1414 async fn test_trace_flow_ids_set() {
1415 let (proxy, server) = fidl::endpoints::create_proxy();
1416
1417 futures::join!(
1418 async {
1419 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1420 block_client.flush().await.expect("flush failed");
1421 },
1422 async {
1423 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1424 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1425 if request.trace_flow_id > 0 {
1426 *flow_id.lock() = Some(request.trace_flow_id);
1427 }
1428 BlockFifoResponse {
1429 status: zx::Status::OK.into_raw(),
1430 reqid: request.reqid,
1431 ..Default::default()
1432 }
1433 };
1434 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1435 assert!(flow_id.lock().is_some());
1437 }
1438 );
1439 }
1440}