1use async_trait::async_trait;
13use fidl::endpoints::ServerEnd;
14use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
15use fidl_fuchsia_hardware_block_partition::PartitionProxy;
16use fidl_fuchsia_hardware_block_volume::VolumeProxy;
17use fuchsia_sync::Mutex;
18use futures::channel::oneshot;
19use futures::executor::block_on;
20use lazy_static::lazy_static;
21use std::collections::HashMap;
22use std::future::Future;
23use std::hash::{Hash, Hasher};
24use std::mem::MaybeUninit;
25use std::num::NonZero;
26use std::ops::{DerefMut, Range};
27use std::pin::Pin;
28use std::sync::atomic::{AtomicU16, Ordering};
29use std::sync::Arc;
30use std::task::{Context, Poll, Waker};
31use zx::sys::zx_handle_t;
32use zx::{self as zx, HandleBased as _};
33use {
34 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
35 fuchsia_async as fasync, storage_trace as trace,
36};
37
38pub use cache::Cache;
39
40pub use block::Flag as BlockFlags;
41
42pub use block_protocol::*;
43
44pub mod cache;
45
/// Size, in bytes, of the temporary VMO that `Common` uses to stage transfers whose buffer is a
/// plain memory slice rather than an attached VMO.
const TEMP_VMO_SIZE: usize = 65536;
47
/// Trace flow id meaning "none supplied"; `Common::send` generates an id for requests that carry
/// this value.
pub const NO_TRACE_ID: u64 = 0;
50
51pub use block_driver::{BlockIoFlag, BlockOpcode};
52
53fn fidl_to_status(error: fidl::Error) -> zx::Status {
54 match error {
55 fidl::Error::ClientChannelClosed { status, .. } => status,
56 _ => zx::Status::INTERNAL,
57 }
58}
59
60fn opcode_str(opcode: u8) -> &'static str {
61 match BlockOpcode::from_primitive(opcode) {
62 Some(BlockOpcode::Read) => "read",
63 Some(BlockOpcode::Write) => "write",
64 Some(BlockOpcode::Flush) => "flush",
65 Some(BlockOpcode::Trim) => "trim",
66 Some(BlockOpcode::CloseVmo) => "close_vmo",
67 None => "unknown",
68 }
69}
70
71fn generate_trace_flow_id(request_id: u32) -> u64 {
74 lazy_static! {
75 static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
76 };
77 *SELF_HANDLE as u64 + (request_id as u64) << 32
78}
79
/// A read-only source buffer for a write operation.
pub enum BufferSlice<'a> {
    /// A region of an attached VMO. `offset` and `length` are byte values; `Common::write_at`
    /// converts them to blocks and rejects values that are not a multiple of the block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A plain byte slice; `Common::write_at` stages it through the client's temporary VMO.
    Memory(&'a [u8]),
}
84
85impl<'a> BufferSlice<'a> {
86 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
87 BufferSlice::VmoId { vmo_id, offset, length }
88 }
89}
90
91impl<'a> From<&'a [u8]> for BufferSlice<'a> {
92 fn from(buf: &'a [u8]) -> Self {
93 BufferSlice::Memory(buf)
94 }
95}
96
/// A writable destination buffer for a read operation.
pub enum MutableBufferSlice<'a> {
    /// A region of an attached VMO. `offset` and `length` are byte values; `Common::read_at`
    /// converts them to blocks and rejects values that are not a multiple of the block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A plain mutable byte slice; `Common::read_at` fills it via the client's temporary VMO.
    Memory(&'a mut [u8]),
}
101
102impl<'a> MutableBufferSlice<'a> {
103 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
104 MutableBufferSlice::VmoId { vmo_id, offset, length }
105 }
106}
107
108impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
109 fn from(buf: &'a mut [u8]) -> Self {
110 MutableBufferSlice::Memory(buf)
111 }
112}
113
/// Per-request bookkeeping shared between a `ResponseFuture` and the code that completes it.
#[derive(Default)]
struct RequestState {
    /// Set once the request has completed (or was cancelled by `FifoState::terminate`).
    result: Option<zx::Status>,
    /// Waker of the `ResponseFuture` that is waiting on `result`, if it has been polled.
    waker: Option<Waker>,
}
119
/// Shared state for one FIFO connection to the block device.
#[derive(Default)]
struct FifoState {
    /// The FIFO itself; `None` once `terminate` has run.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    /// Id that `Common::send` assigns to the next request (wraps on overflow).
    next_request_id: u32,

    /// Requests that have been queued but not yet written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    /// State of every outstanding request, keyed by request id.
    map: HashMap<u32, RequestState>,

    /// Waker stored by `FifoPoller` when it returns `Poll::Pending`; `Common::send` uses it to
    /// push queued requests without waiting for the poller to be scheduled.
    poller_waker: Option<Waker>,

