1use async_trait::async_trait;
6use fidl::endpoints::ServerEnd;
7use fidl_fuchsia_hardware_block::BlockProxy;
8use fidl_fuchsia_hardware_block_partition::PartitionProxy;
9use fidl_fuchsia_hardware_block_volume::VolumeProxy;
10use fuchsia_sync::Mutex;
11use futures::channel::oneshot;
12use futures::executor::block_on;
13use lazy_static::lazy_static;
14use std::collections::HashMap;
15use std::future::Future;
16use std::hash::{Hash, Hasher};
17use std::mem::MaybeUninit;
18use std::ops::{DerefMut, Range};
19use std::pin::Pin;
20use std::sync::atomic::{AtomicU16, Ordering};
21use std::sync::Arc;
22use std::task::{Context, Poll, Waker};
23use zx::sys::zx_handle_t;
24use zx::{self as zx, HandleBased as _};
25use {
26 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
27 fuchsia_async as fasync, storage_trace as trace,
28};
29
30pub use cache::Cache;
31
32pub use block::Flag as BlockFlags;
33
34pub use block_protocol::*;
35
36pub mod cache;
37
/// Size in bytes of the scratch VMO used to stage transfers for in-memory
/// (`Memory`) buffer slices. Larger transfers are split into chunks of at most
/// this many bytes.
const TEMP_VMO_SIZE: usize = 65536;

/// Trace flow id meaning "none supplied". `Common::send` replaces this value
/// with a generated id before queueing the request.
pub const NO_TRACE_ID: u64 = 0;
42
43pub use block_driver::{BlockIoFlag, BlockOpcode};
44
45fn fidl_to_status(error: fidl::Error) -> zx::Status {
46 match error {
47 fidl::Error::ClientChannelClosed { status, .. } => status,
48 _ => zx::Status::INTERNAL,
49 }
50}
51
52fn opcode_str(opcode: u8) -> &'static str {
53 match BlockOpcode::from_primitive(opcode) {
54 Some(BlockOpcode::Read) => "read",
55 Some(BlockOpcode::Write) => "write",
56 Some(BlockOpcode::Flush) => "flush",
57 Some(BlockOpcode::Trim) => "trim",
58 Some(BlockOpcode::CloseVmo) => "close_vmo",
59 None => "unknown",
60 }
61}
62
63fn generate_trace_flow_id(request_id: u32) -> u64 {
66 lazy_static! {
67 static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
68 };
69 *SELF_HANDLE as u64 + (request_id as u64) << 32
70}
71
/// Read-only data source for a write to the block device.
pub enum BufferSlice<'a> {
    /// A region of a VMO identified by `vmo_id`. `offset` and `length` are
    /// byte values; they are converted to block counts before being sent, so
    /// they must be multiples of the device block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// Plain memory; the data is staged through the client's temporary VMO.
    Memory(&'a [u8]),
}
76
77impl<'a> BufferSlice<'a> {
78 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
79 BufferSlice::VmoId { vmo_id, offset, length }
80 }
81}
82
83impl<'a> From<&'a [u8]> for BufferSlice<'a> {
84 fn from(buf: &'a [u8]) -> Self {
85 BufferSlice::Memory(buf)
86 }
87}
88
/// Writable destination for a read from the block device.
pub enum MutableBufferSlice<'a> {
    /// A region of a VMO identified by `vmo_id`. `offset` and `length` are
    /// byte values; they are converted to block counts before being sent, so
    /// they must be multiples of the device block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// Plain memory; the data is staged through the client's temporary VMO.
    Memory(&'a mut [u8]),
}
93
94impl<'a> MutableBufferSlice<'a> {
95 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
96 MutableBufferSlice::VmoId { vmo_id, offset, length }
97 }
98}
99
100impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
101 fn from(buf: &'a mut [u8]) -> Self {
102 MutableBufferSlice::Memory(buf)
103 }
104}
105
/// Per-request bookkeeping shared between a `ResponseFuture` and the poller.
#[derive(Default)]
struct RequestState {
    /// Completion status; `None` while the request is still outstanding.
    result: Option<zx::Status>,
    /// Waker of the task awaiting this request, registered when it polls
    /// before a result is available.
    waker: Option<Waker>,
}
111
/// State shared (behind a mutex) by request senders and the `FifoPoller`.
#[derive(Default)]
struct FifoState {
    /// The FIFO connected to the block server; `None` once terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    /// Id assigned to the next request that is sent.
    next_request_id: u32,

    /// Requests accepted by `send` that have not yet been written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    /// Outstanding requests, keyed by request id.
    map: HashMap<u32, RequestState>,

