1use anyhow::Error;
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fidl_fuchsia_storage_block as fblock;
8use fuchsia_async as fasync;
9use fuchsia_async::epoch::{Epoch, EpochGuard};
10use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
11use futures::{Future, FutureExt as _, TryStreamExt as _};
12use slab::Slab;
13use std::borrow::{Borrow, Cow};
14use std::collections::BTreeMap;
15use std::num::NonZero;
16use std::ops::Range;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use storage_device::buffer::Buffer;
20use zx::HandleBased;
21
22pub mod async_interface;
23pub mod c_interface;
24pub mod callback_interface;
25
26#[cfg(test)]
27mod decompression_tests;
28
29pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
30
31type TraceFlowId = Option<NonZero<u64>>;
32
33#[derive(Clone)]
34pub enum DeviceInfo {
35 Block(BlockInfo),
36 Partition(PartitionInfo),
37}
38
39impl DeviceInfo {
40 pub fn label(&self) -> &str {
41 match self {
42 Self::Block(BlockInfo { .. }) => "",
43 Self::Partition(PartitionInfo { name, .. }) => name,
44 }
45 }
46 pub fn device_flags(&self) -> fblock::DeviceFlag {
47 match self {
48 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
49 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
50 }
51 }
52
53 pub fn block_count(&self) -> Option<u64> {
54 match self {
55 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
56 Self::Partition(PartitionInfo { block_range, .. }) => {
57 block_range.as_ref().map(|range| range.end - range.start)
58 }
59 }
60 }
61
62 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
63 match self {
64 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
65 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
66 max_transfer_blocks.clone()
67 }
68 }
69 }
70
71 fn max_transfer_size(&self, block_size: u32) -> u32 {
72 if let Some(max_blocks) = self.max_transfer_blocks() {
73 max_blocks.get() * block_size
74 } else {
75 MAX_TRANSFER_UNBOUNDED
76 }
77 }
78}
79
80#[derive(Clone, Default)]
82pub struct BlockInfo {
83 pub device_flags: fblock::DeviceFlag,
84 pub block_count: u64,
85 pub max_transfer_blocks: Option<NonZero<u32>>,
86}
87
88#[derive(Clone, Default)]
90pub struct PartitionInfo {
91 pub device_flags: fblock::DeviceFlag,
93 pub max_transfer_blocks: Option<NonZero<u32>>,
94 pub block_range: Option<Range<u64>>,
98 pub type_guid: [u8; 16],
99 pub instance_guid: [u8; 16],
100 pub name: String,
101 pub flags: u64,
102}
103
104struct ActiveRequest<S> {
107 session: S,
108 group_or_request: GroupOrRequest,
109 trace_flow_id: TraceFlowId,
110 _epoch_guard: EpochGuard<'static>,
111 status: zx::Status,
112 count: u32,
113 req_id: Option<u32>,
114 decompression_info: Option<DecompressionInfo>,
115}
116
117struct DecompressionInfo {
118 compressed_range: Range<usize>,
120
121 uncompressed_range: Range<u64>,
123
124 bytes_so_far: u64,
125 mapping: Arc<VmoMapping>,
126 buffer: Option<Buffer<'static>>,
127}
128
129impl DecompressionInfo {
130 fn uncompressed_slice(&self) -> *mut [u8] {
132 std::ptr::slice_from_raw_parts_mut(
133 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
134 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
135 )
136 }
137}
138
139pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
140
141impl<S> Default for ActiveRequests<S> {
142 fn default() -> Self {
143 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
144 }
145}
146
147impl<S> ActiveRequests<S> {
148 fn complete_and_take_response(
149 &self,
150 request_id: RequestId,
151 status: zx::Status,
152 ) -> Option<(S, BlockFifoResponse)> {
153 self.0.lock().complete_and_take_response(request_id, status)
154 }
155
156 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
157 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
158 }
159}
160
161struct ActiveRequestsInner<S> {
162 requests: Slab<ActiveRequest<S>>,
163}
164
165impl<S> ActiveRequestsInner<S> {
167 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
169 let group = &mut self.requests[request_id.0];
170
171 group.count = group.count.checked_sub(1).unwrap();
172 if status != zx::Status::OK && group.status == zx::Status::OK {
173 group.status = status
174 }
175
176 fuchsia_trace::duration!(
177 "storage",
178 "block_server::finish_transaction",
179 "request_id" => request_id.0,
180 "group_completed" => group.count == 0,
181 "status" => status.into_raw());
182 if let Some(trace_flow_id) = group.trace_flow_id {
183 fuchsia_trace::flow_step!(
184 "storage",
185 "block_server::finish_request",
186 trace_flow_id.get().into()
187 );
188 }
189
190 if group.count == 0
191 && group.status == zx::Status::OK
192 && let Some(info) = &mut group.decompression_info
193 {
194 thread_local! {
195 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
196 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
197 }
198 DECOMPRESSOR.with(|decompressor| {
199 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
201 let mut decompressor = decompressor.borrow_mut();
202 if let Err(error) = decompressor.decompress_to_buffer(
203 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
204 target_slice,
205 ) {
206 log::warn!(error:?; "Decompression error");
207 group.status = zx::Status::IO_DATA_INTEGRITY;
208 };
209 });
210 }
211 }
212
213 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
215 let group = &self.requests[request_id.0];
216 match group.req_id {
217 Some(reqid) if group.count == 0 => {
218 let group = self.requests.remove(request_id.0);
219 Some((
220 group.session,
221 BlockFifoResponse {
222 status: group.status.into_raw(),
223 reqid,
224 group: group.group_or_request.group_id().unwrap_or(0),
225 ..Default::default()
226 },
227 ))
228 }
229 _ => None,
230 }
231 }
232
233 fn complete_and_take_response(
235 &mut self,
236 request_id: RequestId,
237 status: zx::Status,
238 ) -> Option<(S, BlockFifoResponse)> {
239 self.complete(request_id, status);
240 self.take_response(request_id)
241 }
242}
243
244pub struct BlockServer<SM: SessionManager> {
247 block_size: u32,
248 orchestrator: Arc<SM::Orchestrator>,
249}
250
251#[derive(Debug)]
253pub struct BlockOffsetMapping {
254 source_block_offset: u64,
255 target_block_offset: u64,
256 length: u64,
257}
258
259impl BlockOffsetMapping {
260 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
261 blocks.0 >= self.source_block_offset
262 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
263 }
264}
265
266impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
267 type Error = zx::Status;
268
269 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
270 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
272 Ok(Self {
273 source_block_offset: wire.source_block_offset,
274 target_block_offset: wire.target_block_offset,
275 length: wire.length,
276 })
277 }
278}
279
280pub struct OffsetMap {
282 mapping: Option<BlockOffsetMapping>,
283 max_transfer_blocks: Option<NonZero<u32>>,
284}
285
286impl OffsetMap {
287 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
289 Self { mapping: Some(mapping), max_transfer_blocks }
290 }
291
292 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
294 Self { mapping: None, max_transfer_blocks }
295 }
296
297 pub fn is_empty(&self) -> bool {
298 self.mapping.is_none()
299 }
300
301 fn mapping(&self) -> Option<&BlockOffsetMapping> {
302 self.mapping.as_ref()
303 }
304
305 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
306 self.max_transfer_blocks
307 }
308}
309
310pub trait SessionManager: 'static {
313 type Orchestrator: Borrow<Self> + Send + Sync;
318
319 const SUPPORTS_DECOMPRESSION: bool;
320
321 type Session;
322
323 fn on_attach_vmo(
324 orchestrator: Arc<Self::Orchestrator>,
325 vmo: &Arc<zx::Vmo>,
326 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
327
328 fn open_session(
333 orchestrator: Arc<Self::Orchestrator>,
334 stream: fblock::SessionRequestStream,
335 offset_map: OffsetMap,
336 block_size: u32,
337 ) -> impl Future<Output = Result<(), Error>> + Send;
338
339 fn get_info(&self) -> Cow<'_, DeviceInfo>;
341
342 fn get_volume_info(
344 &self,
345 ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
346 {
347 async { Err(zx::Status::NOT_SUPPORTED) }
348 }
349
350 fn query_slices(
352 &self,
353 _start_slices: &[u64],
354 ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
355 async { Err(zx::Status::NOT_SUPPORTED) }
356 }
357
358 fn extend(
360 &self,
361 _start_slice: u64,
362 _slice_count: u64,
363 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
364 async { Err(zx::Status::NOT_SUPPORTED) }
365 }
366
367 fn shrink(
369 &self,
370 _start_slice: u64,
371 _slice_count: u64,
372 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
373 async { Err(zx::Status::NOT_SUPPORTED) }
374 }
375
376 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
378}
379
380pub trait IntoOrchestrator {
384 type SM: SessionManager;
385
386 fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
387}
388
389impl<SM: SessionManager> BlockServer<SM> {
390 pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
391 Self { block_size, orchestrator: orchestrator.into_orchestrator() }
392 }
393
394 pub fn session_manager(&self) -> &SM {
395 self.orchestrator.as_ref().borrow()
396 }
397
398 pub async fn handle_requests(
400 &self,
401 mut requests: fblock::BlockRequestStream,
402 ) -> Result<(), Error> {
403 let scope = fasync::Scope::new();
404 loop {
405 match requests.try_next().await {
406 Ok(Some(request)) => {
407 if let Some(session) = self.handle_request(request).await? {
408 scope.spawn(session.map(|_| ()));
409 }
410 }
411 Ok(None) => break,
412 Err(err) => log::warn!(err:?; "Invalid request"),
413 }
414 }
415 scope.await;
416 Ok(())
417 }
418
419 async fn handle_request(
422 &self,
423 request: fblock::BlockRequest,
424 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
425 match request {
426 fblock::BlockRequest::GetInfo { responder } => {
427 let info = self.device_info();
428 let max_transfer_size = info.max_transfer_size(self.block_size);
429 let (block_count, mut flags) = match info.as_ref() {
430 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
431 (*block_count, *device_flags)
432 }
433 DeviceInfo::Partition(partition_info) => {
434 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
435 range.end - range.start
436 } else {
437 let volume_info = self.session_manager().get_volume_info().await?;
438 volume_info.0.slice_size * volume_info.1.partition_slice_count
439 / self.block_size as u64
440 };
441 (block_count, partition_info.device_flags)
442 }
443 };
444 if SM::SUPPORTS_DECOMPRESSION {
445 flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
446 }
447 responder.send(Ok(&fblock::BlockInfo {
448 block_count,
449 block_size: self.block_size,
450 max_transfer_size,
451 flags,
452 }))?;
453 }
454 fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
455 let info = self.device_info();
456 return Ok(Some(SM::open_session(
457 self.orchestrator.clone(),
458 session.into_stream(),
459 OffsetMap::empty(info.max_transfer_blocks()),
460 self.block_size,
461 )));
462 }
463 fblock::BlockRequest::OpenSessionWithOffsetMap {
464 session,
465 mapping,
466 control_handle: _,
467 } => {
468 let info = self.device_info();
469 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
470 Ok(m) => m,
471 Err(status) => {
472 session.close_with_epitaph(status)?;
473 return Ok(None);
474 }
475 };
476 if let Some(max) = info.block_count() {
477 if initial_mapping.target_block_offset + initial_mapping.length > max {
478 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
479 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
480 return Ok(None);
481 }
482 }
483 return Ok(Some(SM::open_session(
484 self.orchestrator.clone(),
485 session.into_stream(),
486 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
487 self.block_size,
488 )));
489 }
490 fblock::BlockRequest::GetTypeGuid { responder } => {
491 let info = self.device_info();
492 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
493 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
494 guid.value.copy_from_slice(&partition_info.type_guid);
495 responder.send(zx::sys::ZX_OK, Some(&guid))?;
496 } else {
497 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
498 }
499 }
500 fblock::BlockRequest::GetInstanceGuid { responder } => {
501 let info = self.device_info();
502 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
503 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
504 guid.value.copy_from_slice(&partition_info.instance_guid);
505 responder.send(zx::sys::ZX_OK, Some(&guid))?;
506 } else {
507 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
508 }
509 }
510 fblock::BlockRequest::GetName { responder } => {
511 let info = self.device_info();
512 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
513 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
514 } else {
515 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
516 }
517 }
518 fblock::BlockRequest::GetMetadata { responder } => {
519 let info = self.device_info();
520 if let DeviceInfo::Partition(info) = info.as_ref() {
521 let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
522 type_guid.value.copy_from_slice(&info.type_guid);
523 let mut instance_guid =
524 fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
525 instance_guid.value.copy_from_slice(&info.instance_guid);
526 responder.send(Ok(&fblock::BlockGetMetadataResponse {
527 name: Some(info.name.clone()),
528 type_guid: Some(type_guid),
529 instance_guid: Some(instance_guid),
530 start_block_offset: info.block_range.as_ref().map(|range| range.start),
531 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
532 flags: Some(info.flags),
533 ..Default::default()
534 }))?;
535 } else {
536 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
537 }
538 }
539 fblock::BlockRequest::QuerySlices { responder, start_slices } => {
540 match self.session_manager().query_slices(&start_slices).await {
541 Ok(mut results) => {
542 let results_len = results.len();
543 assert!(results_len <= 16);
544 results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
545 responder.send(
546 zx::sys::ZX_OK,
547 &results.try_into().unwrap(),
548 results_len as u64,
549 )?;
550 }
551 Err(s) => {
552 responder.send(
553 s.into_raw(),
554 &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
555 0,
556 )?;
557 }
558 }
559 }
560 fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
561 match self.session_manager().get_volume_info().await {
562 Ok((manager_info, volume_info)) => {
563 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
564 }
565 Err(s) => responder.send(s.into_raw(), None, None)?,
566 }
567 }
568 fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
569 responder.send(
570 zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
571 .into_raw(),
572 )?;
573 }
574 fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
575 responder.send(
576 zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
577 .into_raw(),
578 )?;
579 }
580 fblock::BlockRequest::Destroy { responder, .. } => {
581 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
582 }
583 }
584 Ok(None)
585 }
586
587 fn device_info(&self) -> Cow<'_, DeviceInfo> {
588 self.session_manager().get_info()
589 }
590}
591
592struct SessionHelper<SM: SessionManager> {
593 orchestrator: Arc<SM::Orchestrator>,
594 offset_map: OffsetMap,
595 block_size: u32,
596 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
597 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
598}
599
600struct VmoMapping {
601 base: usize,
602 size: usize,
603}
604
605impl VmoMapping {
606 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
607 let size = vmo.get_size().unwrap() as usize;
608 Ok(Arc::new(Self {
609 base: fuchsia_runtime::vmar_root_self()
610 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
611 .inspect_err(|error| {
612 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
613 })?,
614 size,
615 }))
616 }
617}
618
619impl Drop for VmoMapping {
620 fn drop(&mut self) {
621 unsafe {
623 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
624 }
625 }
626}
627
628enum HandleRequestResult {
629 Ok,
631 Closed(Box<dyn FnOnce() + Send + 'static>),
635}
636
637impl<SM: SessionManager> SessionHelper<SM> {
638 fn new(
639 orchestrator: Arc<SM::Orchestrator>,
640 offset_map: OffsetMap,
641 block_size: u32,
642 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
643 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
644 Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
645 }
646
647 fn session_manager(&self) -> &SM {
648 self.orchestrator.as_ref().borrow()
649 }
650
651 async fn handle_request(
652 &self,
653 request: fblock::SessionRequest,
654 ) -> Result<HandleRequestResult, Error> {
655 match request {
656 fblock::SessionRequest::GetFifo { responder } => {
657 let rights = zx::Rights::TRANSFER
658 | zx::Rights::READ
659 | zx::Rights::WRITE
660 | zx::Rights::SIGNAL
661 | zx::Rights::WAIT;
662 match self.peer_fifo.duplicate_handle(rights) {
663 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
664 Err(s) => responder.send(Err(s.into_raw()))?,
665 }
666 Ok(HandleRequestResult::Ok)
667 }
668 fblock::SessionRequest::AttachVmo { vmo, responder } => {
669 let vmo = Arc::new(vmo);
670 let vmo_id = {
671 let mut vmos = self.vmos.lock();
672 if vmos.len() == u16::MAX as usize {
673 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
674 return Ok(HandleRequestResult::Ok);
675 } else {
676 let vmo_id = match vmos.last_entry() {
677 None => 1,
678 Some(o) => {
679 o.key().checked_add(1).unwrap_or_else(|| {
680 let mut vmo_id = 1;
681 for (&id, _) in &*vmos {
683 if id > vmo_id {
684 break;
685 }
686 vmo_id = id + 1;
687 }
688 vmo_id
689 })
690 }
691 };
692 vmos.insert(vmo_id, (vmo.clone(), None));
693 vmo_id
694 }
695 };
696 SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
697 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
698 Ok(HandleRequestResult::Ok)
699 }
700 fblock::SessionRequest::Close { responder } => {
701 Ok(HandleRequestResult::Closed(Box::new(move || {
702 if let Err(err) = responder.send(Ok(())) {
703 log::warn!(err:?; "Error sending close response");
704 }
705 })))
706 }
707 }
708 }
709
710 fn decode_fifo_request(
712 &self,
713 session: SM::Session,
714 request: &BlockFifoRequest,
715 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
716 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
717
718 let request_bytes = request.length as u64 * self.block_size as u64;
719
720 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
721 .ok_or(zx::Status::INVALID_ARGS)
722 .and_then(|code| {
723 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
724 if code != BlockOpcode::Read {
725 return Err(zx::Status::INVALID_ARGS);
726 }
727 if !SM::SUPPORTS_DECOMPRESSION {
728 return Err(zx::Status::NOT_SUPPORTED);
729 }
730 }
731 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
732 if request.length == 0 {
733 return Err(zx::Status::INVALID_ARGS);
734 }
735 if request.dev_offset.checked_add(request.length as u64).is_none()
737 || (code != BlockOpcode::Trim
738 && request_bytes.checked_add(request.vmo_offset).is_none())
739 {
740 return Err(zx::Status::OUT_OF_RANGE);
741 }
742 }
743 Ok(match code {
744 BlockOpcode::Read => Operation::Read {
745 device_block_offset: request.dev_offset,
746 block_count: request.length,
747 _unused: 0,
748 vmo_offset: request
749 .vmo_offset
750 .checked_mul(self.block_size as u64)
751 .ok_or(zx::Status::OUT_OF_RANGE)?,
752 options: ReadOptions {
753 inline_crypto: InlineCryptoOptions {
754 is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
755 dun: request.dun,
756 slot: request.slot,
757 },
758 },
759 },
760 BlockOpcode::Write => {
761 let mut options = WriteOptions {
762 inline_crypto: InlineCryptoOptions {
763 is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
764 dun: request.dun,
765 slot: request.slot,
766 },
767 ..WriteOptions::default()
768 };
769 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
770 options.flags |= WriteFlags::FORCE_ACCESS;
771 }
772 if flags.contains(BlockIoFlag::PRE_BARRIER) {
773 options.flags |= WriteFlags::PRE_BARRIER;
774 }
775 Operation::Write {
776 device_block_offset: request.dev_offset,
777 block_count: request.length,
778 _unused: 0,
779 options,
780 vmo_offset: request
781 .vmo_offset
782 .checked_mul(self.block_size as u64)
783 .ok_or(zx::Status::OUT_OF_RANGE)?,
784 }
785 }
786 BlockOpcode::Flush => Operation::Flush,
787 BlockOpcode::Trim => Operation::Trim {
788 device_block_offset: request.dev_offset,
789 block_count: request.length,
790 },
791 BlockOpcode::CloseVmo => Operation::CloseVmo,
792 })
793 });
794
795 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
796 GroupOrRequest::Group(request.group)
797 } else {
798 GroupOrRequest::Request(request.reqid)
799 };
800
801 let mut active_requests = self.session_manager().active_requests().0.lock();
802 let mut request_id = None;
803
804 if group_or_request.is_group() {
815 for (key, group) in &mut active_requests.requests {
819 if group.group_or_request == group_or_request {
820 if group.req_id.is_some() {
821 if group.status == zx::Status::OK {
823 group.status = zx::Status::INVALID_ARGS;
824 }
825 return Err(None);
827 }
828 if group.status == zx::Status::OK
830 && let Some(info) = &mut group.decompression_info
831 {
832 if let Ok(Operation::Read {
833 device_block_offset,
834 mut block_count,
835 options,
836 vmo_offset: 0,
837 ..
838 }) = operation
839 {
840 let remaining_bytes = info
841 .compressed_range
842 .end
843 .next_multiple_of(self.block_size as usize)
844 as u64
845 - info.bytes_so_far;
846 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
847 || request.total_compressed_bytes != 0
848 || request.uncompressed_bytes != 0
849 || request.compressed_prefix_bytes != 0
850 || (flags.contains(BlockIoFlag::GROUP_LAST)
851 && info.bytes_so_far + request_bytes
852 < info.compressed_range.end as u64)
853 || (!flags.contains(BlockIoFlag::GROUP_LAST)
854 && request_bytes >= remaining_bytes)
855 {
856 group.status = zx::Status::INVALID_ARGS;
857 } else {
858 if request_bytes > remaining_bytes {
867 block_count = (remaining_bytes / self.block_size as u64) as u32;
868 }
869
870 operation = Ok(Operation::ContinueDecompressedRead {
871 offset: info.bytes_so_far,
872 device_block_offset,
873 block_count,
874 options,
875 });
876
877 info.bytes_so_far += block_count as u64 * self.block_size as u64;
878 }
879 } else {
880 group.status = zx::Status::INVALID_ARGS;
881 }
882 }
883 if flags.contains(BlockIoFlag::GROUP_LAST) {
884 group.req_id = Some(request.reqid);
885 if group.status != zx::Status::OK {
888 operation = Err(group.status);
889 }
890 } else if group.status != zx::Status::OK {
891 return Err(None);
894 }
895 request_id = Some(RequestId(key));
896 group.count += 1;
897 break;
898 }
899 }
900 }
901
902 let is_single_request =
903 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
904
905 let mut decompression_info = None;
906 let vmo = match operation {
907 Ok(Operation::Read {
908 device_block_offset,
909 mut block_count,
910 options,
911 vmo_offset,
912 ..
913 }) => match self.vmos.lock().get_mut(&request.vmoid) {
914 Some((vmo, mapping)) => {
915 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
916 let compressed_range = request.compressed_prefix_bytes as usize
917 ..request.compressed_prefix_bytes as usize
918 + request.total_compressed_bytes as usize;
919 let required_buffer_size =
920 compressed_range.end.next_multiple_of(self.block_size as usize);
921
922 if compressed_range.start >= compressed_range.end
924 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
925 || (is_single_request && request_bytes < compressed_range.end as u64)
926 || (!is_single_request && request_bytes >= required_buffer_size as u64)
927 {
928 Err(zx::Status::INVALID_ARGS)
929 } else {
930 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
937 block_count =
938 (required_buffer_size / self.block_size as usize) as u32;
939 required_buffer_size as u64
940 } else {
941 request_bytes
942 };
943
944 match mapping {
946 Some(mapping) => Ok(mapping.clone()),
947 None => {
948 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
949 }
950 }
951 .and_then(|mapping| {
952 if vmo_offset
955 .checked_add(request.uncompressed_bytes as u64)
956 .is_some_and(|end| end <= mapping.size as u64)
957 {
958 Ok(mapping)
959 } else {
960 Err(zx::Status::OUT_OF_RANGE)
961 }
962 })
963 .map(|mapping| {
964 operation = Ok(Operation::StartDecompressedRead {
969 required_buffer_size,
970 device_block_offset,
971 block_count,
972 options,
973 });
974 decompression_info = Some(DecompressionInfo {
977 compressed_range,
978 bytes_so_far,
979 mapping,
980 uncompressed_range: vmo_offset
981 ..vmo_offset + request.uncompressed_bytes as u64,
982 buffer: None,
983 });
984 None
985 })
986 }
987 } else {
988 Ok(Some(vmo.clone()))
989 }
990 }
991 None => Err(zx::Status::IO),
992 },
993 Ok(Operation::Write { .. }) => self
994 .vmos
995 .lock()
996 .get(&request.vmoid)
997 .cloned()
998 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
999 Ok(Operation::CloseVmo) => {
1000 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
1001 let vmo_clone = vmo.clone();
1002 Epoch::global().defer(move || drop(vmo_clone));
1005 Ok(Some(vmo))
1006 })
1007 }
1008 _ => Ok(None),
1009 }
1010 .unwrap_or_else(|e| {
1011 operation = Err(e);
1012 None
1013 });
1014
1015 let trace_flow_id = NonZero::new(request.trace_flow_id);
1016 let request_id = request_id.unwrap_or_else(|| {
1017 RequestId(active_requests.requests.insert(ActiveRequest {
1018 session,
1019 group_or_request,
1020 trace_flow_id,
1021 _epoch_guard: Epoch::global().guard(),
1022 status: zx::Status::OK,
1023 count: 1,
1024 req_id: is_single_request.then_some(request.reqid),
1025 decompression_info,
1026 }))
1027 });
1028
1029 Ok(DecodedRequest {
1030 request_id,
1031 trace_flow_id,
1032 operation: operation.map_err(|status| {
1033 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1034 })?,
1035 vmo,
1036 })
1037 }
1038
1039 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1040 std::mem::take(&mut *self.vmos.lock())
1041 }
1042
1043 fn map_request(
1045 &self,
1046 mut request: DecodedRequest,
1047 active_request: &mut ActiveRequest<SM::Session>,
1048 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1049 if active_request.status != zx::Status::OK {
1050 return Err(zx::Status::BAD_STATE);
1051 }
1052 let mapping = self.offset_map.mapping();
1053 match (mapping, request.operation.blocks()) {
1054 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1055 return Err(zx::Status::OUT_OF_RANGE);
1056 }
1057 _ => {}
1058 }
1059 let remainder = request.operation.map(
1060 self.offset_map.mapping(),
1061 self.offset_map.max_transfer_blocks(),
1062 self.block_size,
1063 );
1064 if remainder.is_some() {
1065 active_request.count += 1;
1066 }
1067 static CACHE: AtomicU64 = AtomicU64::new(0);
1068 if let Some(context) =
1069 fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1070 {
1071 use fuchsia_trace::ArgValue;
1072 let trace_args = [
1073 ArgValue::of("request_id", request.request_id.0),
1074 ArgValue::of("opcode", request.operation.trace_label()),
1075 ];
1076 let _scope =
1077 fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1078 if let Some(trace_flow_id) = active_request.trace_flow_id {
1079 fuchsia_trace::flow_step(
1080 &context,
1081 "block_server::start_transaction",
1082 trace_flow_id.get().into(),
1083 &[],
1084 );
1085 }
1086 }
1087 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1088 Ok((request, remainder))
1089 }
1090
1091 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1096 self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1097 }
1098
1099 fn close_active_groups(&self, pred: impl Fn(&SM::Session) -> bool) {
1112 self.session_manager().active_requests().0.lock().requests.retain(|_, request| {
1113 if !pred(&request.session) || request.req_id.is_some() {
1114 return true;
1115 }
1116 request.req_id = Some(u32::MAX);
1119 request.count > 0
1120 });
1121 }
1122}
1123
1124#[repr(transparent)]
1125#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
1126pub struct RequestId(usize);
1127
1128#[derive(Clone, Debug)]
1129struct DecodedRequest {
1130 request_id: RequestId,
1131 trace_flow_id: TraceFlowId,
1132 operation: Operation,
1133 vmo: Option<Arc<zx::Vmo>>,
1134}
1135
1136pub type WriteFlags = block_protocol::WriteFlags;
1138pub type WriteOptions = block_protocol::WriteOptions;
1139pub type ReadOptions = block_protocol::ReadOptions;
1140pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1141
1142#[repr(C)]
1143#[derive(Clone, Debug, PartialEq, Eq)]
1144pub enum Operation {
1145 Read {
1150 device_block_offset: u64,
1151 block_count: u32,
1152 _unused: u32,
1153 vmo_offset: u64,
1154 options: ReadOptions,
1155 },
1156 Write {
1157 device_block_offset: u64,
1158 block_count: u32,
1159 _unused: u32,
1160 vmo_offset: u64,
1161 options: WriteOptions,
1162 },
1163 Flush,
1164 Trim {
1165 device_block_offset: u64,
1166 block_count: u32,
1167 },
1168 CloseVmo,
1170 StartDecompressedRead {
1172 required_buffer_size: usize,
1173 device_block_offset: u64,
1174 block_count: u32,
1175 options: ReadOptions,
1176 },
1177 ContinueDecompressedRead {
1179 offset: u64,
1180 device_block_offset: u64,
1181 block_count: u32,
1182 options: ReadOptions,
1183 },
1184}
1185
1186impl Operation {
1187 fn trace_label(&self) -> &'static str {
1188 match self {
1189 Operation::Read { .. } => "read",
1190 Operation::Write { .. } => "write",
1191 Operation::Flush { .. } => "flush",
1192 Operation::Trim { .. } => "trim",
1193 Operation::CloseVmo { .. } => "close_vmo",
1194 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1195 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1196 }
1197 }
1198
1199 fn blocks(&self) -> Option<(u64, u32)> {
1201 match self {
1202 Operation::Read { device_block_offset, block_count, .. }
1203 | Operation::Write { device_block_offset, block_count, .. }
1204 | Operation::Trim { device_block_offset, block_count, .. } => {
1205 Some((*device_block_offset, *block_count))
1206 }
1207 _ => None,
1208 }
1209 }
1210
1211 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1213 match self {
1214 Operation::Read { device_block_offset, block_count, .. }
1215 | Operation::Write { device_block_offset, block_count, .. }
1216 | Operation::Trim { device_block_offset, block_count, .. } => {
1217 Some((device_block_offset, block_count))
1218 }
1219 _ => None,
1220 }
1221 }
1222
1223 fn map(
1226 &mut self,
1227 mapping: Option<&BlockOffsetMapping>,
1228 max_blocks: Option<NonZero<u32>>,
1229 block_size: u32,
1230 ) -> Option<Self> {
1231 let mut max = match self {
1232 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1233 _ => None,
1234 };
1235 let (offset, length) = self.blocks_mut()?;
1236 let orig_offset = *offset;
1237 if let Some(mapping) = mapping {
1238 let delta = *offset - mapping.source_block_offset;
1239 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1240 *offset = mapping.target_block_offset + delta;
1241 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1242 max = match max {
1243 None => Some(mapping_max),
1244 Some(m) => Some(std::cmp::min(m, mapping_max)),
1245 };
1246 };
1247 if let Some(max) = max {
1248 if *length as u64 > max {
1249 let rem = (*length as u64 - max) as u32;
1250 *length = max as u32;
1251 return Some(match self {
1252 Operation::Read {
1253 device_block_offset: _,
1254 block_count: _,
1255 vmo_offset,
1256 _unused,
1257 options,
1258 } => {
1259 let mut options = *options;
1260 options.inline_crypto.dun += max as u32;
1261 Operation::Read {
1262 device_block_offset: orig_offset + max,
1263 block_count: rem,
1264 vmo_offset: *vmo_offset + max * block_size as u64,
1265 _unused: *_unused,
1266 options: options,
1267 }
1268 }
1269 Operation::Write {
1270 device_block_offset: _,
1271 block_count: _,
1272 _unused,
1273 vmo_offset,
1274 options,
1275 } => {
1276 let mut options = *options;
1277 options.inline_crypto.dun += max as u32;
1278 Operation::Write {
1279 device_block_offset: orig_offset + max,
1280 block_count: rem,
1281 _unused: *_unused,
1282 vmo_offset: *vmo_offset + max * block_size as u64,
1283 options: options,
1284 }
1285 }
1286 Operation::Trim { device_block_offset: _, block_count: _ } => {
1287 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1288 }
1289 _ => unreachable!(),
1290 });
1291 }
1292 }
1293 None
1294 }
1295
1296 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1298 if let Operation::Write { options, .. } = self {
1299 options.flags.contains(value)
1300 } else {
1301 false
1302 }
1303 }
1304
1305 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1307 if let Operation::Write { options, .. } = self {
1308 let result = options.flags.contains(value);
1309 options.flags.remove(value);
1310 result
1311 } else {
1312 false
1313 }
1314 }
1315}
1316
1317#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1318pub enum GroupOrRequest {
1319 Group(u16),
1320 Request(u32),
1321}
1322
1323impl GroupOrRequest {
1324 fn is_group(&self) -> bool {
1325 matches!(self, Self::Group(_))
1326 }
1327
1328 fn group_id(&self) -> Option<u16> {
1329 match self {
1330 Self::Group(id) => Some(*id),
1331 Self::Request(_) => None,
1332 }
1333 }
1334}
1335
1336#[cfg(test)]
1337mod tests {
1338 use super::{
1339 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1340 TraceFlowId,
1341 };
1342 use assert_matches::assert_matches;
1343 use block_protocol::{
1344 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1345 WriteFlags, WriteOptions,
1346 };
1347 use fidl_fuchsia_storage_block as fblock;
1348 use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1349 use fuchsia_async as fasync;
1350 use fuchsia_sync::Mutex;
1351 use futures::FutureExt as _;
1352 use futures::channel::oneshot;
1353 use futures::future::BoxFuture;
1354 use std::borrow::Cow;
1355 use std::future::poll_fn;
1356 use std::num::NonZero;
1357 use std::pin::pin;
1358 use std::sync::Arc;
1359 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1360 use std::task::{Context, Poll};
1361 use zx::HandleBased as _;
1362
1363 #[derive(Default)]
1364 struct MockInterface {
1365 read_hook: Option<
1366 Box<
1367 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1368 + Send
1369 + Sync,
1370 >,
1371 >,
1372 write_hook:
1373 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1374 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1375 }
1376
1377 impl super::async_interface::Interface for MockInterface {
1378 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1379 Ok(())
1380 }
1381
1382 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1383 Cow::Owned(test_device_info())
1384 }
1385
1386 async fn read(
1387 &self,
1388 device_block_offset: u64,
1389 block_count: u32,
1390 vmo: &Arc<zx::Vmo>,
1391 vmo_offset: u64,
1392 _opts: ReadOptions,
1393 _trace_flow_id: TraceFlowId,
1394 ) -> Result<(), zx::Status> {
1395 if let Some(read_hook) = &self.read_hook {
1396 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1397 } else {
1398 unimplemented!();
1399 }
1400 }
1401
1402 async fn write(
1403 &self,
1404 device_block_offset: u64,
1405 _block_count: u32,
1406 _vmo: &Arc<zx::Vmo>,
1407 _vmo_offset: u64,
1408 opts: WriteOptions,
1409 _trace_flow_id: TraceFlowId,
1410 ) -> Result<(), zx::Status> {
1411 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1412 && let Some(barrier_hook) = &self.barrier_hook
1413 {
1414 barrier_hook()?;
1415 }
1416 if let Some(write_hook) = &self.write_hook {
1417 write_hook(device_block_offset).await
1418 } else {
1419 unimplemented!();
1420 }
1421 }
1422
1423 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1424 Ok(())
1425 }
1426
1427 async fn trim(
1428 &self,
1429 _device_block_offset: u64,
1430 _block_count: u32,
1431 _trace_flow_id: TraceFlowId,
1432 ) -> Result<(), zx::Status> {
1433 unreachable!();
1434 }
1435
1436 async fn get_volume_info(
1437 &self,
1438 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1439 let () = std::future::pending().await;
1441 unreachable!();
1442 }
1443 }
1444
1445 const BLOCK_SIZE: u32 = 512;
1446 const MAX_TRANSFER_BLOCKS: u32 = 10;
1447
1448 fn test_device_info() -> DeviceInfo {
1449 DeviceInfo::Partition(PartitionInfo {
1450 device_flags: fblock::DeviceFlag::READONLY
1451 | fblock::DeviceFlag::BARRIER_SUPPORT
1452 | fblock::DeviceFlag::FUA_SUPPORT,
1453 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1454 block_range: Some(0..100),
1455 type_guid: [1; 16],
1456 instance_guid: [2; 16],
1457 name: "foo".to_string(),
1458 flags: 0xabcd,
1459 })
1460 }
1461
1462 #[fuchsia::test]
1463 async fn test_barriers_ordering() {
1464 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1465 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1466 let barrier_called = Arc::new(AtomicBool::new(false));
1467
1468 futures::join!(
1469 async move {
1470 let barrier_called_clone = barrier_called.clone();
1471 let block_server = BlockServer::new(
1472 BLOCK_SIZE,
1473 Arc::new(MockInterface {
1474 barrier_hook: Some(Box::new(move || {
1475 barrier_called.store(true, Ordering::Relaxed);
1476 Ok(())
1477 })),
1478 write_hook: Some(Box::new(move |device_block_offset| {
1479 let barrier_called = barrier_called_clone.clone();
1480 Box::pin(async move {
1481 if device_block_offset % 2 == 0 {
1483 fasync::Timer::new(fasync::MonotonicInstant::after(
1484 zx::MonotonicDuration::from_millis(200),
1485 ))
1486 .await;
1487 }
1488 assert!(barrier_called.load(Ordering::Relaxed));
1489 Ok(())
1490 })
1491 })),
1492 ..MockInterface::default()
1493 }),
1494 );
1495 block_server.handle_requests(stream).await.unwrap();
1496 },
1497 async move {
1498 let (session_proxy, server) = fidl::endpoints::create_proxy();
1499
1500 proxy.open_session(server).unwrap();
1501
1502 let vmo_id = session_proxy
1503 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1504 .await
1505 .unwrap()
1506 .unwrap();
1507 assert_ne!(vmo_id.id, 0);
1508
1509 let mut fifo =
1510 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1511 let (mut reader, mut writer) = fifo.async_io();
1512
1513 writer
1514 .write_entries(&BlockFifoRequest {
1515 command: BlockFifoCommand {
1516 opcode: BlockOpcode::Write.into_primitive(),
1517 flags: BlockIoFlag::PRE_BARRIER.bits(),
1518 ..Default::default()
1519 },
1520 vmoid: vmo_id.id,
1521 dev_offset: 0,
1522 length: 5,
1523 vmo_offset: 6,
1524 ..Default::default()
1525 })
1526 .await
1527 .unwrap();
1528
1529 for i in 0..10 {
1530 writer
1531 .write_entries(&BlockFifoRequest {
1532 command: BlockFifoCommand {
1533 opcode: BlockOpcode::Write.into_primitive(),
1534 ..Default::default()
1535 },
1536 vmoid: vmo_id.id,
1537 dev_offset: i + 1,
1538 length: 5,
1539 vmo_offset: 6,
1540 ..Default::default()
1541 })
1542 .await
1543 .unwrap();
1544 }
1545 for _ in 0..11 {
1546 let mut response = BlockFifoResponse::default();
1547 reader.read_entries(&mut response).await.unwrap();
1548 assert_eq!(response.status, zx::sys::ZX_OK);
1549 }
1550
1551 std::mem::drop(proxy);
1552 }
1553 );
1554 }
1555
1556 #[fuchsia::test]
1557 async fn test_info() {
1558 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1559
1560 futures::join!(
1561 async {
1562 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1563 block_server.handle_requests(stream).await.unwrap();
1564 },
1565 async {
1566 let expected_info = test_device_info();
1567 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1568 info
1569 } else {
1570 unreachable!()
1571 };
1572
1573 let block_info = proxy.get_info().await.unwrap().unwrap();
1574 assert_eq!(
1575 block_info.block_count,
1576 partition_info.block_range.as_ref().unwrap().end
1577 - partition_info.block_range.as_ref().unwrap().start
1578 );
1579 assert_eq!(
1580 block_info.flags,
1581 fblock::DeviceFlag::READONLY
1582 | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1583 | fblock::DeviceFlag::BARRIER_SUPPORT
1584 | fblock::DeviceFlag::FUA_SUPPORT
1585 );
1586
1587 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1588
1589 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1590 assert_eq!(status, zx::sys::ZX_OK);
1591 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1592
1593 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1594 assert_eq!(status, zx::sys::ZX_OK);
1595 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1596
1597 let (status, name) = proxy.get_name().await.unwrap();
1598 assert_eq!(status, zx::sys::ZX_OK);
1599 assert_eq!(name.as_ref(), Some(&partition_info.name));
1600
1601 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1602 assert_eq!(metadata.name, name);
1603 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1604 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1605 assert_eq!(
1606 metadata.start_block_offset,
1607 Some(partition_info.block_range.as_ref().unwrap().start)
1608 );
1609 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1610 assert_eq!(metadata.flags, Some(partition_info.flags));
1611
1612 std::mem::drop(proxy);
1613 }
1614 );
1615 }
1616
1617 #[fuchsia::test]
1618 async fn test_attach_vmo() {
1619 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1620
1621 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1622 let koid = vmo.koid().unwrap();
1623
1624 futures::join!(
1625 async {
1626 let block_server = BlockServer::new(
1627 BLOCK_SIZE,
1628 Arc::new(MockInterface {
1629 read_hook: Some(Box::new(move |_, _, vmo, _| {
1630 assert_eq!(vmo.koid().unwrap(), koid);
1631 Box::pin(async { Ok(()) })
1632 })),
1633 ..MockInterface::default()
1634 }),
1635 );
1636 block_server.handle_requests(stream).await.unwrap();
1637 },
1638 async move {
1639 let (session_proxy, server) = fidl::endpoints::create_proxy();
1640
1641 proxy.open_session(server).unwrap();
1642
1643 let vmo_id = session_proxy
1644 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1645 .await
1646 .unwrap()
1647 .unwrap();
1648 assert_ne!(vmo_id.id, 0);
1649
1650 let mut fifo =
1651 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1652 let (mut reader, mut writer) = fifo.async_io();
1653
1654 let mut count = 1;
1656 loop {
1657 match session_proxy
1658 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1659 .await
1660 .unwrap()
1661 {
1662 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1663 Err(e) => {
1664 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1665 break;
1666 }
1667 }
1668
1669 if count % 10 == 0 {
1671 writer
1672 .write_entries(&BlockFifoRequest {
1673 command: BlockFifoCommand {
1674 opcode: BlockOpcode::Read.into_primitive(),
1675 ..Default::default()
1676 },
1677 vmoid: vmo_id.id,
1678 length: 1,
1679 ..Default::default()
1680 })
1681 .await
1682 .unwrap();
1683
1684 let mut response = BlockFifoResponse::default();
1685 reader.read_entries(&mut response).await.unwrap();
1686 assert_eq!(response.status, zx::sys::ZX_OK);
1687 }
1688
1689 count += 1;
1690 }
1691
1692 assert_eq!(count, u16::MAX as u64);
1693
1694 writer
1696 .write_entries(&BlockFifoRequest {
1697 command: BlockFifoCommand {
1698 opcode: BlockOpcode::CloseVmo.into_primitive(),
1699 ..Default::default()
1700 },
1701 vmoid: vmo_id.id,
1702 ..Default::default()
1703 })
1704 .await
1705 .unwrap();
1706
1707 let mut response = BlockFifoResponse::default();
1708 reader.read_entries(&mut response).await.unwrap();
1709 assert_eq!(response.status, zx::sys::ZX_OK);
1710
1711 let new_vmo_id = session_proxy
1712 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1713 .await
1714 .unwrap()
1715 .unwrap();
1716 assert_eq!(new_vmo_id.id, vmo_id.id);
1718
1719 std::mem::drop(proxy);
1720 }
1721 );
1722 }
1723
1724 #[fuchsia::test]
1725 async fn test_close() {
1726 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1727
1728 let mut server = std::pin::pin!(
1729 async {
1730 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1731 block_server.handle_requests(stream).await.unwrap();
1732 }
1733 .fuse()
1734 );
1735
1736 let mut client = std::pin::pin!(
1737 async {
1738 let (session_proxy, server) = fidl::endpoints::create_proxy();
1739
1740 proxy.open_session(server).unwrap();
1741
1742 std::mem::drop(proxy);
1745
1746 session_proxy.close().await.unwrap().unwrap();
1747
1748 let _: () = std::future::pending().await;
1750 }
1751 .fuse()
1752 );
1753
1754 futures::select!(
1755 _ = server => {}
1756 _ = client => unreachable!(),
1757 );
1758 }
1759
1760 #[derive(Default)]
1761 struct IoMockInterface {
1762 do_checks: bool,
1763 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1764 return_errors: bool,
1765 }
1766
1767 #[derive(Debug)]
1768 enum ExpectedOp {
1769 Read(u64, u32, u64),
1770 Write(u64, u32, u64),
1771 Trim(u64, u32),
1772 Flush,
1773 }
1774
1775 impl super::async_interface::Interface for IoMockInterface {
1776 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1777 Ok(())
1778 }
1779
1780 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1781 Cow::Owned(test_device_info())
1782 }
1783
1784 async fn read(
1785 &self,
1786 device_block_offset: u64,
1787 block_count: u32,
1788 _vmo: &Arc<zx::Vmo>,
1789 vmo_offset: u64,
1790 _opts: ReadOptions,
1791 _trace_flow_id: TraceFlowId,
1792 ) -> Result<(), zx::Status> {
1793 if self.return_errors {
1794 Err(zx::Status::INTERNAL)
1795 } else {
1796 if self.do_checks {
1797 assert_matches!(
1798 self.expected_op.lock().take(),
1799 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1800 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1801 "Read {device_block_offset} {block_count} {vmo_offset}"
1802 );
1803 }
1804 Ok(())
1805 }
1806 }
1807
1808 async fn write(
1809 &self,
1810 device_block_offset: u64,
1811 block_count: u32,
1812 _vmo: &Arc<zx::Vmo>,
1813 vmo_offset: u64,
1814 _write_opts: WriteOptions,
1815 _trace_flow_id: TraceFlowId,
1816 ) -> Result<(), zx::Status> {
1817 if self.return_errors {
1818 Err(zx::Status::NOT_SUPPORTED)
1819 } else {
1820 if self.do_checks {
1821 assert_matches!(
1822 self.expected_op.lock().take(),
1823 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1824 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1825 "Write {device_block_offset} {block_count} {vmo_offset}"
1826 );
1827 }
1828 Ok(())
1829 }
1830 }
1831
1832 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1833 if self.return_errors {
1834 Err(zx::Status::NO_RESOURCES)
1835 } else {
1836 if self.do_checks {
1837 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1838 }
1839 Ok(())
1840 }
1841 }
1842
1843 async fn trim(
1844 &self,
1845 device_block_offset: u64,
1846 block_count: u32,
1847 _trace_flow_id: TraceFlowId,
1848 ) -> Result<(), zx::Status> {
1849 if self.return_errors {
1850 Err(zx::Status::NO_MEMORY)
1851 } else {
1852 if self.do_checks {
1853 assert_matches!(
1854 self.expected_op.lock().take(),
1855 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1856 block_count == b,
1857 "Trim {device_block_offset} {block_count}"
1858 );
1859 }
1860 Ok(())
1861 }
1862 }
1863 }
1864
1865 #[fuchsia::test]
1866 async fn test_io() {
1867 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1868
1869 let expected_op = Arc::new(Mutex::new(None));
1870 let expected_op_clone = expected_op.clone();
1871
1872 let server = async {
1873 let block_server = BlockServer::new(
1874 BLOCK_SIZE,
1875 Arc::new(IoMockInterface {
1876 return_errors: false,
1877 do_checks: true,
1878 expected_op: expected_op_clone,
1879 }),
1880 );
1881 block_server.handle_requests(stream).await.unwrap();
1882 };
1883
1884 let client = async move {
1885 let (session_proxy, server) = fidl::endpoints::create_proxy();
1886
1887 proxy.open_session(server).unwrap();
1888
1889 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1890 let vmo_id = session_proxy
1891 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1892 .await
1893 .unwrap()
1894 .unwrap();
1895
1896 let mut fifo =
1897 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1898 let (mut reader, mut writer) = fifo.async_io();
1899
1900 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1902 writer
1903 .write_entries(&BlockFifoRequest {
1904 command: BlockFifoCommand {
1905 opcode: BlockOpcode::Read.into_primitive(),
1906 ..Default::default()
1907 },
1908 vmoid: vmo_id.id,
1909 dev_offset: 1,
1910 length: 2,
1911 vmo_offset: 3,
1912 ..Default::default()
1913 })
1914 .await
1915 .unwrap();
1916
1917 let mut response = BlockFifoResponse::default();
1918 reader.read_entries(&mut response).await.unwrap();
1919 assert_eq!(response.status, zx::sys::ZX_OK);
1920
1921 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1923 writer
1924 .write_entries(&BlockFifoRequest {
1925 command: BlockFifoCommand {
1926 opcode: BlockOpcode::Write.into_primitive(),
1927 ..Default::default()
1928 },
1929 vmoid: vmo_id.id,
1930 dev_offset: 4,
1931 length: 5,
1932 vmo_offset: 6,
1933 ..Default::default()
1934 })
1935 .await
1936 .unwrap();
1937
1938 let mut response = BlockFifoResponse::default();
1939 reader.read_entries(&mut response).await.unwrap();
1940 assert_eq!(response.status, zx::sys::ZX_OK);
1941
1942 *expected_op.lock() = Some(ExpectedOp::Flush);
1944 writer
1945 .write_entries(&BlockFifoRequest {
1946 command: BlockFifoCommand {
1947 opcode: BlockOpcode::Flush.into_primitive(),
1948 ..Default::default()
1949 },
1950 ..Default::default()
1951 })
1952 .await
1953 .unwrap();
1954
1955 reader.read_entries(&mut response).await.unwrap();
1956 assert_eq!(response.status, zx::sys::ZX_OK);
1957
1958 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1960 writer
1961 .write_entries(&BlockFifoRequest {
1962 command: BlockFifoCommand {
1963 opcode: BlockOpcode::Trim.into_primitive(),
1964 ..Default::default()
1965 },
1966 dev_offset: 7,
1967 length: 8,
1968 ..Default::default()
1969 })
1970 .await
1971 .unwrap();
1972
1973 reader.read_entries(&mut response).await.unwrap();
1974 assert_eq!(response.status, zx::sys::ZX_OK);
1975
1976 std::mem::drop(proxy);
1977 };
1978
1979 futures::join!(server, client);
1980 }
1981
1982 #[fuchsia::test]
1983 async fn test_io_errors() {
1984 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1985
1986 futures::join!(
1987 async {
1988 let block_server = BlockServer::new(
1989 BLOCK_SIZE,
1990 Arc::new(IoMockInterface {
1991 return_errors: true,
1992 do_checks: false,
1993 expected_op: Arc::new(Mutex::new(None)),
1994 }),
1995 );
1996 block_server.handle_requests(stream).await.unwrap();
1997 },
1998 async move {
1999 let (session_proxy, server) = fidl::endpoints::create_proxy();
2000
2001 proxy.open_session(server).unwrap();
2002
2003 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2004 let vmo_id = session_proxy
2005 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2006 .await
2007 .unwrap()
2008 .unwrap();
2009
2010 let mut fifo =
2011 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2012 let (mut reader, mut writer) = fifo.async_io();
2013
2014 writer
2016 .write_entries(&BlockFifoRequest {
2017 command: BlockFifoCommand {
2018 opcode: BlockOpcode::Read.into_primitive(),
2019 ..Default::default()
2020 },
2021 vmoid: vmo_id.id,
2022 length: 1,
2023 reqid: 1,
2024 ..Default::default()
2025 })
2026 .await
2027 .unwrap();
2028
2029 let mut response = BlockFifoResponse::default();
2030 reader.read_entries(&mut response).await.unwrap();
2031 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2032
2033 writer
2035 .write_entries(&BlockFifoRequest {
2036 command: BlockFifoCommand {
2037 opcode: BlockOpcode::Write.into_primitive(),
2038 ..Default::default()
2039 },
2040 vmoid: vmo_id.id,
2041 length: 1,
2042 reqid: 2,
2043 ..Default::default()
2044 })
2045 .await
2046 .unwrap();
2047
2048 reader.read_entries(&mut response).await.unwrap();
2049 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2050
2051 writer
2053 .write_entries(&BlockFifoRequest {
2054 command: BlockFifoCommand {
2055 opcode: BlockOpcode::Flush.into_primitive(),
2056 ..Default::default()
2057 },
2058 reqid: 3,
2059 ..Default::default()
2060 })
2061 .await
2062 .unwrap();
2063
2064 reader.read_entries(&mut response).await.unwrap();
2065 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2066
2067 writer
2069 .write_entries(&BlockFifoRequest {
2070 command: BlockFifoCommand {
2071 opcode: BlockOpcode::Trim.into_primitive(),
2072 ..Default::default()
2073 },
2074 reqid: 4,
2075 length: 1,
2076 ..Default::default()
2077 })
2078 .await
2079 .unwrap();
2080
2081 reader.read_entries(&mut response).await.unwrap();
2082 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2083
2084 std::mem::drop(proxy);
2085 }
2086 );
2087 }
2088
2089 #[fuchsia::test]
2090 async fn test_invalid_args() {
2091 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2092
2093 futures::join!(
2094 async {
2095 let block_server = BlockServer::new(
2096 BLOCK_SIZE,
2097 Arc::new(IoMockInterface {
2098 return_errors: false,
2099 do_checks: false,
2100 expected_op: Arc::new(Mutex::new(None)),
2101 }),
2102 );
2103 block_server.handle_requests(stream).await.unwrap();
2104 },
2105 async move {
2106 let (session_proxy, server) = fidl::endpoints::create_proxy();
2107
2108 proxy.open_session(server).unwrap();
2109
2110 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2111 let vmo_id = session_proxy
2112 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2113 .await
2114 .unwrap()
2115 .unwrap();
2116
2117 let mut fifo =
2118 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2119
2120 async fn test(
2121 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2122 request: BlockFifoRequest,
2123 ) -> Result<(), zx::Status> {
2124 let (mut reader, mut writer) = fifo.async_io();
2125 writer.write_entries(&request).await.unwrap();
2126 let mut response = BlockFifoResponse::default();
2127 reader.read_entries(&mut response).await.unwrap();
2128 zx::Status::ok(response.status)
2129 }
2130
2131 let good_read_request = || BlockFifoRequest {
2134 command: BlockFifoCommand {
2135 opcode: BlockOpcode::Read.into_primitive(),
2136 ..Default::default()
2137 },
2138 length: 1,
2139 vmoid: vmo_id.id,
2140 ..Default::default()
2141 };
2142
2143 assert_eq!(
2144 test(
2145 &mut fifo,
2146 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2147 )
2148 .await,
2149 Err(zx::Status::IO)
2150 );
2151
2152 assert_eq!(
2153 test(
2154 &mut fifo,
2155 BlockFifoRequest {
2156 vmo_offset: 0xffff_ffff_ffff_ffff,
2157 ..good_read_request()
2158 }
2159 )
2160 .await,
2161 Err(zx::Status::OUT_OF_RANGE)
2162 );
2163
2164 assert_eq!(
2165 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2166 Err(zx::Status::INVALID_ARGS)
2167 );
2168
2169 let good_write_request = || BlockFifoRequest {
2172 command: BlockFifoCommand {
2173 opcode: BlockOpcode::Write.into_primitive(),
2174 ..Default::default()
2175 },
2176 length: 1,
2177 vmoid: vmo_id.id,
2178 ..Default::default()
2179 };
2180
2181 assert_eq!(
2182 test(
2183 &mut fifo,
2184 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2185 )
2186 .await,
2187 Err(zx::Status::IO)
2188 );
2189
2190 assert_eq!(
2191 test(
2192 &mut fifo,
2193 BlockFifoRequest {
2194 vmo_offset: 0xffff_ffff_ffff_ffff,
2195 ..good_write_request()
2196 }
2197 )
2198 .await,
2199 Err(zx::Status::OUT_OF_RANGE)
2200 );
2201
2202 assert_eq!(
2203 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2204 Err(zx::Status::INVALID_ARGS)
2205 );
2206
2207 assert_eq!(
2210 test(
2211 &mut fifo,
2212 BlockFifoRequest {
2213 command: BlockFifoCommand {
2214 opcode: BlockOpcode::CloseVmo.into_primitive(),
2215 ..Default::default()
2216 },
2217 vmoid: vmo_id.id + 1,
2218 ..Default::default()
2219 }
2220 )
2221 .await,
2222 Err(zx::Status::IO)
2223 );
2224
2225 std::mem::drop(proxy);
2226 }
2227 );
2228 }
2229
2230 #[fuchsia::test]
2231 async fn test_concurrent_requests() {
2232 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2233
2234 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2235 let waiting_readers_clone = waiting_readers.clone();
2236
2237 futures::join!(
2238 async move {
2239 let block_server = BlockServer::new(
2240 BLOCK_SIZE,
2241 Arc::new(MockInterface {
2242 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2243 let (tx, rx) = oneshot::channel();
2244 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2245 Box::pin(async move {
2246 let _ = rx.await;
2247 Ok(())
2248 })
2249 })),
2250 ..MockInterface::default()
2251 }),
2252 );
2253 block_server.handle_requests(stream).await.unwrap();
2254 },
2255 async move {
2256 let (session_proxy, server) = fidl::endpoints::create_proxy();
2257
2258 proxy.open_session(server).unwrap();
2259
2260 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2261 let vmo_id = session_proxy
2262 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2263 .await
2264 .unwrap()
2265 .unwrap();
2266
2267 let mut fifo =
2268 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2269 let (mut reader, mut writer) = fifo.async_io();
2270
2271 writer
2272 .write_entries(&BlockFifoRequest {
2273 command: BlockFifoCommand {
2274 opcode: BlockOpcode::Read.into_primitive(),
2275 ..Default::default()
2276 },
2277 reqid: 1,
2278 dev_offset: 1, vmoid: vmo_id.id,
2280 length: 1,
2281 ..Default::default()
2282 })
2283 .await
2284 .unwrap();
2285
2286 writer
2287 .write_entries(&BlockFifoRequest {
2288 command: BlockFifoCommand {
2289 opcode: BlockOpcode::Read.into_primitive(),
2290 ..Default::default()
2291 },
2292 reqid: 2,
2293 dev_offset: 2,
2294 vmoid: vmo_id.id,
2295 length: 1,
2296 ..Default::default()
2297 })
2298 .await
2299 .unwrap();
2300
2301 poll_fn(|cx: &mut Context<'_>| {
2303 if waiting_readers.lock().len() == 2 {
2304 Poll::Ready(())
2305 } else {
2306 cx.waker().wake_by_ref();
2308 Poll::Pending
2309 }
2310 })
2311 .await;
2312
2313 let mut response = BlockFifoResponse::default();
2314 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2315
2316 let (id, tx) = waiting_readers.lock().pop().unwrap();
2317 tx.send(()).unwrap();
2318
2319 reader.read_entries(&mut response).await.unwrap();
2320 assert_eq!(response.status, zx::sys::ZX_OK);
2321 assert_eq!(response.reqid, id);
2322
2323 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2324
2325 let (id, tx) = waiting_readers.lock().pop().unwrap();
2326 tx.send(()).unwrap();
2327
2328 reader.read_entries(&mut response).await.unwrap();
2329 assert_eq!(response.status, zx::sys::ZX_OK);
2330 assert_eq!(response.reqid, id);
2331 }
2332 );
2333 }
2334
2335 #[fuchsia::test]
2336 async fn test_session_close_is_synchronous() {
2337 use futures::{FutureExt as _, StreamExt as _};
2338
2339 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2340
2341 let (start_tx, mut start_rx) = futures::channel::mpsc::channel(1);
2342 let (finish_tx, finish_rx) = futures::channel::oneshot::channel();
2343 let finish_rx = Arc::new(Mutex::new(Some(finish_rx)));
2344
2345 futures::join!(
2346 async move {
2347 let block_server = BlockServer::new(
2348 BLOCK_SIZE,
2349 Arc::new(MockInterface {
2350 read_hook: Some(Box::new(move |_, _, _, _| {
2351 let mut start_tx = start_tx.clone();
2352 let finish_rx = finish_rx.lock().take().unwrap();
2353 Box::pin(async move {
2354 start_tx.try_send(()).unwrap();
2355 let _ = finish_rx.await;
2356 Ok(())
2357 })
2358 })),
2359 ..MockInterface::default()
2360 }),
2361 );
2362 block_server.handle_requests(stream).await.unwrap();
2363 },
2364 async move {
2365 let (session_proxy, server) = fidl::endpoints::create_proxy();
2366 proxy.open_session(server).unwrap();
2367
2368 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2369 let vmo_id = session_proxy
2370 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2371 .await
2372 .unwrap()
2373 .unwrap();
2374
2375 let mut fifo = fasync::Fifo::<BlockFifoResponse, BlockFifoRequest>::from_fifo(
2376 session_proxy.get_fifo().await.unwrap().unwrap(),
2377 );
2378 let (_reader, mut writer) = fifo.async_io();
2379
2380 writer
2381 .write_entries(&BlockFifoRequest {
2382 command: BlockFifoCommand {
2383 opcode: BlockOpcode::Read.into_primitive(),
2384 ..Default::default()
2385 },
2386 reqid: 1,
2387 vmoid: vmo_id.id,
2388 length: 1,
2389 ..Default::default()
2390 })
2391 .await
2392 .unwrap();
2393
2394 start_rx.next().await.unwrap();
2396
2397 let mut close_fut = std::pin::pin!(session_proxy.close().fuse());
2399 let mut timer_fut = std::pin::pin!(
2400 fasync::Timer::new(std::time::Duration::from_millis(100)).fuse()
2401 );
2402 futures::select! {
2403 res = close_fut => panic!("close completed too early: {:?}", res),
2404 _ = timer_fut => {}
2405 }
2406
2407 finish_tx.send(()).unwrap();
2409
2410 close_fut.await.unwrap().unwrap();
2412
2413 std::mem::drop(proxy);
2414 }
2415 );
2416 }
2417
2418 #[fuchsia::test]
2419 async fn test_groups() {
2420 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2421
2422 futures::join!(
2423 async move {
2424 let block_server = BlockServer::new(
2425 BLOCK_SIZE,
2426 Arc::new(MockInterface {
2427 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2428 ..MockInterface::default()
2429 }),
2430 );
2431 block_server.handle_requests(stream).await.unwrap();
2432 },
2433 async move {
2434 let (session_proxy, server) = fidl::endpoints::create_proxy();
2435
2436 proxy.open_session(server).unwrap();
2437
2438 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2439 let vmo_id = session_proxy
2440 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2441 .await
2442 .unwrap()
2443 .unwrap();
2444
2445 let mut fifo =
2446 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2447 let (mut reader, mut writer) = fifo.async_io();
2448
2449 writer
2450 .write_entries(&BlockFifoRequest {
2451 command: BlockFifoCommand {
2452 opcode: BlockOpcode::Read.into_primitive(),
2453 flags: BlockIoFlag::GROUP_ITEM.bits(),
2454 ..Default::default()
2455 },
2456 group: 1,
2457 vmoid: vmo_id.id,
2458 length: 1,
2459 ..Default::default()
2460 })
2461 .await
2462 .unwrap();
2463
2464 writer
2465 .write_entries(&BlockFifoRequest {
2466 command: BlockFifoCommand {
2467 opcode: BlockOpcode::Read.into_primitive(),
2468 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2469 ..Default::default()
2470 },
2471 reqid: 2,
2472 group: 1,
2473 vmoid: vmo_id.id,
2474 length: 1,
2475 ..Default::default()
2476 })
2477 .await
2478 .unwrap();
2479
2480 let mut response = BlockFifoResponse::default();
2481 reader.read_entries(&mut response).await.unwrap();
2482 assert_eq!(response.status, zx::sys::ZX_OK);
2483 assert_eq!(response.reqid, 2);
2484 assert_eq!(response.group, 1);
2485 }
2486 );
2487 }
2488
2489 #[fuchsia::test]
2490 async fn test_group_error() {
2491 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2492
2493 let counter = Arc::new(AtomicU64::new(0));
2494 let counter_clone = counter.clone();
2495
2496 futures::join!(
2497 async move {
2498 let block_server = BlockServer::new(
2499 BLOCK_SIZE,
2500 Arc::new(MockInterface {
2501 read_hook: Some(Box::new(move |_, _, _, _| {
2502 counter_clone.fetch_add(1, Ordering::Relaxed);
2503 Box::pin(async { Err(zx::Status::BAD_STATE) })
2504 })),
2505 ..MockInterface::default()
2506 }),
2507 );
2508 block_server.handle_requests(stream).await.unwrap();
2509 },
2510 async move {
2511 let (session_proxy, server) = fidl::endpoints::create_proxy();
2512
2513 proxy.open_session(server).unwrap();
2514
2515 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2516 let vmo_id = session_proxy
2517 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2518 .await
2519 .unwrap()
2520 .unwrap();
2521
2522 let mut fifo =
2523 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2524 let (mut reader, mut writer) = fifo.async_io();
2525
2526 writer
2527 .write_entries(&BlockFifoRequest {
2528 command: BlockFifoCommand {
2529 opcode: BlockOpcode::Read.into_primitive(),
2530 flags: BlockIoFlag::GROUP_ITEM.bits(),
2531 ..Default::default()
2532 },
2533 group: 1,
2534 vmoid: vmo_id.id,
2535 length: 1,
2536 ..Default::default()
2537 })
2538 .await
2539 .unwrap();
2540
2541 poll_fn(|cx: &mut Context<'_>| {
2543 if counter.load(Ordering::Relaxed) == 1 {
2544 Poll::Ready(())
2545 } else {
2546 cx.waker().wake_by_ref();
2548 Poll::Pending
2549 }
2550 })
2551 .await;
2552
2553 let mut response = BlockFifoResponse::default();
2554 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2555
2556 writer
2557 .write_entries(&BlockFifoRequest {
2558 command: BlockFifoCommand {
2559 opcode: BlockOpcode::Read.into_primitive(),
2560 flags: BlockIoFlag::GROUP_ITEM.bits(),
2561 ..Default::default()
2562 },
2563 group: 1,
2564 vmoid: vmo_id.id,
2565 length: 1,
2566 ..Default::default()
2567 })
2568 .await
2569 .unwrap();
2570
2571 writer
2572 .write_entries(&BlockFifoRequest {
2573 command: BlockFifoCommand {
2574 opcode: BlockOpcode::Read.into_primitive(),
2575 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2576 ..Default::default()
2577 },
2578 reqid: 2,
2579 group: 1,
2580 vmoid: vmo_id.id,
2581 length: 1,
2582 ..Default::default()
2583 })
2584 .await
2585 .unwrap();
2586
2587 reader.read_entries(&mut response).await.unwrap();
2588 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2589 assert_eq!(response.reqid, 2);
2590 assert_eq!(response.group, 1);
2591
2592 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2593
2594 assert_eq!(counter.load(Ordering::Relaxed), 1);
2596 }
2597 );
2598 }
2599
2600 #[fuchsia::test]
2601 async fn test_group_with_two_lasts() {
2602 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2603
2604 let (tx, rx) = oneshot::channel();
2605
2606 futures::join!(
2607 async move {
2608 let rx = Mutex::new(Some(rx));
2609 let block_server = BlockServer::new(
2610 BLOCK_SIZE,
2611 Arc::new(MockInterface {
2612 read_hook: Some(Box::new(move |_, _, _, _| {
2613 let rx = rx.lock().take().unwrap();
2614 Box::pin(async {
2615 let _ = rx.await;
2616 Ok(())
2617 })
2618 })),
2619 ..MockInterface::default()
2620 }),
2621 );
2622 block_server.handle_requests(stream).await.unwrap();
2623 },
2624 async move {
2625 let (session_proxy, server) = fidl::endpoints::create_proxy();
2626
2627 proxy.open_session(server).unwrap();
2628
2629 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2630 let vmo_id = session_proxy
2631 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2632 .await
2633 .unwrap()
2634 .unwrap();
2635
2636 let mut fifo =
2637 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2638 let (mut reader, mut writer) = fifo.async_io();
2639
2640 writer
2641 .write_entries(&BlockFifoRequest {
2642 command: BlockFifoCommand {
2643 opcode: BlockOpcode::Read.into_primitive(),
2644 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2645 ..Default::default()
2646 },
2647 reqid: 1,
2648 group: 1,
2649 vmoid: vmo_id.id,
2650 length: 1,
2651 ..Default::default()
2652 })
2653 .await
2654 .unwrap();
2655
2656 writer
2657 .write_entries(&BlockFifoRequest {
2658 command: BlockFifoCommand {
2659 opcode: BlockOpcode::Read.into_primitive(),
2660 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2661 ..Default::default()
2662 },
2663 reqid: 2,
2664 group: 1,
2665 vmoid: vmo_id.id,
2666 length: 1,
2667 ..Default::default()
2668 })
2669 .await
2670 .unwrap();
2671
2672 writer
2674 .write_entries(&BlockFifoRequest {
2675 command: BlockFifoCommand {
2676 opcode: BlockOpcode::CloseVmo.into_primitive(),
2677 ..Default::default()
2678 },
2679 reqid: 3,
2680 vmoid: vmo_id.id,
2681 ..Default::default()
2682 })
2683 .await
2684 .unwrap();
2685
2686 let mut response = BlockFifoResponse::default();
2688 reader.read_entries(&mut response).await.unwrap();
2689 assert_eq!(response.status, zx::sys::ZX_OK);
2690 assert_eq!(response.reqid, 3);
2691
2692 tx.send(()).unwrap();
2694
2695 let mut response = BlockFifoResponse::default();
2698 reader.read_entries(&mut response).await.unwrap();
2699 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2700 assert_eq!(response.reqid, 1);
2701 assert_eq!(response.group, 1);
2702 }
2703 );
2704 }
2705
2706 #[fuchsia::test(allow_stalls = false)]
2707 async fn test_requests_dont_block_sessions() {
2708 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2709
2710 let (tx, rx) = oneshot::channel();
2711
2712 fasync::Task::local(async move {
2713 let rx = Mutex::new(Some(rx));
2714 let block_server = BlockServer::new(
2715 BLOCK_SIZE,
2716 Arc::new(MockInterface {
2717 read_hook: Some(Box::new(move |_, _, _, _| {
2718 let rx = rx.lock().take().unwrap();
2719 Box::pin(async {
2720 let _ = rx.await;
2721 Ok(())
2722 })
2723 })),
2724 ..MockInterface::default()
2725 }),
2726 );
2727 block_server.handle_requests(stream).await.unwrap();
2728 })
2729 .detach();
2730
2731 let mut fut = pin!(async {
2732 let (session_proxy, server) = fidl::endpoints::create_proxy();
2733
2734 proxy.open_session(server).unwrap();
2735
2736 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2737 let vmo_id = session_proxy
2738 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2739 .await
2740 .unwrap()
2741 .unwrap();
2742
2743 let mut fifo =
2744 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2745 let (mut reader, mut writer) = fifo.async_io();
2746
2747 writer
2748 .write_entries(&BlockFifoRequest {
2749 command: BlockFifoCommand {
2750 opcode: BlockOpcode::Read.into_primitive(),
2751 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2752 ..Default::default()
2753 },
2754 reqid: 1,
2755 group: 1,
2756 vmoid: vmo_id.id,
2757 length: 1,
2758 ..Default::default()
2759 })
2760 .await
2761 .unwrap();
2762
2763 let mut response = BlockFifoResponse::default();
2764 reader.read_entries(&mut response).await.unwrap();
2765 assert_eq!(response.status, zx::sys::ZX_OK);
2766 });
2767
2768 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2770
2771 let mut fut2 = pin!(proxy.get_volume_info());
2772
2773 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2775
2776 let _ = tx.send(());
2779
2780 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2781 }
2782
2783 #[fuchsia::test]
2784 async fn test_request_flow_control() {
2785 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2786
2787 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2790 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2791 let event_clone = event.clone();
2792 futures::join!(
2793 async move {
2794 let block_server = BlockServer::new(
2795 BLOCK_SIZE,
2796 Arc::new(MockInterface {
2797 read_hook: Some(Box::new(move |_, _, _, _| {
2798 let event_clone = event_clone.clone();
2799 Box::pin(async move {
2800 if !event_clone.1.load(Ordering::SeqCst) {
2801 event_clone.0.listen().await;
2802 }
2803 Ok(())
2804 })
2805 })),
2806 ..MockInterface::default()
2807 }),
2808 );
2809 block_server.handle_requests(stream).await.unwrap();
2810 },
2811 async move {
2812 let (session_proxy, server) = fidl::endpoints::create_proxy();
2813
2814 proxy.open_session(server).unwrap();
2815
2816 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2817 let vmo_id = session_proxy
2818 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2819 .await
2820 .unwrap()
2821 .unwrap();
2822
2823 let mut fifo =
2824 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2825 let (mut reader, mut writer) = fifo.async_io();
2826
2827 for i in 0..MAX_REQUESTS {
2828 writer
2829 .write_entries(&BlockFifoRequest {
2830 command: BlockFifoCommand {
2831 opcode: BlockOpcode::Read.into_primitive(),
2832 ..Default::default()
2833 },
2834 reqid: (i + 1) as u32,
2835 dev_offset: i,
2836 vmoid: vmo_id.id,
2837 length: 1,
2838 ..Default::default()
2839 })
2840 .await
2841 .unwrap();
2842 }
2843 assert!(
2844 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2845 command: BlockFifoCommand {
2846 opcode: BlockOpcode::Read.into_primitive(),
2847 ..Default::default()
2848 },
2849 reqid: u32::MAX,
2850 dev_offset: MAX_REQUESTS,
2851 vmoid: vmo_id.id,
2852 length: 1,
2853 ..Default::default()
2854 })))
2855 .is_pending()
2856 );
2857 event.1.store(true, Ordering::SeqCst);
2859 event.0.notify(usize::MAX);
2860 let mut finished_reqids = vec![];
2862 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2863 let mut response = BlockFifoResponse::default();
2864 reader.read_entries(&mut response).await.unwrap();
2865 assert_eq!(response.status, zx::sys::ZX_OK);
2866 finished_reqids.push(response.reqid);
2867 writer
2868 .write_entries(&BlockFifoRequest {
2869 command: BlockFifoCommand {
2870 opcode: BlockOpcode::Read.into_primitive(),
2871 ..Default::default()
2872 },
2873 reqid: (i + 1) as u32,
2874 dev_offset: i,
2875 vmoid: vmo_id.id,
2876 length: 1,
2877 ..Default::default()
2878 })
2879 .await
2880 .unwrap();
2881 }
2882 let mut response = BlockFifoResponse::default();
2883 for _ in 0..MAX_REQUESTS {
2884 reader.read_entries(&mut response).await.unwrap();
2885 assert_eq!(response.status, zx::sys::ZX_OK);
2886 finished_reqids.push(response.reqid);
2887 }
2888 finished_reqids.sort();
2891 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2892 let mut i = 1;
2893 for reqid in finished_reqids {
2894 assert_eq!(reqid, i);
2895 i += 1;
2896 }
2897 }
2898 );
2899 }
2900
2901 #[fuchsia::test]
2902 async fn test_passthrough_io_with_fixed_map() {
2903 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2904
2905 let expected_op = Arc::new(Mutex::new(None));
2906 let expected_op_clone = expected_op.clone();
2907 futures::join!(
2908 async {
2909 let block_server = BlockServer::new(
2910 BLOCK_SIZE,
2911 Arc::new(IoMockInterface {
2912 return_errors: false,
2913 do_checks: true,
2914 expected_op: expected_op_clone,
2915 }),
2916 );
2917 block_server.handle_requests(stream).await.unwrap();
2918 },
2919 async move {
2920 let (session_proxy, server) = fidl::endpoints::create_proxy();
2921
2922 let mapping = fblock::BlockOffsetMapping {
2923 source_block_offset: 0,
2924 target_block_offset: 10,
2925 length: 20,
2926 };
2927 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2928
2929 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2930 let vmo_id = session_proxy
2931 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2932 .await
2933 .unwrap()
2934 .unwrap();
2935
2936 let mut fifo =
2937 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2938 let (mut reader, mut writer) = fifo.async_io();
2939
2940 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2942 writer
2943 .write_entries(&BlockFifoRequest {
2944 command: BlockFifoCommand {
2945 opcode: BlockOpcode::Read.into_primitive(),
2946 ..Default::default()
2947 },
2948 vmoid: vmo_id.id,
2949 dev_offset: 1,
2950 length: 2,
2951 vmo_offset: 3,
2952 ..Default::default()
2953 })
2954 .await
2955 .unwrap();
2956
2957 let mut response = BlockFifoResponse::default();
2958 reader.read_entries(&mut response).await.unwrap();
2959 assert_eq!(response.status, zx::sys::ZX_OK);
2960
2961 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2963 writer
2964 .write_entries(&BlockFifoRequest {
2965 command: BlockFifoCommand {
2966 opcode: BlockOpcode::Write.into_primitive(),
2967 ..Default::default()
2968 },
2969 vmoid: vmo_id.id,
2970 dev_offset: 4,
2971 length: 5,
2972 vmo_offset: 6,
2973 ..Default::default()
2974 })
2975 .await
2976 .unwrap();
2977
2978 reader.read_entries(&mut response).await.unwrap();
2979 assert_eq!(response.status, zx::sys::ZX_OK);
2980
2981 *expected_op.lock() = Some(ExpectedOp::Flush);
2983 writer
2984 .write_entries(&BlockFifoRequest {
2985 command: BlockFifoCommand {
2986 opcode: BlockOpcode::Flush.into_primitive(),
2987 ..Default::default()
2988 },
2989 ..Default::default()
2990 })
2991 .await
2992 .unwrap();
2993
2994 reader.read_entries(&mut response).await.unwrap();
2995 assert_eq!(response.status, zx::sys::ZX_OK);
2996
2997 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2999 writer
3000 .write_entries(&BlockFifoRequest {
3001 command: BlockFifoCommand {
3002 opcode: BlockOpcode::Trim.into_primitive(),
3003 ..Default::default()
3004 },
3005 dev_offset: 7,
3006 length: 3,
3007 ..Default::default()
3008 })
3009 .await
3010 .unwrap();
3011
3012 reader.read_entries(&mut response).await.unwrap();
3013 assert_eq!(response.status, zx::sys::ZX_OK);
3014
3015 *expected_op.lock() = None;
3017 writer
3018 .write_entries(&BlockFifoRequest {
3019 command: BlockFifoCommand {
3020 opcode: BlockOpcode::Read.into_primitive(),
3021 ..Default::default()
3022 },
3023 vmoid: vmo_id.id,
3024 dev_offset: 19,
3025 length: 2,
3026 vmo_offset: 3,
3027 ..Default::default()
3028 })
3029 .await
3030 .unwrap();
3031
3032 reader.read_entries(&mut response).await.unwrap();
3033 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3034
3035 std::mem::drop(proxy);
3036 }
3037 );
3038 }
3039
3040 #[fuchsia::test]
3041 fn operation_map() {
3042 const BLOCK_SIZE: u32 = 512;
3043
3044 #[track_caller]
3045 fn expect_map_result(
3046 mut operation: Operation,
3047 mapping: Option<BlockOffsetMapping>,
3048 max_blocks: Option<NonZero<u32>>,
3049 expected_operations: Vec<Operation>,
3050 ) {
3051 let mut ops = vec![];
3052 while let Some(remainder) =
3053 operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
3054 {
3055 ops.push(operation);
3056 operation = remainder;
3057 }
3058 ops.push(operation);
3059 assert_eq!(ops, expected_operations);
3060 }
3061
3062 expect_map_result(
3064 Operation::Read {
3065 device_block_offset: 10,
3066 block_count: 200,
3067 _unused: 0,
3068 vmo_offset: 0,
3069 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3070 },
3071 None,
3072 None,
3073 vec![Operation::Read {
3074 device_block_offset: 10,
3075 block_count: 200,
3076 _unused: 0,
3077 vmo_offset: 0,
3078 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3079 }],
3080 );
3081
3082 expect_map_result(
3084 Operation::Read {
3085 device_block_offset: 10,
3086 block_count: 200,
3087 _unused: 0,
3088 vmo_offset: 0,
3089 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3090 },
3091 None,
3092 NonZero::new(120),
3093 vec![
3094 Operation::Read {
3095 device_block_offset: 10,
3096 block_count: 120,
3097 _unused: 0,
3098 vmo_offset: 0,
3099 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3100 },
3101 Operation::Read {
3102 device_block_offset: 130,
3103 block_count: 80,
3104 _unused: 0,
3105 vmo_offset: 120 * BLOCK_SIZE as u64,
3106 options: ReadOptions {
3107 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3109 },
3110 },
3111 ],
3112 );
3113 expect_map_result(
3114 Operation::Trim { device_block_offset: 10, block_count: 200 },
3115 None,
3116 NonZero::new(120),
3117 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3118 );
3119
3120 expect_map_result(
3122 Operation::Read {
3123 device_block_offset: 10,
3124 block_count: 200,
3125 _unused: 0,
3126 vmo_offset: 0,
3127 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3128 },
3129 Some(BlockOffsetMapping {
3130 source_block_offset: 10,
3131 target_block_offset: 100,
3132 length: 200,
3133 }),
3134 NonZero::new(120),
3135 vec![
3136 Operation::Read {
3137 device_block_offset: 100,
3138 block_count: 120,
3139 _unused: 0,
3140 vmo_offset: 0,
3141 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3142 },
3143 Operation::Read {
3144 device_block_offset: 220,
3145 block_count: 80,
3146 _unused: 0,
3147 vmo_offset: 120 * BLOCK_SIZE as u64,
3148 options: ReadOptions {
3149 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3150 },
3151 },
3152 ],
3153 );
3154 expect_map_result(
3155 Operation::Trim { device_block_offset: 10, block_count: 200 },
3156 Some(BlockOffsetMapping {
3157 source_block_offset: 10,
3158 target_block_offset: 100,
3159 length: 200,
3160 }),
3161 NonZero::new(120),
3162 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3163 );
3164 }
3165
3166 #[fuchsia::test]
3168 async fn test_pre_barrier_flush_failure() {
3169 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3170
3171 struct NoBarrierInterface;
3172 impl super::async_interface::Interface for NoBarrierInterface {
3173 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3174 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3175 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3177 block_range: Some(0..100),
3178 type_guid: [0; 16],
3179 instance_guid: [0; 16],
3180 name: "test".to_string(),
3181 flags: 0,
3182 }))
3183 }
3184 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3185 Ok(())
3186 }
3187 async fn read(
3188 &self,
3189 _: u64,
3190 _: u32,
3191 _: &Arc<zx::Vmo>,
3192 _: u64,
3193 _: ReadOptions,
3194 _: TraceFlowId,
3195 ) -> Result<(), zx::Status> {
3196 unreachable!()
3197 }
3198 async fn write(
3199 &self,
3200 _: u64,
3201 _: u32,
3202 _: &Arc<zx::Vmo>,
3203 _: u64,
3204 _: WriteOptions,
3205 _: TraceFlowId,
3206 ) -> Result<(), zx::Status> {
3207 panic!("Write should not be called");
3208 }
3209 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3210 Err(zx::Status::IO)
3211 }
3212 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3213 unreachable!()
3214 }
3215 }
3216
3217 futures::join!(
3218 async move {
3219 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3220 block_server.handle_requests(stream).await.unwrap();
3221 },
3222 async move {
3223 let (session_proxy, server) = fidl::endpoints::create_proxy();
3224 proxy.open_session(server).unwrap();
3225 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3226 let vmo_id = session_proxy
3227 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3228 .await
3229 .unwrap()
3230 .unwrap();
3231
3232 let mut fifo =
3233 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3234 let (mut reader, mut writer) = fifo.async_io();
3235
3236 writer
3237 .write_entries(&BlockFifoRequest {
3238 command: BlockFifoCommand {
3239 opcode: BlockOpcode::Write.into_primitive(),
3240 flags: BlockIoFlag::PRE_BARRIER.bits(),
3241 ..Default::default()
3242 },
3243 vmoid: vmo_id.id,
3244 length: 1,
3245 ..Default::default()
3246 })
3247 .await
3248 .unwrap();
3249
3250 let mut response = BlockFifoResponse::default();
3251 reader.read_entries(&mut response).await.unwrap();
3252 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3253 }
3254 );
3255 }
3256
3257 #[fuchsia::test]
3260 async fn test_post_barrier_write_failure() {
3261 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3262
3263 struct NoBarrierInterface;
3264 impl super::async_interface::Interface for NoBarrierInterface {
3265 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3266 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3267 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3269 block_range: Some(0..100),
3270 type_guid: [0; 16],
3271 instance_guid: [0; 16],
3272 name: "test".to_string(),
3273 flags: 0,
3274 }))
3275 }
3276 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3277 Ok(())
3278 }
3279 async fn read(
3280 &self,
3281 _: u64,
3282 _: u32,
3283 _: &Arc<zx::Vmo>,
3284 _: u64,
3285 _: ReadOptions,
3286 _: TraceFlowId,
3287 ) -> Result<(), zx::Status> {
3288 unreachable!()
3289 }
3290 async fn write(
3291 &self,
3292 _: u64,
3293 _: u32,
3294 _: &Arc<zx::Vmo>,
3295 _: u64,
3296 _: WriteOptions,
3297 _: TraceFlowId,
3298 ) -> Result<(), zx::Status> {
3299 Err(zx::Status::IO)
3300 }
3301 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3302 panic!("Flush should not be called")
3303 }
3304 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3305 unreachable!()
3306 }
3307 }
3308
3309 futures::join!(
3310 async move {
3311 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3312 block_server.handle_requests(stream).await.unwrap();
3313 },
3314 async move {
3315 let (session_proxy, server) = fidl::endpoints::create_proxy();
3316 proxy.open_session(server).unwrap();
3317 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3318 let vmo_id = session_proxy
3319 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3320 .await
3321 .unwrap()
3322 .unwrap();
3323
3324 let mut fifo =
3325 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3326 let (mut reader, mut writer) = fifo.async_io();
3327
3328 writer
3329 .write_entries(&BlockFifoRequest {
3330 command: BlockFifoCommand {
3331 opcode: BlockOpcode::Write.into_primitive(),
3332 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3333 ..Default::default()
3334 },
3335 vmoid: vmo_id.id,
3336 length: 1,
3337 ..Default::default()
3338 })
3339 .await
3340 .unwrap();
3341
3342 let mut response = BlockFifoResponse::default();
3343 reader.read_entries(&mut response).await.unwrap();
3344 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3345 }
3346 );
3347 }
3348}