    /// Set by `Common::barrier`; `Common::send` clears it when it tags a write with
    /// `PRE_BARRIER`.
    attach_barrier: bool,
}
140
141impl FifoState {
142 fn terminate(&mut self) {
143 self.fifo.take();
144 for (_, request_state) in self.map.iter_mut() {
145 request_state.result.get_or_insert(zx::Status::CANCELED);
146 if let Some(waker) = request_state.waker.take() {
147 waker.wake();
148 }
149 }
150 if let Some(waker) = self.poller_waker.take() {
151 waker.wake();
152 }
153 }
154
155 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
157 let fifo = if let Some(fifo) = self.fifo.as_ref() {
158 fifo
159 } else {
160 return true;
161 };
162
163 loop {
164 let slice = self.queue.as_slices().0;
165 if slice.is_empty() {
166 return false;
167 }
168 match fifo.try_write(context, slice) {
169 Poll::Ready(Ok(sent)) => {
170 self.queue.drain(0..sent);
171 }
172 Poll::Ready(Err(_)) => {
173 self.terminate();
174 return true;
175 }
176 Poll::Pending => {
177 return false;
178 }
179 }
180 }
181 }
182}
183
/// Shared, lock-protected handle to a `FifoState`.
type FifoStateRef = Arc<Mutex<FifoState>>;
185
/// Future that resolves once the request identified by `request_id` has a recorded result.
struct ResponseFuture {
    /// Key of this request's entry in `FifoState::map`.
    request_id: u32,
    /// Shared FIFO state that holds the entry.
    fifo_state: FifoStateRef,
}
191
192impl ResponseFuture {
193 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
194 ResponseFuture { request_id, fifo_state }
195 }
196}
197
198impl Future for ResponseFuture {
199 type Output = Result<(), zx::Status>;
200
201 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
202 let mut state = self.fifo_state.lock();
203 let request_state = state.map.get_mut(&self.request_id).unwrap();
204 if let Some(result) = request_state.result {
205 Poll::Ready(result.into())
206 } else {
207 request_state.waker.replace(context.waker().clone());
208 Poll::Pending
209 }
210 }
211}
212
213impl Drop for ResponseFuture {
214 fn drop(&mut self) {
215 self.fifo_state.lock().map.remove(&self.request_id).unwrap();
216 }
217}
218
/// Identifier of a VMO that has been attached to a block device session.
///
/// Dropping a `VmoId` that still holds a valid id panics (see the `Drop` impl); the id must
/// first be cleared with `take` or consumed with `into_id`.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
223
224impl VmoId {
225 pub fn new(id: u16) -> Self {
227 Self(AtomicU16::new(id))
228 }
229
230 pub fn take(&self) -> Self {
232 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
233 }
234
235 pub fn is_valid(&self) -> bool {
236 self.id() != block_driver::BLOCK_VMOID_INVALID
237 }
238
239 #[must_use]
241 pub fn into_id(self) -> u16 {
242 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
243 }
244
245 pub fn id(&self) -> u16 {
246 self.0.load(Ordering::Relaxed)
247 }
248}
249
250impl PartialEq for VmoId {
251 fn eq(&self, other: &Self) -> bool {
252 self.id() == other.id()
253 }
254}
255
// Equality compares plain `u16` ids, so it is a full equivalence relation.
impl Eq for VmoId {}
257
258impl Drop for VmoId {
259 fn drop(&mut self) {
260 assert_eq!(
261 self.0.load(Ordering::Relaxed),
262 block_driver::BLOCK_VMOID_INVALID,
263 "Did you forget to detach?"
264 );
265 }
266}
267
268impl Hash for VmoId {
269 fn hash<H: Hasher>(&self, state: &mut H) {
270 self.id().hash(state);
271 }
272}
273
274#[async_trait]
278pub trait BlockClient: Send + Sync {
279 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
281
282 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
284
285 async fn read_at(
287 &self,
288 buffer_slice: MutableBufferSlice<'_>,
289 device_offset: u64,
290 ) -> Result<(), zx::Status> {
291 self.read_at_traced(buffer_slice, device_offset, 0).await
292 }
293
294 async fn read_at_traced(
295 &self,
296 buffer_slice: MutableBufferSlice<'_>,
297 device_offset: u64,
298 trace_flow_id: u64,
299 ) -> Result<(), zx::Status>;
300
301 async fn write_at(
303 &self,
304 buffer_slice: BufferSlice<'_>,
305 device_offset: u64,
306 ) -> Result<(), zx::Status> {
307 self.write_at_with_opts_traced(
308 buffer_slice,
309 device_offset,
310 WriteOptions::empty(),
311 NO_TRACE_ID,
312 )
313 .await
314 }
315
316 async fn write_at_with_opts(
317 &self,
318 buffer_slice: BufferSlice<'_>,
319 device_offset: u64,
320 opts: WriteOptions,
321 ) -> Result<(), zx::Status> {
322 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
323 }
324
325 async fn write_at_with_opts_traced(
326 &self,
327 buffer_slice: BufferSlice<'_>,
328 device_offset: u64,
329 opts: WriteOptions,
330 trace_flow_id: u64,
331 ) -> Result<(), zx::Status>;
332
333 async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
335 self.trim_traced(device_range, NO_TRACE_ID).await
336 }
337
338 async fn trim_traced(
339 &self,
340 device_range: Range<u64>,
341 trace_flow_id: u64,
342 ) -> Result<(), zx::Status>;
343
344 fn barrier(&self);
349
350 async fn flush(&self) -> Result<(), zx::Status> {
351 self.flush_traced(NO_TRACE_ID).await
352 }
353
354 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
356
357 async fn close(&self) -> Result<(), zx::Status>;
359
360 fn block_size(&self) -> u32;
362
363 fn block_count(&self) -> u64;
365
366 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
368
369 fn block_flags(&self) -> BlockFlags;
371
372 fn is_connected(&self) -> bool;
374}
375
/// State and request logic shared by `RemoteBlockClient` and `RemoteBlockClientSync`.
struct Common {
    /// Device block size, copied from the `BlockInfo` passed to `new`.
    block_size: u32,
    /// Number of blocks on the device.
    block_count: u64,
    /// Maximum transfer size in blocks, or `None` when the device reports it as unbounded.
    max_transfer_blocks: Option<NonZero<u32>>,
    /// Flags reported by the device.
    block_flags: BlockFlags,
    /// Shared FIFO state, also held by the `FifoPoller`.
    fifo_state: FifoStateRef,
    /// Scratch VMO used for `Memory` buffer slices; the async mutex serialises its users.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    /// Id under which `temp_vmo` is attached to the device.
    temp_vmo_id: VmoId,
}
385
386impl Common {
387 fn new(
388 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
389 info: &block::BlockInfo,
390 temp_vmo: zx::Vmo,
391 temp_vmo_id: VmoId,
392 ) -> Self {
393 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
394 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
395 Self {
396 block_size: info.block_size,
397 block_count: info.block_count,
398 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
399 NonZero::new(info.max_transfer_size / info.block_size)
400 } else {
401 None
402 },
403 block_flags: info.flags,
404 fifo_state,
405 temp_vmo: futures::lock::Mutex::new(temp_vmo),
406 temp_vmo_id,
407 }
408 }
409
410 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
411 if bytes % self.block_size as u64 != 0 {
412 Err(zx::Status::INVALID_ARGS)
413 } else {
414 Ok(bytes / self.block_size as u64)
415 }
416 }
417
    /// Queues `request` on the FIFO and waits for its response.
    ///
    /// Assigns the request id, fills in a trace flow id when the caller passed `NO_TRACE_ID`,
    /// and applies a pending barrier to write requests. Returns `zx::Status::CANCELED` when the
    /// FIFO has already been terminated, otherwise the status recorded for the request.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                // Everything in this scope runs under the FIFO state lock; the lock is released
                // before awaiting the response.
                let mut state = self.fifo_state.lock();

                // A pending barrier is attached to (and consumed by) the first write request.
                let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
                if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                    && state.attach_barrier
                {
                    flags |= BlockIoFlag::PRE_BARRIER;
                    request.command.flags = flags.bits();
                    state.attach_barrier = false;
                }

                if state.fifo.is_none() {
                    // The FIFO was terminated; nothing can be sent any more.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.overflowing_add(1).0;
                // The entry must exist before the `ResponseFuture` is created, because the
                // future unwraps it on poll and on drop.
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                // If the poller has parked itself, try to write the queue right away using its
                // waker so the FIFO wakes the poller when it becomes writable.
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }
467
468 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
469 self.send(BlockFifoRequest {
470 command: BlockFifoCommand {
471 opcode: BlockOpcode::CloseVmo.into_primitive(),
472 flags: 0,
473 ..Default::default()
474 },
475 vmoid: vmo_id.into_id(),
476 ..Default::default()
477 })
478 .await
479 }
480
481 async fn read_at(
482 &self,
483 buffer_slice: MutableBufferSlice<'_>,
484 device_offset: u64,
485 trace_flow_id: u64,
486 ) -> Result<(), zx::Status> {
487 match buffer_slice {
488 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
489 self.send(BlockFifoRequest {
490 command: BlockFifoCommand {
491 opcode: BlockOpcode::Read.into_primitive(),
492 flags: 0,
493 ..Default::default()
494 },
495 vmoid: vmo_id.id(),
496 length: self
497 .to_blocks(length)?
498 .try_into()
499 .map_err(|_| zx::Status::INVALID_ARGS)?,
500 vmo_offset: self.to_blocks(offset)?,
501 dev_offset: self.to_blocks(device_offset)?,
502 trace_flow_id,
503 ..Default::default()
504 })
505 .await?
506 }
507 MutableBufferSlice::Memory(mut slice) => {
508 let temp_vmo = self.temp_vmo.lock().await;
509 let mut device_block = self.to_blocks(device_offset)?;
510 loop {
511 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
512 let block_count = self.to_blocks(to_do as u64)? as u32;
513 self.send(BlockFifoRequest {
514 command: BlockFifoCommand {
515 opcode: BlockOpcode::Read.into_primitive(),
516 flags: 0,
517 ..Default::default()
518 },
519 vmoid: self.temp_vmo_id.id(),
520 length: block_count,
521 vmo_offset: 0,
522 dev_offset: device_block,
523 trace_flow_id,
524 ..Default::default()
525 })
526 .await?;
527 temp_vmo.read(&mut slice[..to_do], 0)?;
528 if to_do == slice.len() {
529 break;
530 }
531 device_block += block_count as u64;
532 slice = &mut slice[to_do..];
533 }
534 }
535 }
536 Ok(())
537 }
538
539 async fn write_at(
540 &self,
541 buffer_slice: BufferSlice<'_>,
542 device_offset: u64,
543 opts: WriteOptions,
544 trace_flow_id: u64,
545 ) -> Result<(), zx::Status> {
546 let mut flags = BlockIoFlag::empty();
547
548 if opts.contains(WriteOptions::FORCE_ACCESS) {
549 flags |= BlockIoFlag::FORCE_ACCESS;
550 }
551
552 if opts.contains(WriteOptions::PRE_BARRIER) {
553 flags |= BlockIoFlag::PRE_BARRIER;
554 }
555
556 match buffer_slice {
557 BufferSlice::VmoId { vmo_id, offset, length } => {
558 self.send(BlockFifoRequest {
559 command: BlockFifoCommand {
560 opcode: BlockOpcode::Write.into_primitive(),
561 flags: flags.bits(),
562 ..Default::default()
563 },
564 vmoid: vmo_id.id(),
565 length: self
566 .to_blocks(length)?
567 .try_into()
568 .map_err(|_| zx::Status::INVALID_ARGS)?,
569 vmo_offset: self.to_blocks(offset)?,
570 dev_offset: self.to_blocks(device_offset)?,
571 trace_flow_id,
572 ..Default::default()
573 })
574 .await?;
575 }
576 BufferSlice::Memory(mut slice) => {
577 let temp_vmo = self.temp_vmo.lock().await;
578 let mut device_block = self.to_blocks(device_offset)?;
579 loop {
580 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
581 let block_count = self.to_blocks(to_do as u64)? as u32;
582 temp_vmo.write(&slice[..to_do], 0)?;
583 self.send(BlockFifoRequest {
584 command: BlockFifoCommand {
585 opcode: BlockOpcode::Write.into_primitive(),
586 flags: flags.bits(),
587 ..Default::default()
588 },
589 vmoid: self.temp_vmo_id.id(),
590 length: block_count,
591 vmo_offset: 0,
592 dev_offset: device_block,
593 trace_flow_id,
594 ..Default::default()
595 })
596 .await?;
597 if to_do == slice.len() {
598 break;
599 }
600 device_block += block_count as u64;
601 slice = &slice[to_do..];
602 }
603 }
604 }
605 Ok(())
606 }
607
608 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
609 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
610 let dev_offset = self.to_blocks(device_range.start)?;
611 self.send(BlockFifoRequest {
612 command: BlockFifoCommand {
613 opcode: BlockOpcode::Trim.into_primitive(),
614 flags: 0,
615 ..Default::default()
616 },
617 vmoid: block_driver::BLOCK_VMOID_INVALID,
618 length,
619 dev_offset,
620 trace_flow_id,
621 ..Default::default()
622 })
623 .await
624 }
625
626 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
627 self.send(BlockFifoRequest {
628 command: BlockFifoCommand {
629 opcode: BlockOpcode::Flush.into_primitive(),
630 flags: 0,
631 ..Default::default()
632 },
633 vmoid: block_driver::BLOCK_VMOID_INVALID,
634 trace_flow_id,
635 ..Default::default()
636 })
637 .await
638 }
639
    /// Marks a barrier as pending; `send` attaches `PRE_BARRIER` to the next write request and
    /// clears the mark.
    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }
643
    /// Returns the device block size recorded at construction.
    fn block_size(&self) -> u32 {
        self.block_size
    }
647
    /// Returns the device block count recorded at construction.
    fn block_count(&self) -> u64 {
        self.block_count
    }
651
652 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
653 self.max_transfer_blocks.clone()
654 }
655
    /// Returns the device flags recorded at construction.
    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }
659
    /// Returns whether the FIFO is still present, i.e. `terminate` has not run.
    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
663}
664
impl Drop for Common {
    /// Invalidates the temporary VMO id and terminates the FIFO.
    fn drop(&mut self) {
        // Clear the id without sending a detach request, so the `VmoId` drop check does not
        // panic.
        let _ = self.temp_vmo_id.take().into_id();
        // Drops the FIFO, cancels outstanding requests and wakes the poller.
        self.fifo_state.lock().terminate();
    }
}
673
/// Asynchronous block device client backed by a FIDL session and its FIFO.
pub struct RemoteBlockClient {
    /// Session proxy used for `attach_vmo` and `close`.
    session: block::SessionProxy,
    /// Shared FIFO request logic.
    common: Common,
}
679
/// The subset of a block-like FIDL proxy that `RemoteBlockClient::new` needs.
pub trait AsBlockProxy {
    /// Queries the device's `BlockInfo`.
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    /// Opens a session served on `session`.
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}
685
686impl<T: AsBlockProxy> AsBlockProxy for &T {
687 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
688 AsBlockProxy::get_info(*self)
689 }
690 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
691 AsBlockProxy::open_session(*self, session)
692 }
693}
694
/// Implements `AsBlockProxy` for a proxy type by forwarding to that type's own `get_info` and
/// `open_session` methods.
macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