    /// Waker for the poller task, stored when it last returned `Pending`.
    poller_waker: Option<Waker>,
}
129
130impl FifoState {
131 fn terminate(&mut self) {
132 self.fifo.take();
133 for (_, request_state) in self.map.iter_mut() {
134 request_state.result.get_or_insert(zx::Status::CANCELED);
135 if let Some(waker) = request_state.waker.take() {
136 waker.wake();
137 }
138 }
139 if let Some(waker) = self.poller_waker.take() {
140 waker.wake();
141 }
142 }
143
144 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
146 let fifo = if let Some(fifo) = self.fifo.as_ref() {
147 fifo
148 } else {
149 return true;
150 };
151
152 loop {
153 let slice = self.queue.as_slices().0;
154 if slice.is_empty() {
155 return false;
156 }
157 match fifo.try_write(context, slice) {
158 Poll::Ready(Ok(sent)) => {
159 self.queue.drain(0..sent);
160 }
161 Poll::Ready(Err(_)) => {
162 self.terminate();
163 return true;
164 }
165 Poll::Pending => {
166 return false;
167 }
168 }
169 }
170 }
171}
172
/// Shared, mutex-protected handle to the `FifoState`.
type FifoStateRef = Arc<Mutex<FifoState>>;
174
/// Future that resolves once the request with `request_id` has a result.
/// Dropping it removes the request's entry from the shared map.
struct ResponseFuture {
    /// Id of the request being awaited; key into `FifoState::map`.
    request_id: u32,
    /// Shared state in which the result will be recorded.
    fifo_state: FifoStateRef,
}
180
181impl ResponseFuture {
182 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
183 ResponseFuture { request_id, fifo_state }
184 }
185}
186
187impl Future for ResponseFuture {
188 type Output = Result<(), zx::Status>;
189
190 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
191 let mut state = self.fifo_state.lock();
192 let request_state = state.map.get_mut(&self.request_id).unwrap();
193 if let Some(result) = request_state.result {
194 Poll::Ready(result.into())
195 } else {
196 request_state.waker.replace(context.waker().clone());
197 Poll::Pending
198 }
199 }
200}
201
202impl Drop for ResponseFuture {
203 fn drop(&mut self) {
204 self.fifo_state.lock().map.remove(&self.request_id).unwrap();
205 }
206}
207
/// Identifier of a VMO attached to a block session.
///
/// Dropping a `VmoId` that still holds a valid id panics (see the `Drop`
/// impl); consume it with `into_id` or hand it to `detach_vmo` first.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
212
213impl VmoId {
214 pub fn new(id: u16) -> Self {
216 Self(AtomicU16::new(id))
217 }
218
219 pub fn take(&self) -> Self {
221 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
222 }
223
224 pub fn is_valid(&self) -> bool {
225 self.id() != block_driver::BLOCK_VMOID_INVALID
226 }
227
228 #[must_use]
230 pub fn into_id(self) -> u16 {
231 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
232 }
233
234 pub fn id(&self) -> u16 {
235 self.0.load(Ordering::Relaxed)
236 }
237}
238
239impl PartialEq for VmoId {
240 fn eq(&self, other: &Self) -> bool {
241 self.id() == other.id()
242 }
243}
244
// Marker impl: equality on `VmoId` compares plain `u16` ids.
impl Eq for VmoId {}
246
247impl Drop for VmoId {
248 fn drop(&mut self) {
249 assert_eq!(
250 self.0.load(Ordering::Relaxed),
251 block_driver::BLOCK_VMOID_INVALID,
252 "Did you forget to detach?"
253 );
254 }
255}
256
257impl Hash for VmoId {
258 fn hash<H: Hasher>(&self, state: &mut H) {
259 self.id().hash(state);
260 }
261}
262
263#[async_trait]
267pub trait BlockClient: Send + Sync {
268 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
270
271 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
273
274 async fn read_at(
276 &self,
277 buffer_slice: MutableBufferSlice<'_>,
278 device_offset: u64,
279 ) -> Result<(), zx::Status> {
280 self.read_at_traced(buffer_slice, device_offset, 0).await
281 }
282
283 async fn read_at_traced(
284 &self,
285 buffer_slice: MutableBufferSlice<'_>,
286 device_offset: u64,
287 trace_flow_id: u64,
288 ) -> Result<(), zx::Status>;
289
290 async fn write_at(
292 &self,
293 buffer_slice: BufferSlice<'_>,
294 device_offset: u64,
295 ) -> Result<(), zx::Status> {
296 self.write_at_with_opts_traced(
297 buffer_slice,
298 device_offset,
299 WriteOptions::empty(),
300 NO_TRACE_ID,
301 )
302 .await
303 }
304
305 async fn write_at_with_opts(
306 &self,
307 buffer_slice: BufferSlice<'_>,
308 device_offset: u64,
309 opts: WriteOptions,
310 ) -> Result<(), zx::Status> {
311 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
312 }
313
314 async fn write_at_with_opts_traced(
315 &self,
316 buffer_slice: BufferSlice<'_>,
317 device_offset: u64,
318 opts: WriteOptions,
319 trace_flow_id: u64,
320 ) -> Result<(), zx::Status>;
321
322 async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
324 self.trim_traced(device_range, NO_TRACE_ID).await
325 }
326
327 async fn trim_traced(
328 &self,
329 device_range: Range<u64>,
330 trace_flow_id: u64,
331 ) -> Result<(), zx::Status>;
332
333 async fn flush(&self) -> Result<(), zx::Status> {
334 self.flush_traced(NO_TRACE_ID).await
335 }
336
337 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
339
340 async fn close(&self) -> Result<(), zx::Status>;
342
343 fn block_size(&self) -> u32;
345
346 fn block_count(&self) -> u64;
348
349 fn block_flags(&self) -> BlockFlags;
351
352 fn is_connected(&self) -> bool;
354}
355
/// State and request logic shared by the async and sync clients.
struct Common {
    /// Device block size in bytes.
    block_size: u32,
    /// Number of blocks on the device.
    block_count: u64,
    /// Flags reported by the device.
    block_flags: BlockFlags,
    /// FIFO state shared with the poller and with in-flight requests.
    fifo_state: FifoStateRef,
    /// Scratch VMO for memory-backed transfers; the async mutex serialises
    /// its use across concurrent requests.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    /// Id under which `temp_vmo` is attached to the session.
    temp_vmo_id: VmoId,
}
364
365impl Common {
366 fn new(
367 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
368 info: &block::BlockInfo,
369 temp_vmo: zx::Vmo,
370 temp_vmo_id: VmoId,
371 ) -> Self {
372 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
373 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
374 Self {
375 block_size: info.block_size,
376 block_count: info.block_count,
377 block_flags: info.flags,
378 fifo_state,
379 temp_vmo: futures::lock::Mutex::new(temp_vmo),
380 temp_vmo_id,
381 }
382 }
383
384 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
385 if bytes % self.block_size as u64 != 0 {
386 Err(zx::Status::INVALID_ARGS)
387 } else {
388 Ok(bytes / self.block_size as u64)
389 }
390 }
391
392 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
394 async move {
395 let (request_id, trace_flow_id) = {
396 let mut state = self.fifo_state.lock();
397 if state.fifo.is_none() {
398 return Err(zx::Status::CANCELED);
400 }
401 trace::duration!(
402 c"storage",
403 c"block_client::send::start",
404 "op" => opcode_str(request.command.opcode)
405 );
406 let request_id = state.next_request_id;
407 state.next_request_id = state.next_request_id.overflowing_add(1).0;
408 assert!(
409 state.map.insert(request_id, RequestState::default()).is_none(),
410 "request id in use!"
411 );
412 request.reqid = request_id;
413 if request.trace_flow_id == NO_TRACE_ID {
414 request.trace_flow_id = generate_trace_flow_id(request_id);
415 }
416 let trace_flow_id = request.trace_flow_id;
417 trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
418 state.queue.push_back(request);
419 if let Some(waker) = state.poller_waker.clone() {
420 state.poll_send_requests(&mut Context::from_waker(&waker));
421 }
422 (request_id, trace_flow_id)
423 };
424 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
425 trace::duration!(c"storage", c"block_client::send::end");
426 trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
427 Ok(())
428 }
429 .await
430 }
431
432 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
433 self.send(BlockFifoRequest {
434 command: BlockFifoCommand {
435 opcode: BlockOpcode::CloseVmo.into_primitive(),
436 flags: 0,
437 ..Default::default()
438 },
439 vmoid: vmo_id.into_id(),
440 ..Default::default()
441 })
442 .await
443 }
444
445 async fn read_at(
446 &self,
447 buffer_slice: MutableBufferSlice<'_>,
448 device_offset: u64,
449 trace_flow_id: u64,
450 ) -> Result<(), zx::Status> {
451 match buffer_slice {
452 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
453 self.send(BlockFifoRequest {
454 command: BlockFifoCommand {
455 opcode: BlockOpcode::Read.into_primitive(),
456 flags: 0,
457 ..Default::default()
458 },
459 vmoid: vmo_id.id(),
460 length: self
461 .to_blocks(length)?
462 .try_into()
463 .map_err(|_| zx::Status::INVALID_ARGS)?,
464 vmo_offset: self.to_blocks(offset)?,
465 dev_offset: self.to_blocks(device_offset)?,
466 trace_flow_id,
467 ..Default::default()
468 })
469 .await?
470 }
471 MutableBufferSlice::Memory(mut slice) => {
472 let temp_vmo = self.temp_vmo.lock().await;
473 let mut device_block = self.to_blocks(device_offset)?;
474 loop {
475 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
476 let block_count = self.to_blocks(to_do as u64)? as u32;
477 self.send(BlockFifoRequest {
478 command: BlockFifoCommand {
479 opcode: BlockOpcode::Read.into_primitive(),
480 flags: 0,
481 ..Default::default()
482 },
483 vmoid: self.temp_vmo_id.id(),
484 length: block_count,
485 vmo_offset: 0,
486 dev_offset: device_block,
487 trace_flow_id,
488 ..Default::default()
489 })
490 .await?;
491 temp_vmo.read(&mut slice[..to_do], 0)?;
492 if to_do == slice.len() {
493 break;
494 }
495 device_block += block_count as u64;
496 slice = &mut slice[to_do..];
497 }
498 }
499 }
500 Ok(())
501 }
502
503 async fn write_at(
504 &self,
505 buffer_slice: BufferSlice<'_>,
506 device_offset: u64,
507 opts: WriteOptions,
508 trace_flow_id: u64,
509 ) -> Result<(), zx::Status> {
510 let flags = if opts.contains(WriteOptions::FORCE_ACCESS) {
511 BlockIoFlag::FORCE_ACCESS.bits()
512 } else {
513 0
514 };
515 match buffer_slice {
516 BufferSlice::VmoId { vmo_id, offset, length } => {
517 self.send(BlockFifoRequest {
518 command: BlockFifoCommand {
519 opcode: BlockOpcode::Write.into_primitive(),
520 flags,
521 ..Default::default()
522 },
523 vmoid: vmo_id.id(),
524 length: self
525 .to_blocks(length)?
526 .try_into()
527 .map_err(|_| zx::Status::INVALID_ARGS)?,
528 vmo_offset: self.to_blocks(offset)?,
529 dev_offset: self.to_blocks(device_offset)?,
530 trace_flow_id,
531 ..Default::default()
532 })
533 .await?;
534 }
535 BufferSlice::Memory(mut slice) => {
536 let temp_vmo = self.temp_vmo.lock().await;
537 let mut device_block = self.to_blocks(device_offset)?;
538 loop {
539 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
540 let block_count = self.to_blocks(to_do as u64)? as u32;
541 temp_vmo.write(&slice[..to_do], 0)?;
542 self.send(BlockFifoRequest {
543 command: BlockFifoCommand {
544 opcode: BlockOpcode::Write.into_primitive(),
545 flags,
546 ..Default::default()
547 },
548 vmoid: self.temp_vmo_id.id(),
549 length: block_count,
550 vmo_offset: 0,
551 dev_offset: device_block,
552 trace_flow_id,
553 ..Default::default()
554 })
555 .await?;
556 if to_do == slice.len() {
557 break;
558 }
559 device_block += block_count as u64;
560 slice = &slice[to_do..];
561 }
562 }
563 }
564 Ok(())
565 }
566
567 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
568 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
569 let dev_offset = self.to_blocks(device_range.start)?;
570 self.send(BlockFifoRequest {
571 command: BlockFifoCommand {
572 opcode: BlockOpcode::Trim.into_primitive(),
573 flags: 0,
574 ..Default::default()
575 },
576 vmoid: block_driver::BLOCK_VMOID_INVALID,
577 length,
578 dev_offset,
579 trace_flow_id,
580 ..Default::default()
581 })
582 .await
583 }
584
585 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
586 self.send(BlockFifoRequest {
587 command: BlockFifoCommand {
588 opcode: BlockOpcode::Flush.into_primitive(),
589 flags: 0,
590 ..Default::default()
591 },
592 vmoid: block_driver::BLOCK_VMOID_INVALID,
593 trace_flow_id,
594 ..Default::default()
595 })
596 .await
597 }
598
599 fn block_size(&self) -> u32 {
600 self.block_size
601 }
602
603 fn block_count(&self) -> u64 {
604 self.block_count
605 }
606
607 fn block_flags(&self) -> BlockFlags {
608 self.block_flags
609 }
610
611 fn is_connected(&self) -> bool {
612 self.fifo_state.lock().fifo.is_some()
613 }
614}
615
616impl Drop for Common {
617 fn drop(&mut self) {
618 let _ = self.temp_vmo_id.take().into_id();
621 self.fifo_state.lock().terminate();
622 }
623}
624
/// Asynchronous block client that talks to a remote block device over a
/// `fuchsia.hardware.block` session and its FIFO.
pub struct RemoteBlockClient {
    /// Session proxy used for channel-based calls (attach VMO, close).
    session: block::SessionProxy,
    /// FIFO-based request machinery and cached device info.
    common: Common,
}
630
/// Abstraction over the proxy types from which a `RemoteBlockClient` can be
/// created: anything that can report block info and open a session.
pub trait AsBlockProxy {
    /// Fetches the device's block info.
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    /// Opens a new session served on `session`.
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}
636
637impl<T: AsBlockProxy> AsBlockProxy for &T {
638 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
639 AsBlockProxy::get_info(*self)
640 }
641 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
642 AsBlockProxy::open_session(*self, session)
643 }
644}
645
/// Implements `AsBlockProxy` for the proxy type `$name` by forwarding to the
/// type's own `get_info` and `open_session` methods.
macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}
662
// Block, Partition and Volume proxies can all be used to create a client.
impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);
666
667impl RemoteBlockClient {
668 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
670 let info =
671 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
672 let (session, server) = fidl::endpoints::create_proxy();
673 let () = remote.open_session(server).map_err(fidl_to_status)?;
674 Self::from_session(info, session).await
675 }
676
677 pub async fn from_session(
678 info: block::BlockInfo,
679 session: block::SessionProxy,
680 ) -> Result<Self, zx::Status> {
681 let fifo =
682 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
683 let fifo = fasync::Fifo::from_fifo(fifo);
684 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
685 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
686 let vmo_id =
687 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
688 let vmo_id = VmoId::new(vmo_id.id);
689 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
690 }
691}
692
693#[async_trait]
694impl BlockClient for RemoteBlockClient {
695 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
696 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
697 let vmo_id = self
698 .session
699 .attach_vmo(dup)
700 .await
701 .map_err(fidl_to_status)?
702 .map_err(zx::Status::from_raw)?;
703 Ok(VmoId::new(vmo_id.id))
704 }
705
706 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
707 self.common.detach_vmo(vmo_id).await
708 }
709
710 async fn read_at_traced(
711 &self,
712 buffer_slice: MutableBufferSlice<'_>,
713 device_offset: u64,
714 trace_flow_id: u64,
715 ) -> Result<(), zx::Status> {
716 self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
717 }
718
719 async fn write_at_with_opts_traced(
720 &self,
721 buffer_slice: BufferSlice<'_>,
722 device_offset: u64,
723 opts: WriteOptions,
724 trace_flow_id: u64,
725 ) -> Result<(), zx::Status> {
726 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
727 }
728
729 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
730 self.common.trim(range, trace_flow_id).await
731 }
732
733 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
734 self.common.flush(trace_flow_id).await
735 }
736
737 async fn close(&self) -> Result<(), zx::Status> {
738 let () =
739 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
740 Ok(())
741 }
742
743 fn block_size(&self) -> u32 {
744 self.common.block_size()
745 }
746
747 fn block_count(&self) -> u64 {
748 self.common.block_count()
749 }
750
751 fn block_flags(&self) -> BlockFlags {
752 self.common.block_flags()
753 }
754
755 fn is_connected(&self) -> bool {
756 self.common.is_connected()
757 }
758}
759
/// Blocking counterpart of `RemoteBlockClient`. FIFO traffic is serviced by
/// a dedicated thread started in `new`; calls block the caller until done.
pub struct RemoteBlockClientSync {
    /// Synchronous session proxy used for channel-based calls.
    session: block::SessionSynchronousProxy,
    /// FIFO-based request machinery and cached device info.
    common: Common,
}
764
765impl RemoteBlockClientSync {
766 pub fn new(
770 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
771 ) -> Result<Self, zx::Status> {
772 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
773 let info = remote
774 .get_info(zx::MonotonicInstant::INFINITE)
775 .map_err(fidl_to_status)?
776 .map_err(zx::Status::from_raw)?;
777 let (client, server) = fidl::endpoints::create_endpoints();
778 let () = remote.open_session(server).map_err(fidl_to_status)?;
779 let session = block::SessionSynchronousProxy::new(client.into_channel());
780 let fifo = session
781 .get_fifo(zx::MonotonicInstant::INFINITE)
782 .map_err(fidl_to_status)?
783 .map_err(zx::Status::from_raw)?;
784 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
785 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
786 let vmo_id = session
787 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
788 .map_err(fidl_to_status)?
789 .map_err(zx::Status::from_raw)?;
790 let vmo_id = VmoId::new(vmo_id.id);
791
792 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
795 std::thread::spawn(move || {
796 let mut executor = fasync::LocalExecutor::new();
797 let fifo = fasync::Fifo::from_fifo(fifo);
798 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
799 let fifo_state = common.fifo_state.clone();
800 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
801 executor.run_singlethreaded(FifoPoller { fifo_state });
802 });
803 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
804 }
805
806 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
807 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
808 let vmo_id = self
809 .session
810 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
811 .map_err(fidl_to_status)?
812 .map_err(zx::Status::from_raw)?;
813 Ok(VmoId::new(vmo_id.id))
814 }
815
816 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
817 block_on(self.common.detach_vmo(vmo_id))
818 }
819
820 pub fn read_at(
821 &self,
822 buffer_slice: MutableBufferSlice<'_>,
823 device_offset: u64,
824 ) -> Result<(), zx::Status> {
825 block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
826 }
827
828 pub fn write_at(
829 &self,
830 buffer_slice: BufferSlice<'_>,
831 device_offset: u64,
832 ) -> Result<(), zx::Status> {
833 block_on(self.common.write_at(
834 buffer_slice,
835 device_offset,
836 WriteOptions::empty(),
837 NO_TRACE_ID,
838 ))
839 }
840
841 pub fn flush(&self) -> Result<(), zx::Status> {
842 block_on(self.common.flush(NO_TRACE_ID))
843 }
844
845 pub fn close(&self) -> Result<(), zx::Status> {
846 let () = self
847 .session
848 .close(zx::MonotonicInstant::INFINITE)
849 .map_err(fidl_to_status)?
850 .map_err(zx::Status::from_raw)?;
851 Ok(())
852 }
853
854 pub fn block_size(&self) -> u32 {
855 self.common.block_size()
856 }
857
858 pub fn block_count(&self) -> u64 {
859 self.common.block_count()
860 }
861
862 pub fn is_connected(&self) -> bool {
863 self.common.is_connected()
864 }
865}
866
867impl Drop for RemoteBlockClientSync {
868 fn drop(&mut self) {
869 let _ = self.close();
871 }
872}
873
/// Future that drives the FIFO: it pushes queued requests out and dispatches
/// incoming responses to their waiters. It completes once the FIFO has been
/// terminated.
struct FifoPoller {
    /// State shared with request senders.
    fifo_state: FifoStateRef,
}
878
impl Future for FifoPoller {
    type Output = ();

