use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::executor::block_on;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{Arc, LazyLock};
use std::task::{Context, Poll, Waker};
use zx::sys::zx_handle_t;
use zx::{self as zx, HandleBased as _};
use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync, storage_trace as trace};

pub use cache::Cache;

pub use block::DeviceFlag as BlockDeviceFlag;

pub use block_protocol::*;

pub mod cache;

const TEMP_VMO_SIZE: usize = 65536;

pub const NO_TRACE_ID: u64 = 0;

pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};

fn fidl_to_status(error: fidl::Error) -> zx::Status {
    match error {
        fidl::Error::ClientChannelClosed { status, .. } => status,
        _ => zx::Status::INTERNAL,
    }
}

fn opcode_str(opcode: u8) -> &'static str {
    match BlockOpcode::from_primitive(opcode) {
        Some(BlockOpcode::Read) => "read",
        Some(BlockOpcode::Write) => "write",
        Some(BlockOpcode::Flush) => "flush",
        Some(BlockOpcode::Trim) => "trim",
        Some(BlockOpcode::CloseVmo) => "close_vmo",
        None => "unknown",
    }
}

// Generates a unique trace flow ID for a request: the process's raw handle in the low 32 bits
// and the request ID in the high 32 bits.
fn generate_trace_flow_id(request_id: u32) -> u64 {
    static SELF_HANDLE: LazyLock<zx_handle_t> =
        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}

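/// Describes the source of data for a write (see [`MutableBufferSlice`] for reads): either a
/// region of a VMO that was previously attached to the device, or an arbitrary memory slice,
/// which is staged through an internal transfer VMO.
///
/// A minimal, illustrative sketch of constructing both variants (assumes `data` is a byte slice
/// and `vmo_id` came from [`BlockClient::attach_vmo`]; offsets and lengths are in bytes and must
/// be multiples of the device's block size):
///
/// ```ignore
/// let from_memory: BufferSlice<'_> = data[..].into();
/// let from_vmo = BufferSlice::new_with_vmo_id(&vmo_id, 0, 8192);
/// ```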
pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}

#[derive(Default)]
struct RequestState {
    result: Option<zx::Status>,
    waker: Option<Waker>,
}

#[derive(Default)]
struct FifoState {
    // The FIFO to the block device; `None` once the connection has terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to allocate.
    next_request_id: u32,

    // Requests that are queued but not yet written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // In-flight requests, keyed by request ID.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller task.
    poller_waker: Option<Waker>,

    // If set, the next write request is marked with the PRE_BARRIER flag.
    attach_barrier: bool,
}

impl FifoState {
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    // Returns true if polling should be terminated (the connection is gone).
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}

type FifoStateRef = Arc<Mutex<FifoState>>;

// A future that resolves once the response for `request_id` has been received.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}

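/// An ID for a VMO that has been attached to a block device. The ID must be released, either by
/// detaching it (e.g. via [`BlockClient::detach_vmo`]) or by calling [`VmoId::into_id`] or
/// [`VmoId::take`], before the `VmoId` is dropped; dropping a still-valid `VmoId` panics.
///
/// An illustrative sketch of the expected lifecycle (assumes an existing `block_client` that
/// implements [`BlockClient`]):
///
/// ```ignore
/// let vmo = zx::Vmo::create(8192)?;
/// let vmo_id = block_client.attach_vmo(&vmo).await?;
/// // ... issue reads/writes that reference `vmo_id` ...
/// block_client.detach_vmo(vmo_id).await?; // consumes and invalidates the ID
/// ```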
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);

impl VmoId {
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Takes the ID, leaving this instance invalid so that it can be dropped without panicking.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
    }

    pub fn is_valid(&self) -> bool {
        self.id() != VMOID_INVALID
    }

    /// Consumes the `VmoId` and returns the raw ID, marking this instance invalid.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
    }

    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}

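/// A client for talking to a block device over the FIFO-based block protocol. Implemented by
/// [`RemoteBlockClient`].
///
/// A minimal usage sketch (illustrative only; error handling elided, and offsets and lengths are
/// in bytes and assumed to be multiples of the device's block size):
///
/// ```ignore
/// let client = RemoteBlockClient::new(block_proxy).await?;
/// let mut buf = vec![0u8; client.block_size() as usize];
/// client.read_at(buf.as_mut_slice().into(), 0).await?;
/// client.write_at(buf.as_slice().into(), 0).await?;
/// client.flush().await?;
/// ```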
pub trait BlockClient: Send + Sync {
    /// Attaches the given VMO to the device and returns an ID that can be used to refer to it in
    /// subsequent requests.
    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;

    /// Detaches the given VMO from the device.
    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Reads from the device at `device_offset` (in bytes) into the given buffer slice.
    fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.read_at_with_opts_traced(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        )
    }

    fn read_at_with_opts(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
    }

    fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Writes the given buffer slice to the device at `device_offset` (in bytes).
    fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.write_at_with_opts_traced(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        )
    }

    fn write_at_with_opts(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
    }

    fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Trims the given byte range on the device.
    fn trim(
        &self,
        device_range: Range<u64>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.trim_traced(device_range, NO_TRACE_ID)
    }

    fn trim_traced(
        &self,
        device_range: Range<u64>,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Marks the next write request with the PRE_BARRIER flag.
    fn barrier(&self);

    /// Sends a flush request to the device.
    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.flush_traced(NO_TRACE_ID)
    }

    fn flush_traced(
        &self,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Closes the connection to the device.
    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns the block size of the device.
    fn block_size(&self) -> u32;

    /// Returns the size of the device in blocks.
    fn block_count(&self) -> u64;

    /// Returns the maximum number of blocks that can be transferred in a single request, if the
    /// device imposes a limit.
    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;

    /// Returns the flags reported by the device.
    fn block_flags(&self) -> BlockDeviceFlag;

    /// Returns true if the connection to the device appears to still be alive.
    fn is_connected(&self) -> bool;
}

struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockDeviceFlag,
    fifo_state: FifoStateRef,
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}

impl Common {
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(info.max_transfer_size / info.block_size)
            } else {
                None
            },
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        let (request_id, trace_flow_id) = {
            let mut state = self.fifo_state.lock();

            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                && state.attach_barrier
            {
                flags |= BlockIoFlag::PRE_BARRIER;
                request.command.flags = flags.bits();
                state.attach_barrier = false;
            }

            if state.fifo.is_none() {
                return Err(zx::Status::CANCELED);
            }
            trace::duration!(
                "storage",
                "block_client::send::start",
                "op" => opcode_str(request.command.opcode),
                "len" => request.length * self.block_size
            );
            let request_id = state.next_request_id;
            state.next_request_id = state.next_request_id.overflowing_add(1).0;
            assert!(
                state.map.insert(request_id, RequestState::default()).is_none(),
                "request id in use!"
            );
            request.reqid = request_id;
            if request.trace_flow_id == NO_TRACE_ID {
                request.trace_flow_id = generate_trace_flow_id(request_id);
            }
            let trace_flow_id = request.trace_flow_id;
            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
            state.queue.push_back(request);
            if let Some(waker) = state.poller_waker.clone() {
                state.poll_send_requests(&mut Context::from_waker(&waker));
            }
            (request_id, trace_flow_id)
        };
        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
        trace::duration!("storage", "block_client::send::end");
        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
        Ok(())
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto.dun,
                    slot: opts.inline_crypto.slot,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto.dun,
                        slot: opts.inline_crypto.slot,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let mut flags = BlockIoFlag::empty();

        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
            flags |= BlockIoFlag::FORCE_ACCESS;
        }

        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
            flags |= BlockIoFlag::PRE_BARRIER;
        }

        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: flags.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto.dun,
                    slot: opts.inline_crypto.slot,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: flags.bits(),
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto.dun,
                        slot: opts.inline_crypto.slot,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.max_transfer_blocks.clone()
    }

    fn block_flags(&self) -> BlockDeviceFlag {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}

impl Drop for Common {
    fn drop(&mut self) {
        // Take the ID so that dropping `temp_vmo_id` doesn't panic; the attachment itself is
        // cleaned up when the FIFO connection is terminated.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}

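/// An async client for the block FIFO protocol that implements [`BlockClient`].
///
/// A minimal, illustrative construction sketch (error handling elided; assumes `block_proxy` is a
/// connected `block::BlockProxy`):
///
/// ```ignore
/// let client = RemoteBlockClient::new(block_proxy).await?;
/// println!("block size: {}", client.block_size());
/// ```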
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

impl RemoteBlockClient {
    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
        let remote = remote.borrow();
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}

impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    fn barrier(&self) {
        self.common.barrier()
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockDeviceFlag {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

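/// A synchronous wrapper around the block client for callers that are not running on an async
/// executor. The FIFO is serviced by a dedicated background thread.
///
/// An illustrative sketch (assumes `client_end` is a `ClientEnd<block::BlockMarker>`; error
/// handling elided):
///
/// ```ignore
/// let client = RemoteBlockClientSync::new(client_end)?;
/// let mut buf = vec![0u8; client.block_size() as usize];
/// client.read_at(buf.as_mut_slice().into(), 0)?;
/// client.close()?;
/// ```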
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The FIFO has to be serviced by an executor, so spawn a dedicated thread that runs the
        // poller for the lifetime of the connection.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::default();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        ))
    }

    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        ))
    }

    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Ignore errors here; there is nothing useful we can do with them at this point.
        let _ = self.close();
    }
}

// FifoPoller is a future that services the FIFO: it writes queued requests and reads responses,
// waking the corresponding ResponseFutures as responses arrive.
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut();

        // Send any queued requests first. This returns true if the connection has terminated.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // `poll_send_requests` returned false, so the fifo must still be present.
        let fifo = state.fifo.as_ref().unwrap();
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // If the request is no longer in the map, its ResponseFuture was dropped
                    // (cancelled), so just ignore the response.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{
        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
    };
    use block_protocol::ReadOptions;
    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
    use fidl::endpoints::RequestStream as _;
    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
    use futures::join;
    use futures::stream::StreamExt as _;
    use futures::stream::futures_unordered::FuturesUnordered;
    use ramdevice_client::RamdiskClient;
    use std::borrow::Cow;
    use std::num::NonZero;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;

    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }

    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destroying the ramdisk takes effect asynchronously, so keep issuing reads until they
        // start failing.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Wait for the client to notice that the connection has been lost.
        while block_client.is_connected() {
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        assert_eq!(block_client.is_connected(), false);
        let _ = block_client.detach_vmo(vmo_id).await;
    }

    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Only wait for about half of the reads to complete; the remainder are cancelled when
            // `reads` is dropped at the end of this block.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }

    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::DeviceFlag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test's handler a chance to intercept the
                                        // request; if it returns true the request is considered
                                        // handled.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
        }
    }

    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = fuchsia_sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();

        std::thread::spawn(move || {
            // Dropping the client at the end of this closure should send `Close` to the session.
            let _block_client =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
        });

        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock() = true;
            }
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;

        assert!(*close_called.lock());
    }

    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            fn get_info(&self) -> Cow<'_, DeviceInfo> {
                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                }))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: ReadOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy();

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
                block_client.flush().await.expect("flush failed");
            },
            async {
                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                // A trace flow ID should have been generated automatically for the flush request.
                assert!(flow_id.lock().is_some());
            }
        );
    }
}