// The proxy types that can be handed to `RemoteBlockClient::new`.
impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);
715
716impl RemoteBlockClient {
717 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
719 let info =
720 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
721 let (session, server) = fidl::endpoints::create_proxy();
722 let () = remote.open_session(server).map_err(fidl_to_status)?;
723 Self::from_session(info, session).await
724 }
725
726 pub async fn from_session(
727 info: block::BlockInfo,
728 session: block::SessionProxy,
729 ) -> Result<Self, zx::Status> {
730 let fifo =
731 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
732 let fifo = fasync::Fifo::from_fifo(fifo);
733 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
734 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
735 let vmo_id =
736 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
737 let vmo_id = VmoId::new(vmo_id.id);
738 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
739 }
740}
741
742#[async_trait]
743impl BlockClient for RemoteBlockClient {
744 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
745 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
746 let vmo_id = self
747 .session
748 .attach_vmo(dup)
749 .await
750 .map_err(fidl_to_status)?
751 .map_err(zx::Status::from_raw)?;
752 Ok(VmoId::new(vmo_id.id))
753 }
754
755 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
756 self.common.detach_vmo(vmo_id).await
757 }
758
759 async fn read_at_traced(
760 &self,
761 buffer_slice: MutableBufferSlice<'_>,
762 device_offset: u64,
763 trace_flow_id: u64,
764 ) -> Result<(), zx::Status> {
765 self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
766 }
767
768 async fn write_at_with_opts_traced(
769 &self,
770 buffer_slice: BufferSlice<'_>,
771 device_offset: u64,
772 opts: WriteOptions,
773 trace_flow_id: u64,
774 ) -> Result<(), zx::Status> {
775 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
776 }
777
778 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
779 self.common.trim(range, trace_flow_id).await
780 }
781
782 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
783 self.common.flush(trace_flow_id).await
784 }
785
786 fn barrier(&self) {
787 self.common.barrier()
788 }
789
790 async fn close(&self) -> Result<(), zx::Status> {
791 let () =
792 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
793 Ok(())
794 }
795
796 fn block_size(&self) -> u32 {
797 self.common.block_size()
798 }
799
800 fn block_count(&self) -> u64 {
801 self.common.block_count()
802 }
803
804 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
805 self.common.max_transfer_blocks()
806 }
807
808 fn block_flags(&self) -> BlockFlags {
809 self.common.block_flags()
810 }
811
812 fn is_connected(&self) -> bool {
813 self.common.is_connected()
814 }
815}
816
/// Blocking block device client; the FIFO is serviced by a dedicated thread started in `new`.
pub struct RemoteBlockClientSync {
    /// Synchronous session proxy used for `attach_vmo` and `close`.
    session: block::SessionSynchronousProxy,
    /// Shared FIFO request logic.
    common: Common,
}
821
822impl RemoteBlockClientSync {
823 pub fn new(
827 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
828 ) -> Result<Self, zx::Status> {
829 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
830 let info = remote
831 .get_info(zx::MonotonicInstant::INFINITE)
832 .map_err(fidl_to_status)?
833 .map_err(zx::Status::from_raw)?;
834 let (client, server) = fidl::endpoints::create_endpoints();
835 let () = remote.open_session(server).map_err(fidl_to_status)?;
836 let session = block::SessionSynchronousProxy::new(client.into_channel());
837 let fifo = session
838 .get_fifo(zx::MonotonicInstant::INFINITE)
839 .map_err(fidl_to_status)?
840 .map_err(zx::Status::from_raw)?;
841 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
842 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
843 let vmo_id = session
844 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
845 .map_err(fidl_to_status)?
846 .map_err(zx::Status::from_raw)?;
847 let vmo_id = VmoId::new(vmo_id.id);
848
849 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
852 std::thread::spawn(move || {
853 let mut executor = fasync::LocalExecutor::new();
854 let fifo = fasync::Fifo::from_fifo(fifo);
855 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
856 let fifo_state = common.fifo_state.clone();
857 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
858 executor.run_singlethreaded(FifoPoller { fifo_state });
859 });
860 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
861 }
862
863 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
864 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
865 let vmo_id = self
866 .session
867 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
868 .map_err(fidl_to_status)?
869 .map_err(zx::Status::from_raw)?;
870 Ok(VmoId::new(vmo_id.id))
871 }
872
873 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
874 block_on(self.common.detach_vmo(vmo_id))
875 }
876
877 pub fn read_at(
878 &self,
879 buffer_slice: MutableBufferSlice<'_>,
880 device_offset: u64,
881 ) -> Result<(), zx::Status> {
882 block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
883 }
884
885 pub fn write_at(
886 &self,
887 buffer_slice: BufferSlice<'_>,
888 device_offset: u64,
889 ) -> Result<(), zx::Status> {
890 block_on(self.common.write_at(
891 buffer_slice,
892 device_offset,
893 WriteOptions::empty(),
894 NO_TRACE_ID,
895 ))
896 }
897
898 pub fn flush(&self) -> Result<(), zx::Status> {
899 block_on(self.common.flush(NO_TRACE_ID))
900 }
901
902 pub fn close(&self) -> Result<(), zx::Status> {
903 let () = self
904 .session
905 .close(zx::MonotonicInstant::INFINITE)
906 .map_err(fidl_to_status)?
907 .map_err(zx::Status::from_raw)?;
908 Ok(())
909 }
910
911 pub fn block_size(&self) -> u32 {
912 self.common.block_size()
913 }
914
915 pub fn block_count(&self) -> u64 {
916 self.common.block_count()
917 }
918
919 pub fn is_connected(&self) -> bool {
920 self.common.is_connected()
921 }
922}
923
impl Drop for RemoteBlockClientSync {
    /// Closes the session on a best-effort basis; any error is ignored.
    fn drop(&mut self) {
        let _ = self.close();
    }
}
930
/// Future that services the FIFO: it pushes queued requests and dispatches incoming responses.
struct FifoPoller {
    /// Shared FIFO state, also held by `Common`.
    fifo_state: FifoStateRef,
}
935
impl Future for FifoPoller {
    type Output = ();