    /// Sends any queued requests, then reads responses until the FIFO has no
    /// more to offer. Each response's status is stored on the matching
    /// request and its waiter is woken. Resolves when the FIFO is gone or a
    /// read fails (in which case the state is terminated).
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        // Work on a plain `&mut FifoState` so that disjoint fields can be
        // borrowed independently below.
        let state = state_lock.deref_mut();

        // Send requests; `true` means the FIFO has been terminated.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // `poll_send_requests` returned false, so the FIFO is still present.
        let fifo = state.fifo.as_ref().unwrap();

        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Remember our waker so `send` can push requests out on
                    // our behalf while we are parked.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY: relies on `try_read` having written `response`
                    // when it returns `Ready(Ok(_))` — NOTE(review): confirm
                    // against the `fasync::Fifo::try_read` contract.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // The entry may be absent if the awaiting future was
                    // dropped before the response arrived; ignore it then.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
919
920#[cfg(test)]
921mod tests {
922 use super::{
923 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
924 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
925 };
926 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
927 use fidl::endpoints::RequestStream as _;
928 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
929 use futures::join;
930 use futures::stream::futures_unordered::FuturesUnordered;
931 use futures::stream::StreamExt as _;
932 use ramdevice_client::RamdiskClient;
933 use std::borrow::Cow;
934 use std::num::NonZero;
935 use std::sync::atomic::{AtomicBool, Ordering};
936 use std::sync::Arc;
937 use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
938
939 const RAMDISK_BLOCK_SIZE: u64 = 1024;
940 const RAMDISK_BLOCK_COUNT: u64 = 1024;
941
942 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
943 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
944 .await
945 .expect("RamdiskClient::create failed");
946 let client_end = ramdisk.open().expect("ramdisk.open failed");
947 let proxy = client_end.into_proxy();
948 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
949 assert_eq!(block_client.block_size(), 1024);
950 let client_end = ramdisk.open().expect("ramdisk.open failed");
951 let proxy = client_end.into_proxy();
952 (ramdisk, proxy, block_client)
953 }
954
955 #[fuchsia::test]
956 async fn test_against_ram_disk() {
957 let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
958
959 let stats_before = block_proxy
960 .get_stats(false)
961 .await
962 .expect("get_stats failed")
963 .map_err(zx::Status::from_raw)
964 .expect("get_stats error");
965
966 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
967 vmo.write(b"hello", 5).expect("vmo.write failed");
968 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
969 block_client
970 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
971 .await
972 .expect("write_at failed");
973 block_client
974 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
975 .await
976 .expect("read_at failed");
977 let mut buf: [u8; 5] = Default::default();
978 vmo.read(&mut buf, 1029).expect("vmo.read failed");
979 assert_eq!(&buf, b"hello");
980 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
981
982 let stats_after = block_proxy
984 .get_stats(false)
985 .await
986 .expect("get_stats failed")
987 .map_err(zx::Status::from_raw)
988 .expect("get_stats error");
989 assert_eq!(
991 stats_before.write.success.total_calls + 1,
992 stats_after.write.success.total_calls
993 );
994 assert_eq!(
995 stats_before.write.success.bytes_transferred + 1024,
996 stats_after.write.success.bytes_transferred
997 );
998 assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
999 assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
1001 assert_eq!(
1002 stats_before.read.success.bytes_transferred + 2048,
1003 stats_after.read.success.bytes_transferred
1004 );
1005 assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1006 }
1007
1008 #[fuchsia::test]
1009 async fn test_against_ram_disk_with_flush() {
1010 let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
1011
1012 let stats_before = block_proxy
1013 .get_stats(false)
1014 .await
1015 .expect("get_stats failed")
1016 .map_err(zx::Status::from_raw)
1017 .expect("get_stats error");
1018
1019 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1020 vmo.write(b"hello", 5).expect("vmo.write failed");
1021 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1022 block_client
1023 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1024 .await
1025 .expect("write_at failed");
1026 block_client.flush().await.expect("flush failed");
1027 block_client
1028 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1029 .await
1030 .expect("read_at failed");
1031 let mut buf: [u8; 5] = Default::default();
1032 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1033 assert_eq!(&buf, b"hello");
1034 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1035
1036 let stats_after = block_proxy
1038 .get_stats(false)
1039 .await
1040 .expect("get_stats failed")
1041 .map_err(zx::Status::from_raw)
1042 .expect("get_stats error");
1043 assert_eq!(
1045 stats_before.write.success.total_calls + 1,
1046 stats_after.write.success.total_calls
1047 );
1048 assert_eq!(
1049 stats_before.write.success.bytes_transferred + 1024,
1050 stats_after.write.success.bytes_transferred
1051 );
1052 assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
1053 assert_eq!(
1055 stats_before.flush.success.total_calls + 1,
1056 stats_after.flush.success.total_calls
1057 );
1058 assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
1059 assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
1061 assert_eq!(
1062 stats_before.read.success.bytes_transferred + 2048,
1063 stats_after.read.success.bytes_transferred
1064 );
1065 assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1066 }
1067
1068 #[fuchsia::test]
1069 async fn test_alignment() {
1070 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1071 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1072 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1073 block_client
1074 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1075 .await
1076 .expect_err("expected failure due to bad alignment");
1077 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1078 }
1079
1080 #[fuchsia::test]
1081 async fn test_parallel_io() {
1082 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1083 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1084 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1085 let mut reads = Vec::new();
1086 for _ in 0..1024 {
1087 reads.push(
1088 block_client
1089 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1090 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1091 );
1092 }
1093 futures::future::join_all(reads).await;
1094 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1095 }
1096
1097 #[fuchsia::test]
1098 async fn test_closed_device() {
1099 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1100 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1101 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1102 let mut reads = Vec::new();
1103 for _ in 0..1024 {
1104 reads.push(
1105 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1106 );
1107 }
1108 assert!(block_client.is_connected());
1109 let _ = futures::join!(futures::future::join_all(reads), async {
1110 ramdisk.destroy().await.expect("ramdisk.destroy failed")
1111 });
1112 while block_client
1114 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1115 .await
1116 .is_ok()
1117 {}
1118
1119 while block_client.is_connected() {
1122 fasync::Timer::new(fasync::MonotonicInstant::after(
1124 zx::MonotonicDuration::from_millis(500),
1125 ))
1126 .await;
1127 }
1128
1129 assert_eq!(block_client.is_connected(), false);
1131 let _ = block_client.detach_vmo(vmo_id).await;
1132 }
1133
1134 #[fuchsia::test]
1135 async fn test_cancelled_reads() {
1136 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1137 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1138 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1139 {
1140 let mut reads = FuturesUnordered::new();
1141 for _ in 0..1024 {
1142 reads.push(
1143 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1144 );
1145 }
1146 for _ in 0..500 {
1148 reads.next().await;
1149 }
1150 }
1151 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1152 }
1153
1154 #[fuchsia::test]
1155 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1156 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1157 let block_client_ref = &block_client;
1158 let test_one = |offset, len, fill| async move {
1159 let buf = vec![fill; len];
1160 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1161 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1163 block_client_ref
1164 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1165 .await
1166 .expect("read_at failed");
1167 assert_eq!(
1168 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1169 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1170 );
1171 assert_eq!(
1172 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1173 &buf[..]
1174 );
1175 assert_eq!(
1176 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1177 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1178 );
1179 };
1180 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1181 join!(
1182 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1183 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1184 );
1185 }
1186
1187 struct FakeBlockServer<'a> {
1191 server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1192 channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1193 fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1194 }
1195
1196 impl<'a> FakeBlockServer<'a> {
1197 fn new(
1209 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1210 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1211 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1212 ) -> FakeBlockServer<'a> {
1213 FakeBlockServer {
1214 server_channel: Some(server_channel),
1215 channel_handler: Box::new(channel_handler),
1216 fifo_handler: Box::new(fifo_handler),
1217 }
1218 }
1219
1220 async fn run(&mut self) {
1222 let server = self.server_channel.take().unwrap();
1223
1224 let (server_fifo, client_fifo) =
1226 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1227 .expect("Fifo::create failed");
1228 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1229
1230 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1231 let fifo_future = Abortable::new(
1232 async {
1233 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1234 let (mut reader, mut writer) = fifo.async_io();
1235 let mut request = BlockFifoRequest::default();
1236 loop {
1237 match reader.read_entries(&mut request).await {
1238 Ok(1) => {}
1239 Err(zx::Status::PEER_CLOSED) => break,
1240 Err(e) => panic!("read_entry failed {:?}", e),
1241 _ => unreachable!(),
1242 };
1243
1244 let response = self.fifo_handler.as_ref()(request);
1245 writer
1246 .write_entries(std::slice::from_ref(&response))
1247 .await
1248 .expect("write_entries failed");
1249 }
1250 },
1251 fifo_future_abort_registration,
1252 );
1253
1254 let channel_future = async {
1255 server
1256 .into_stream()
1257 .for_each_concurrent(None, |request| async {
1258 let request = request.expect("unexpected fidl error");
1259
1260 match request {
1261 block::BlockRequest::GetInfo { responder } => {
1262 responder
1263 .send(Ok(&block::BlockInfo {
1264 block_count: 1024,
1265 block_size: 512,
1266 max_transfer_size: 1024 * 1024,
1267 flags: block::Flag::empty(),
1268 }))
1269 .expect("send failed");
1270 }
1271 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1272 let stream = session.into_stream();
1273 stream
1274 .for_each(|request| async {
1275 let request = request.expect("unexpected fidl error");
1276 if self.channel_handler.as_ref()(&request) {
1279 return;
1280 }
1281 match request {
1282 block::SessionRequest::GetFifo { responder } => {
1283 match maybe_server_fifo.lock().take() {
1284 Some(fifo) => {
1285 responder.send(Ok(fifo.downcast()))
1286 }
1287 None => responder.send(Err(
1288 zx::Status::NO_RESOURCES.into_raw(),
1289 )),
1290 }
1291 .expect("send failed")
1292 }
1293 block::SessionRequest::AttachVmo {
1294 vmo: _,
1295 responder,
1296 } => responder
1297 .send(Ok(&block::VmoId { id: 1 }))
1298 .expect("send failed"),
1299 block::SessionRequest::Close { responder } => {
1300 fifo_future_abort.abort();
1301 responder.send(Ok(())).expect("send failed")
1302 }
1303 }
1304 })
1305 .await
1306 }
1307 _ => panic!("Unexpected message"),
1308 }
1309 })
1310 .await;
1311 };
1312
1313 let _result = join!(fifo_future, channel_future);
1314 }
1316 }
1317
1318 #[fuchsia::test]
1319 async fn test_block_close_is_called() {
1320 let close_called = fuchsia_sync::Mutex::new(false);
1321 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1322
1323 std::thread::spawn(move || {
1324 let _block_client =
1325 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1326 });
1328
1329 let channel_handler = |request: &block::SessionRequest| -> bool {
1330 if let block::SessionRequest::Close { .. } = request {
1331 *close_called.lock() = true;
1332 }
1333 false
1334 };
1335 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1336
1337 assert!(*close_called.lock());
1339 }
1340
1341 #[fuchsia::test]
1342 async fn test_block_flush_is_called() {
1343 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1344
1345 struct Interface {
1346 flush_called: Arc<AtomicBool>,
1347 }
1348 impl block_server::async_interface::Interface for Interface {
1349 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1350 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1351 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1352 block_range: Some(0..1000),
1353 type_guid: [0; 16],
1354 instance_guid: [0; 16],
1355 name: "foo".to_string(),
1356 flags: 0,
1357 })))
1358 }
1359
1360 async fn read(
1361 &self,
1362 _device_block_offset: u64,
1363 _block_count: u32,
1364 _vmo: &Arc<zx::Vmo>,
1365 _vmo_offset: u64,
1366 _trace_flow_id: Option<NonZero<u64>>,
1367 ) -> Result<(), zx::Status> {
1368 unreachable!();
1369 }
1370
1371 async fn write(
1372 &self,
1373 _device_block_offset: u64,
1374 _block_count: u32,
1375 _vmo: &Arc<zx::Vmo>,
1376 _vmo_offset: u64,
1377 _opts: WriteOptions,
1378 _trace_flow_id: Option<NonZero<u64>>,
1379 ) -> Result<(), zx::Status> {
1380 unreachable!();
1381 }
1382
1383 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1384 self.flush_called.store(true, Ordering::Relaxed);
1385 Ok(())
1386 }
1387
1388 async fn trim(
1389 &self,
1390 _device_block_offset: u64,
1391 _block_count: u32,
1392 _trace_flow_id: Option<NonZero<u64>>,
1393 ) -> Result<(), zx::Status> {
1394 unreachable!();
1395 }
1396 }
1397
1398 let flush_called = Arc::new(AtomicBool::new(false));
1399
1400 futures::join!(
1401 async {
1402 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1403
1404 block_client.flush().await.expect("flush failed");
1405 },
1406 async {
1407 let block_server = BlockServer::new(
1408 512,
1409 Arc::new(Interface { flush_called: flush_called.clone() }),
1410 );
1411 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1412 }
1413 );
1414
1415 assert!(flush_called.load(Ordering::Relaxed));
1416 }
1417
1418 #[fuchsia::test]
1419 async fn test_trace_flow_ids_set() {
1420 let (proxy, server) = fidl::endpoints::create_proxy();
1421
1422 futures::join!(
1423 async {
1424 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1425 block_client.flush().await.expect("flush failed");
1426 },
1427 async {
1428 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1429 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1430 if request.trace_flow_id > 0 {
1431 *flow_id.lock() = Some(request.trace_flow_id);
1432 }
1433 BlockFifoResponse {
1434 status: zx::Status::OK.into_raw(),
1435 reqid: request.reqid,
1436 ..Default::default()
1437 }
1438 };
1439 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1440 assert!(flow_id.lock().is_some());
1442 }
1443 );
1444 }
1445}