    /// Sends queued requests, then drains every response that is currently available.
    ///
    /// Resolves only once the FIFO has been terminated; otherwise it stores its waker in
    /// `poller_waker` and returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        // Deref once so that disjoint fields of the state can be borrowed independently below.
        let state = state_lock.deref_mut();
        if state.poll_send_requests(context) {
            // The FIFO is gone; nothing is left to poll.
            return Poll::Ready(());
        }

        // `poll_send_requests` returned false, so the FIFO is still present.
        let fifo = state.fifo.as_ref().unwrap();
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Remember the waker so `Common::send` can kick the send path directly.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY: NOTE(review) relies on `try_read` having written the entry when
                    // it returns `Ready(Ok(_))` -- confirm against the `fasync::Fifo` contract.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // The entry may already be gone if the waiting future was dropped.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
976
977#[cfg(test)]
978mod tests {
979 use super::{
980 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
981 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
982 };
983 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
984 use fidl::endpoints::RequestStream as _;
985 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
986 use futures::join;
987 use futures::stream::futures_unordered::FuturesUnordered;
988 use futures::stream::StreamExt as _;
989 use ramdevice_client::RamdiskClient;
990 use std::borrow::Cow;
991 use std::num::NonZero;
992 use std::sync::atomic::{AtomicBool, Ordering};
993 use std::sync::Arc;
994 use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
995
996 const RAMDISK_BLOCK_SIZE: u64 = 1024;
997 const RAMDISK_BLOCK_COUNT: u64 = 1024;
998
999 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1000 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1001 .await
1002 .expect("RamdiskClient::create failed");
1003 let client_end = ramdisk.open().expect("ramdisk.open failed");
1004 let proxy = client_end.into_proxy();
1005 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1006 assert_eq!(block_client.block_size(), 1024);
1007 let client_end = ramdisk.open().expect("ramdisk.open failed");
1008 let proxy = client_end.into_proxy();
1009 (ramdisk, proxy, block_client)
1010 }
1011
1012 #[fuchsia::test]
1013 async fn test_against_ram_disk() {
1014 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1015
1016 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1017 vmo.write(b"hello", 5).expect("vmo.write failed");
1018 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1019 block_client
1020 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1021 .await
1022 .expect("write_at failed");
1023 block_client
1024 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1025 .await
1026 .expect("read_at failed");
1027 let mut buf: [u8; 5] = Default::default();
1028 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1029 assert_eq!(&buf, b"hello");
1030 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1031 }
1032
1033 #[fuchsia::test]
1034 async fn test_alignment() {
1035 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1036 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1037 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1038 block_client
1039 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1040 .await
1041 .expect_err("expected failure due to bad alignment");
1042 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1043 }
1044
1045 #[fuchsia::test]
1046 async fn test_parallel_io() {
1047 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1048 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1049 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1050 let mut reads = Vec::new();
1051 for _ in 0..1024 {
1052 reads.push(
1053 block_client
1054 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1055 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1056 );
1057 }
1058 futures::future::join_all(reads).await;
1059 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1060 }
1061
    /// Destroys the ramdisk while reads are in flight and checks that the client eventually
    /// reports itself as disconnected.
    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        // Run the reads concurrently with the destruction; individual reads may succeed or
        // fail, so their results are ignored.
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Keep reading until a read fails.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Poll every 500ms until the client notices the disconnect.
        while block_client.is_connected() {
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        assert_eq!(block_client.is_connected(), false);
        // The detach is issued only so the id is consumed; its result does not matter here.
        let _ = block_client.detach_vmo(vmo_id).await;
    }
1098
    /// Starts 1024 reads, waits for 500 of them and drops the rest, then checks that the client
    /// is still usable by detaching the VMO.
    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Wait for only part of the reads; the remaining futures are dropped at the end of
            // this scope.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1118
    /// Runs two large memory-buffer write/read round trips concurrently. Each transfer is
    /// larger than the temporary VMO, so both need several staged chunks.
    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back one extra block on either side of the written range.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            // The block before the written range must still be zero.
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            // The written range must hold the fill pattern.
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            // The block after the written range must still be zero.
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        // The two ranges are separated by one untouched block.
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }
1151
    /// Test double for a block server whose session and FIFO behaviour is supplied by closures.
    struct FakeBlockServer<'a> {
        /// Server end to serve; taken by `run`.
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        /// Called first for every session request; returning `true` means the request needs no
        /// further handling.
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        /// Produces the response for each FIFO request.
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1160
1161 impl<'a> FakeBlockServer<'a> {
1162 fn new(
1174 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1175 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1176 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1177 ) -> FakeBlockServer<'a> {
1178 FakeBlockServer {
1179 server_channel: Some(server_channel),
1180 channel_handler: Box::new(channel_handler),
1181 fifo_handler: Box::new(fifo_handler),
1182 }
1183 }
1184
1185 async fn run(&mut self) {
1187 let server = self.server_channel.take().unwrap();
1188
1189 let (server_fifo, client_fifo) =
1191 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1192 .expect("Fifo::create failed");
1193 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1194
1195 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1196 let fifo_future = Abortable::new(
1197 async {
1198 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1199 let (mut reader, mut writer) = fifo.async_io();
1200 let mut request = BlockFifoRequest::default();
1201 loop {
1202 match reader.read_entries(&mut request).await {
1203 Ok(1) => {}
1204 Err(zx::Status::PEER_CLOSED) => break,
1205 Err(e) => panic!("read_entry failed {:?}", e),
1206 _ => unreachable!(),
1207 };
1208
1209 let response = self.fifo_handler.as_ref()(request);
1210 writer
1211 .write_entries(std::slice::from_ref(&response))
1212 .await
1213 .expect("write_entries failed");
1214 }
1215 },
1216 fifo_future_abort_registration,
1217 );
1218
1219 let channel_future = async {
1220 server
1221 .into_stream()
1222 .for_each_concurrent(None, |request| async {
1223 let request = request.expect("unexpected fidl error");
1224
1225 match request {
1226 block::BlockRequest::GetInfo { responder } => {
1227 responder
1228 .send(Ok(&block::BlockInfo {
1229 block_count: 1024,
1230 block_size: 512,
1231 max_transfer_size: 1024 * 1024,
1232 flags: block::Flag::empty(),
1233 }))
1234 .expect("send failed");
1235 }
1236 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1237 let stream = session.into_stream();
1238 stream
1239 .for_each(|request| async {
1240 let request = request.expect("unexpected fidl error");
1241 if self.channel_handler.as_ref()(&request) {
1244 return;
1245 }
1246 match request {
1247 block::SessionRequest::GetFifo { responder } => {
1248 match maybe_server_fifo.lock().take() {
1249 Some(fifo) => {
1250 responder.send(Ok(fifo.downcast()))
1251 }
1252 None => responder.send(Err(
1253 zx::Status::NO_RESOURCES.into_raw(),
1254 )),
1255 }
1256 .expect("send failed")
1257 }
1258 block::SessionRequest::AttachVmo {
1259 vmo: _,
1260 responder,
1261 } => responder
1262 .send(Ok(&block::VmoId { id: 1 }))
1263 .expect("send failed"),
1264 block::SessionRequest::Close { responder } => {
1265 fifo_future_abort.abort();
1266 responder.send(Ok(())).expect("send failed")
1267 }
1268 }
1269 })
1270 .await
1271 }
1272 _ => panic!("Unexpected message"),
1273 }
1274 })
1275 .await;
1276 };
1277
1278 let _result = join!(fifo_future, channel_future);
1279 }
1281 }
1282
1283 #[fuchsia::test]
1284 async fn test_block_close_is_called() {
1285 let close_called = fuchsia_sync::Mutex::new(false);
1286 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1287
1288 std::thread::spawn(move || {
1289 let _block_client =
1290 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1291 });
1293
1294 let channel_handler = |request: &block::SessionRequest| -> bool {
1295 if let block::SessionRequest::Close { .. } = request {
1296 *close_called.lock() = true;
1297 }
1298 false
1299 };
1300 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1301
1302 assert!(*close_called.lock());
1304 }
1305
1306 #[fuchsia::test]
1307 async fn test_block_flush_is_called() {
1308 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1309
1310 struct Interface {
1311 flush_called: Arc<AtomicBool>,
1312 }
1313 impl block_server::async_interface::Interface for Interface {
1314 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1315 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1316 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1317 max_transfer_blocks: None,
1318 block_range: Some(0..1000),
1319 type_guid: [0; 16],
1320 instance_guid: [0; 16],
1321 name: "foo".to_string(),
1322 flags: 0,
1323 })))
1324 }
1325
1326 async fn read(
1327 &self,
1328 _device_block_offset: u64,
1329 _block_count: u32,
1330 _vmo: &Arc<zx::Vmo>,
1331 _vmo_offset: u64,
1332 _trace_flow_id: Option<NonZero<u64>>,
1333 ) -> Result<(), zx::Status> {
1334 unreachable!();
1335 }
1336
1337 fn barrier(&self) -> Result<(), zx::Status> {
1338 unreachable!()
1339 }
1340
1341 async fn write(
1342 &self,
1343 _device_block_offset: u64,
1344 _block_count: u32,
1345 _vmo: &Arc<zx::Vmo>,
1346 _vmo_offset: u64,
1347 _opts: WriteOptions,
1348 _trace_flow_id: Option<NonZero<u64>>,
1349 ) -> Result<(), zx::Status> {
1350 unreachable!();
1351 }
1352
1353 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1354 self.flush_called.store(true, Ordering::Relaxed);
1355 Ok(())
1356 }
1357
1358 async fn trim(
1359 &self,
1360 _device_block_offset: u64,
1361 _block_count: u32,
1362 _trace_flow_id: Option<NonZero<u64>>,
1363 ) -> Result<(), zx::Status> {
1364 unreachable!();
1365 }
1366 }
1367
1368 let flush_called = Arc::new(AtomicBool::new(false));
1369
1370 futures::join!(
1371 async {
1372 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1373
1374 block_client.flush().await.expect("flush failed");
1375 },
1376 async {
1377 let block_server = BlockServer::new(
1378 512,
1379 Arc::new(Interface { flush_called: flush_called.clone() }),
1380 );
1381 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1382 }
1383 );
1384
1385 assert!(flush_called.load(Ordering::Relaxed));
1386 }
1387
1388 #[fuchsia::test]
1389 async fn test_trace_flow_ids_set() {
1390 let (proxy, server) = fidl::endpoints::create_proxy();
1391
1392 futures::join!(
1393 async {
1394 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1395 block_client.flush().await.expect("flush failed");
1396 },
1397 async {
1398 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1399 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1400 if request.trace_flow_id > 0 {
1401 *flow_id.lock() = Some(request.trace_flow_id);
1402 }
1403 BlockFifoResponse {
1404 status: zx::Status::OK.into_raw(),
1405 reqid: request.reqid,
1406 ..Default::default()
1407 }
1408 };
1409 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1410 assert!(flow_id.lock().is_some());
1412 }
1413 );
1414 }
1415}