1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::{Borrow, Cow};
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23pub mod callback_interface;
24
25#[cfg(test)]
26mod decompression_tests;
27
// NOTE(review): presumably the upper bound on the number of FIFO requests handled in one batch.
// No use of this constant is visible in this part of the file; confirm against the interface
// modules before relying on that reading.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Optional trace flow identifier attached to a request.
///
/// The FIFO decoder builds it with `NonZero::new(request.trace_flow_id)`, so a raw id of zero
/// becomes `None`.
type TraceFlowId = Option<NonZero<u64>>;
31
/// Static description of the device being served: either a plain block device or a partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A block device described only by flags, a block count and a transfer limit.
    Block(BlockInfo),
    /// A partition, which additionally carries GUIDs, a name and an optional block range.
    Partition(PartitionInfo),
}
37
38impl DeviceInfo {
39 pub fn label(&self) -> &str {
40 match self {
41 Self::Block(BlockInfo { .. }) => "",
42 Self::Partition(PartitionInfo { name, .. }) => name,
43 }
44 }
45 pub fn device_flags(&self) -> fblock::DeviceFlag {
46 match self {
47 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
48 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
49 }
50 }
51
52 pub fn block_count(&self) -> Option<u64> {
53 match self {
54 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
55 Self::Partition(PartitionInfo { block_range, .. }) => {
56 block_range.as_ref().map(|range| range.end - range.start)
57 }
58 }
59 }
60
61 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
62 match self {
63 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
64 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
65 max_transfer_blocks.clone()
66 }
67 }
68 }
69
70 fn max_transfer_size(&self, block_size: u32) -> u32 {
71 if let Some(max_blocks) = self.max_transfer_blocks() {
72 max_blocks.get() * block_size
73 } else {
74 MAX_TRANSFER_UNBOUNDED
75 }
76 }
77}
78
/// Information about a non-partition block device.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// Flags reported to clients by `GetInfo`.
    pub device_flags: fblock::DeviceFlag,
    /// Total number of blocks reported to clients by `GetInfo`.
    pub block_count: u64,
    /// Largest transfer, in blocks, for a single request; `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
86
/// Information about a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// Flags reported to clients by `GetInfo`.
    pub device_flags: fblock::DeviceFlag,
    /// Largest transfer, in blocks, for a single request; `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// Block range of the partition.  When `None`, `GetInfo` derives the block count from
    /// `SessionManager::get_volume_info` instead.
    pub block_range: Option<Range<u64>>,
    /// Type GUID returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Instance GUID returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Partition name returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Partition flags returned verbatim by `GetMetadata`.
    pub flags: u64,
}
102
/// Bookkeeping for one in-flight request or request group.
struct ActiveRequest<S> {
    /// Session that issued the request; handed back together with the response.
    session: S,
    /// Whether this entry tracks a group or an individual request, and its id.
    group_or_request: GroupOrRequest,
    /// Trace flow id taken from the FIFO request, if non-zero.
    trace_flow_id: TraceFlowId,
    /// Epoch guard held for as long as this entry exists.
    // NOTE(review): `CloseVmo` defers dropping the VMO through `Epoch::global().defer`, so this
    // guard presumably keeps a closed VMO alive until the request finishes; confirm.
    _epoch_guard: EpochGuard<'static>,
    /// First non-OK status recorded for the request, or OK.
    status: zx::Status,
    /// Number of outstanding sub-requests; a response is due only when this reaches zero.
    count: u32,
    /// `reqid` to echo in the response.  `None` for a group that has not yet seen its last
    /// member.
    req_id: Option<u32>,
    /// Present for reads that must be zstd-decompressed once all data has arrived.
    decompression_info: Option<DecompressionInfo>,
}
115
/// State needed to decompress a read once all of its compressed data has been read.
struct DecompressionInfo {
    /// Byte range of the compressed data within `buffer`.
    compressed_range: Range<usize>,

    /// Byte range within the VMO mapping that receives the decompressed output.
    uncompressed_range: Range<u64>,

    /// Number of bytes of the read that have been issued so far; advanced as continuation
    /// requests of the same group are decoded.
    bytes_so_far: u64,
    /// Mapping of the target VMO.
    mapping: Arc<VmoMapping>,
    /// Buffer holding the compressed data.  Created as `None` by the decoder and taken when
    /// decompression runs.
    // NOTE(review): the code that fills this in is not visible in this part of the file.
    buffer: Option<Buffer<'static>>,
}
127
128impl DecompressionInfo {
129 fn uncompressed_slice(&self) -> *mut [u8] {
131 std::ptr::slice_from_raw_parts_mut(
132 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
133 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
134 )
135 }
136}
137
/// Mutex-protected table of all in-flight requests, keyed by `RequestId`.
pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
139
140impl<S> Default for ActiveRequests<S> {
141 fn default() -> Self {
142 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
143 }
144}
145
146impl<S> ActiveRequests<S> {
147 fn complete_and_take_response(
148 &self,
149 request_id: RequestId,
150 status: zx::Status,
151 ) -> Option<(S, BlockFifoResponse)> {
152 self.0.lock().complete_and_take_response(request_id, status)
153 }
154
155 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
156 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
157 }
158}
159
/// The data behind the `ActiveRequests` mutex.
struct ActiveRequestsInner<S> {
    /// In-flight requests; the slab key is the value wrapped by `RequestId`.
    requests: Slab<ActiveRequest<S>>,
}
163
impl<S> ActiveRequestsInner<S> {
    /// Records the completion of one sub-request of `request_id` with `status`.
    ///
    /// Decrements the outstanding count (panicking if it is already zero), latches the first
    /// non-OK status, emits trace events and, once the last sub-request has finished with the
    /// request still OK, runs zstd decompression for requests carrying `DecompressionInfo`.
    /// Panics if `request_id` is not in the table.
    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
        let group = &mut self.requests[request_id.0];

        group.count = group.count.checked_sub(1).unwrap();
        // Only the first failure is kept; later statuses never overwrite it.
        if status != zx::Status::OK && group.status == zx::Status::OK {
            group.status = status
        }

        fuchsia_trace::duration!(
            "storage",
            "block_server::finish_transaction",
            "request_id" => request_id.0,
            "group_completed" => group.count == 0,
            "status" => status.into_raw());
        if let Some(trace_flow_id) = group.trace_flow_id {
            fuchsia_trace::flow_step!(
                "storage",
                "block_server::finish_request",
                trace_flow_id.get().into()
            );
        }

        // Decompress only when everything has completed and nothing failed.
        if group.count == 0
            && group.status == zx::Status::OK
            && let Some(info) = &mut group.decompression_info
        {
            // One decompressor per thread, created on first use.
            thread_local! {
                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
            }
            DECOMPRESSOR.with(|decompressor| {
                // SAFETY: NOTE(review) the pointer targets `uncompressed_range` inside the VMO
                // mapping kept alive by `info.mapping`; the decoder checks that the range lies
                // within the mapping.  The VMO is shared with the client, so exclusive access
                // is presumably assumed rather than enforced; confirm.
                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
                let mut decompressor = decompressor.borrow_mut();
                // The buffer is consumed here; this panics if it was never attached.
                if let Err(error) = decompressor.decompress_to_buffer(
                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
                    target_slice,
                ) {
                    log::warn!(error:?; "Decompression error");
                    group.status = zx::Status::IO_DATA_INTEGRITY;
                };
            });
        }
    }

    /// Removes the entry for `request_id` and returns its session and response, but only if a
    /// response is due: the `reqid` to reply with is known and nothing is outstanding.
    /// Otherwise leaves the entry in place and returns `None`.
    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
        let group = &self.requests[request_id.0];
        match group.req_id {
            Some(reqid) if group.count == 0 => {
                let group = self.requests.remove(request_id.0);
                Some((
                    group.session,
                    BlockFifoResponse {
                        status: group.status.into_raw(),
                        reqid,
                        // Individual (non-group) requests report group 0.
                        group: group.group_or_request.group_id().unwrap_or(0),
                        ..Default::default()
                    },
                ))
            }
            _ => None,
        }
    }

    /// Convenience wrapper: `complete` followed by `take_response`.
    fn complete_and_take_response(
        &mut self,
        request_id: RequestId,
        status: zx::Status,
    ) -> Option<(S, BlockFifoResponse)> {
        self.complete(request_id, status);
        self.take_response(request_id)
    }
}
242
/// Serves the block protocol for one device on top of a `SessionManager`.
pub struct BlockServer<SM: SessionManager> {
    /// Block size in bytes, reported by `GetInfo` and passed to every session.
    block_size: u32,
    /// Shared orchestrator from which the session manager is borrowed.
    orchestrator: Arc<SM::Orchestrator>,
}
249
/// Redirects a contiguous run of `length` blocks starting at `source_block_offset` (as seen by
/// the client) to the run starting at `target_block_offset` on the device.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    source_block_offset: u64,
    target_block_offset: u64,
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the block run `(start, count)` lies entirely inside the source range
    /// `source_block_offset..source_block_offset + length`.
    ///
    /// A run whose end would overflow `u64` cannot be inside any range, so it is reported as
    /// out of range instead of overflowing the addition.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (start, count) = blocks;
        let Some(end) = start.checked_add(u64::from(count)) else {
            return false;
        };
        // `end >= start >= source_block_offset` holds whenever the right-hand side is
        // evaluated, so the subtraction cannot underflow.
        start >= self.source_block_offset && end - self.source_block_offset <= self.length
    }
}
264
265impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
266 type Error = zx::Status;
267
268 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
269 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
270 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271 Ok(Self {
272 source_block_offset: wire.source_block_offset,
273 target_block_offset: wire.target_block_offset,
274 length: wire.length,
275 })
276 }
277}
278
/// Per-session request translation settings: an optional block offset mapping plus the
/// transfer limit used to split large requests.
pub struct OffsetMap {
    /// Mapping applied to request offsets; `None` means requests are not remapped.
    mapping: Option<BlockOffsetMapping>,
    /// Largest transfer, in blocks, for a single request; `None` means no limit.
    max_transfer_blocks: Option<NonZero<u32>>,
}
284
285impl OffsetMap {
286 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
288 Self { mapping: Some(mapping), max_transfer_blocks }
289 }
290
291 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
293 Self { mapping: None, max_transfer_blocks }
294 }
295
296 pub fn is_empty(&self) -> bool {
297 self.mapping.is_none()
298 }
299
300 fn mapping(&self) -> Option<&BlockOffsetMapping> {
301 self.mapping.as_ref()
302 }
303
304 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
305 self.max_transfer_blocks
306 }
307}
308
/// Implemented by the backend that actually services sessions and volume operations.
pub trait SessionManager: 'static {
    /// Shared object from which the session manager can be borrowed.  The server and every
    /// session hold it in an `Arc`.
    type Orchestrator: Borrow<Self> + Send + Sync;

    /// When true, `GetInfo` advertises `ZSTD_DECOMPRESSION_SUPPORT` and reads flagged
    /// `DECOMPRESS_WITH_ZSTD` are accepted; when false such reads fail with `NOT_SUPPORTED`.
    const SUPPORTS_DECOMPRESSION: bool;

    /// Per-session value stored with every active request and returned with its response.
    type Session;

    /// Called after a session has assigned an id to a newly attached VMO and before the client
    /// is told the id.  An error is propagated out of the session's request handler.
    fn on_attach_vmo(
        orchestrator: Arc<Self::Orchestrator>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns a future that runs a session serving `stream`.  The server spawns the future
    /// onto its scope; `offset_map` and `block_size` describe how requests are to be translated.
    fn open_session(
        orchestrator: Arc<Self::Orchestrator>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns the device description used to answer informational requests.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Returns volume manager and volume information.  The default reports `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `QuerySlices` request.  The server asserts that at most 16 ranges are
    /// returned.  The default reports `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers an `Extend` request.  The default reports `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `Shrink` request.  The default reports `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the table of in-flight requests shared by all sessions of this manager.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
378
/// Conversion into the shared orchestrator accepted by `BlockServer::new`.
pub trait IntoOrchestrator {
    /// Session manager whose orchestrator is produced.
    type SM: SessionManager;

    /// Consumes `self` and returns the orchestrator wrapped in an `Arc`.
    fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
}
387
impl<SM: SessionManager> BlockServer<SM> {
    /// Creates a server for a device with the given `block_size` (in bytes), using the
    /// orchestrator produced by `orchestrator.into_orchestrator()`.
    pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
        Self { block_size, orchestrator: orchestrator.into_orchestrator() }
    }

    /// Returns the session manager borrowed from the orchestrator.
    pub fn session_manager(&self) -> &SM {
        self.orchestrator.as_ref().borrow()
    }

    /// Serves `requests` until the stream ends.
    ///
    /// Sessions opened by clients are spawned onto a scope that is awaited before returning.
    /// An error returned by `handle_request` ends the loop immediately via `?`; a stream error
    /// is only logged and the loop carries on.
    pub async fn handle_requests(
        &self,
        mut requests: fblock::BlockRequestStream,
    ) -> Result<(), Error> {
        let scope = fasync::Scope::new();
        loop {
            match requests.try_next().await {
                Ok(Some(request)) => {
                    if let Some(session) = self.handle_request(request).await? {
                        // The session's own result is discarded.
                        scope.spawn(session.map(|_| ()));
                    }
                }
                Ok(None) => break,
                Err(err) => log::warn!(err:?; "Invalid request"),
            }
        }
        scope.await;
        Ok(())
    }

    /// Handles a single request on the block channel.
    ///
    /// Returns `Ok(Some(future))` when the request opened a session that the caller must run,
    /// and `Ok(None)` for every other request.  Failures to send a reply are returned as errors.
    async fn handle_request(
        &self,
        request: fblock::BlockRequest,
    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
        match request {
            fblock::BlockRequest::GetInfo { responder } => {
                let info = self.device_info();
                let max_transfer_size = info.max_transfer_size(self.block_size);
                let (block_count, mut flags) = match info.as_ref() {
                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
                        (*block_count, *device_flags)
                    }
                    DeviceInfo::Partition(partition_info) => {
                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
                            range.end - range.start
                        } else {
                            // No fixed range: size the partition from its slice allocation.
                            // NOTE(review): a failure here is propagated with `?`, so the
                            // client gets no reply and the whole request loop ends; confirm
                            // that this is intended.
                            let volume_info = self.session_manager().get_volume_info().await?;
                            volume_info.0.slice_size * volume_info.1.partition_slice_count
                                / self.block_size as u64
                        };
                        (block_count, partition_info.device_flags)
                    }
                };
                if SM::SUPPORTS_DECOMPRESSION {
                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
                }
                responder.send(Ok(&fblock::BlockInfo {
                    block_count,
                    block_size: self.block_size,
                    max_transfer_size,
                    flags,
                }))?;
            }
            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
                let info = self.device_info();
                return Ok(Some(SM::open_session(
                    self.orchestrator.clone(),
                    session.into_stream(),
                    OffsetMap::empty(info.max_transfer_blocks()),
                    self.block_size,
                )));
            }
            fblock::BlockRequest::OpenSessionWithOffsetMap {
                session,
                mapping,
                control_handle: _,
            } => {
                let info = self.device_info();
                // An invalid mapping closes the session channel with an epitaph instead of
                // opening a session.
                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
                    Ok(m) => m,
                    Err(status) => {
                        session.close_with_epitaph(status)?;
                        return Ok(None);
                    }
                };
                // The target range must fit within the device when its size is known.  The
                // addition cannot overflow because the conversion above already checked it.
                if let Some(max) = info.block_count() {
                    if initial_mapping.target_block_offset + initial_mapping.length > max {
                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
                        return Ok(None);
                    }
                }
                return Ok(Some(SM::open_session(
                    self.orchestrator.clone(),
                    session.into_stream(),
                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
                    self.block_size,
                )));
            }
            // The GUID and name queries are only supported for partitions.
            fblock::BlockRequest::GetTypeGuid { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    guid.value.copy_from_slice(&partition_info.type_guid);
                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fblock::BlockRequest::GetInstanceGuid { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    guid.value.copy_from_slice(&partition_info.instance_guid);
                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fblock::BlockRequest::GetName { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fblock::BlockRequest::GetMetadata { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(info) = info.as_ref() {
                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    type_guid.value.copy_from_slice(&info.type_guid);
                    let mut instance_guid =
                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    instance_guid.value.copy_from_slice(&info.instance_guid);
                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
                        name: Some(info.name.clone()),
                        type_guid: Some(type_guid),
                        instance_guid: Some(instance_guid),
                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
                        flags: Some(info.flags),
                        ..Default::default()
                    }))?;
                } else {
                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
                }
            }
            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
                match self.session_manager().query_slices(&start_slices).await {
                    Ok(mut results) => {
                        // The reply is a fixed array of 16 entries: pad the results and report
                        // how many entries are meaningful.
                        let results_len = results.len();
                        assert!(results_len <= 16);
                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
                        responder.send(
                            zx::sys::ZX_OK,
                            &results.try_into().unwrap(),
                            results_len as u64,
                        )?;
                    }
                    Err(s) => {
                        responder.send(
                            s.into_raw(),
                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
                            0,
                        )?;
                    }
                }
            }
            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
                match self.session_manager().get_volume_info().await {
                    Ok((manager_info, volume_info)) => {
                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
                    }
                    Err(s) => responder.send(s.into_raw(), None, None)?,
                }
            }
            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
                responder.send(
                    zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
                        .into_raw(),
                )?;
            }
            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
                responder.send(
                    zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
                        .into_raw(),
                )?;
            }
            // Destroy is always refused.
            fblock::BlockRequest::Destroy { responder, .. } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
            }
        }
        Ok(None)
    }

    /// Returns the device description supplied by the session manager.
    fn device_info(&self) -> Cow<'_, DeviceInfo> {
        self.session_manager().get_info()
    }
}
590
/// State and helpers shared by session implementations: the FIFO, attached VMOs and request
/// translation settings.
struct SessionHelper<SM: SessionManager> {
    /// Shared orchestrator from which the session manager is borrowed.
    orchestrator: Arc<SM::Orchestrator>,
    /// Offset mapping and transfer limit applied to this session's requests.
    offset_map: OffsetMap,
    /// Block size in bytes.
    block_size: u32,
    /// FIFO end that is duplicated and handed to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// Attached VMOs keyed by VMO id, each with a mapping that is created on first use by a
    /// decompressed read.
    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
}
598
/// A read/write mapping of a whole VMO in the root VMAR; unmapped on drop.
struct VmoMapping {
    /// Address at which the VMO is mapped.
    base: usize,
    /// Size of the mapping in bytes (the VMO size at the time of mapping).
    size: usize,
}
603
604impl VmoMapping {
605 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
606 let size = vmo.get_size().unwrap() as usize;
607 Ok(Arc::new(Self {
608 base: fuchsia_runtime::vmar_root_self()
609 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
610 .inspect_err(|error| {
611 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
612 })?,
613 size,
614 }))
615 }
616}
617
618impl Drop for VmoMapping {
619 fn drop(&mut self) {
620 unsafe {
622 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
623 }
624 }
625}
626
impl<SM: SessionManager> SessionHelper<SM> {
    /// Creates the helper together with the server's end of a newly created FIFO.
    ///
    /// The other end is kept in `peer_fifo` and handed to clients by `GetFifo`.
    fn new(
        orchestrator: Arc<SM::Orchestrator>,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
        Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
    }

    /// Returns the session manager borrowed from the orchestrator.
    fn session_manager(&self) -> &SM {
        self.orchestrator.as_ref().borrow()
    }

    /// Handles a request on the session channel.
    ///
    /// Returns an error when a reply cannot be sent, when `on_attach_vmo` fails, and always
    /// after acknowledging `Close`.
    // NOTE(review): the `Close` error is presumably what makes the caller stop serving the
    // session; confirm against the interface modules.
    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                let rights = zx::Rights::TRANSFER
                    | zx::Rights::READ
                    | zx::Rights::WRITE
                    | zx::Rights::SIGNAL
                    | zx::Rights::WAIT;
                match self.peer_fifo.duplicate_handle(rights) {
                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
                    Err(s) => responder.send(Err(s.into_raw()))?,
                }
                Ok(())
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                let vmo = Arc::new(vmo);
                // The lock is confined to this block so that it is released before the await
                // below.
                let vmo_id = {
                    let mut vmos = self.vmos.lock();
                    // Ids are allocated from 1..=u16::MAX, so the table is full at u16::MAX
                    // entries.
                    if vmos.len() == u16::MAX as usize {
                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
                        return Ok(());
                    } else {
                        let vmo_id = match vmos.last_entry() {
                            None => 1,
                            Some(o) => {
                                // Normally take the id after the largest one in use.  If that
                                // would overflow, scan the sorted keys for the first gap.
                                o.key().checked_add(1).unwrap_or_else(|| {
                                    let mut vmo_id = 1;
                                    for (&id, _) in &*vmos {
                                        if id > vmo_id {
                                            break;
                                        }
                                        vmo_id = id + 1;
                                    }
                                    vmo_id
                                })
                            }
                        };
                        vmos.insert(vmo_id, (vmo.clone(), None));
                        vmo_id
                    }
                };
                // NOTE(review): if this fails, the VMO stays registered and the client gets no
                // reply.
                SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
                Ok(())
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(Ok(()))?;
                Err(anyhow!("Closed"))
            }
        }
    }

    /// Decodes a raw FIFO request and registers it in the table of active requests.
    ///
    /// On success the returned `DecodedRequest` carries the id of the (new or existing) active
    /// request, the operation to perform and the VMO involved, if any.
    ///
    /// On failure the error is `Some(response)` when a response must be sent to the client
    /// right away, and `None` when nothing is to be sent yet (for example a failed member of a
    /// group whose final request has not arrived).
    fn decode_fifo_request(
        &self,
        session: SM::Session,
        request: &BlockFifoRequest,
    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
        // Unknown flag bits are ignored.
        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);

        // Transfer size in bytes; u32 * u32 cannot overflow a u64.
        let request_bytes = request.length as u64 * self.block_size as u64;

        // Validate the opcode and arguments and build the operation.  Errors are kept in
        // `operation` rather than returned so that the request is still tracked below.
        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
            .ok_or(zx::Status::INVALID_ARGS)
            .and_then(|code| {
                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
                    // Decompression only applies to reads, and only if the backend opts in.
                    if code != BlockOpcode::Read {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    if !SM::SUPPORTS_DECOMPRESSION {
                        return Err(zx::Status::NOT_SUPPORTED);
                    }
                }
                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
                    if request.length == 0 {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    // Reject ranges whose end would wrap.
                    // NOTE(review): `request.vmo_offset` is in blocks here (it is multiplied
                    // by the block size below) while `request_bytes` is in bytes, so this sum
                    // mixes units; confirm that the VMO end offset is validated elsewhere.
                    if request.dev_offset.checked_add(request.length as u64).is_none()
                        || (code != BlockOpcode::Trim
                            && request_bytes.checked_add(request.vmo_offset).is_none())
                    {
                        return Err(zx::Status::OUT_OF_RANGE);
                    }
                }
                Ok(match code {
                    BlockOpcode::Read => Operation::Read {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                        _unused: 0,
                        // Converted from blocks to bytes.
                        vmo_offset: request
                            .vmo_offset
                            .checked_mul(self.block_size as u64)
                            .ok_or(zx::Status::OUT_OF_RANGE)?,
                        options: ReadOptions {
                            inline_crypto: InlineCryptoOptions {
                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
                                dun: request.dun,
                                slot: request.slot,
                            },
                        },
                    },
                    BlockOpcode::Write => {
                        let mut options = WriteOptions {
                            inline_crypto: InlineCryptoOptions {
                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
                                dun: request.dun,
                                slot: request.slot,
                            },
                            ..WriteOptions::default()
                        };
                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
                            options.flags |= WriteFlags::FORCE_ACCESS;
                        }
                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
                            options.flags |= WriteFlags::PRE_BARRIER;
                        }
                        Operation::Write {
                            device_block_offset: request.dev_offset,
                            block_count: request.length,
                            _unused: 0,
                            options,
                            // Converted from blocks to bytes.
                            vmo_offset: request
                                .vmo_offset
                                .checked_mul(self.block_size as u64)
                                .ok_or(zx::Status::OUT_OF_RANGE)?,
                        }
                    }
                    BlockOpcode::Flush => Operation::Flush,
                    BlockOpcode::Trim => Operation::Trim {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                    },
                    BlockOpcode::CloseVmo => Operation::CloseVmo,
                })
            });

        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
            GroupOrRequest::Group(request.group)
        } else {
            GroupOrRequest::Request(request.reqid)
        };

        // The table stays locked for the remainder of the function.
        let mut active_requests = self.session_manager().active_requests().0.lock();
        let mut request_id = None;

        if group_or_request.is_group() {
            // Look for an existing entry for this group and, if found, fold this request
            // into it.
            for (key, group) in &mut active_requests.requests {
                if group.group_or_request == group_or_request {
                    if group.req_id.is_some() {
                        // The group's final request was already received, so another member is
                        // a protocol violation.  Fail the group and send nothing for this
                        // request.
                        if group.status == zx::Status::OK {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                        return Err(None);
                    }
                    if group.status == zx::Status::OK
                        && let Some(info) = &mut group.decompression_info
                    {
                        // The group is a decompressed read in progress: this request must be a
                        // plain continuation read with a zero VMO offset.
                        if let Ok(Operation::Read {
                            device_block_offset,
                            mut block_count,
                            options,
                            vmo_offset: 0,
                            ..
                        }) = operation
                        {
                            // Bytes still to be read, rounded up to whole blocks.
                            let remaining_bytes = info
                                .compressed_range
                                .end
                                .next_multiple_of(self.block_size as usize)
                                as u64
                                - info.bytes_so_far;
                            // Continuations must carry the decompression flag but none of the
                            // size fields.  The last one must reach the end of the compressed
                            // data; earlier ones must stop short of it.
                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
                                || request.total_compressed_bytes != 0
                                || request.uncompressed_bytes != 0
                                || request.compressed_prefix_bytes != 0
                                || (flags.contains(BlockIoFlag::GROUP_LAST)
                                    && info.bytes_so_far + request_bytes
                                        < info.compressed_range.end as u64)
                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
                                    && request_bytes >= remaining_bytes)
                            {
                                group.status = zx::Status::INVALID_ARGS;
                            } else {
                                // Do not read past the blocks that hold compressed data.
                                if request_bytes > remaining_bytes {
                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
                                }

                                operation = Ok(Operation::ContinueDecompressedRead {
                                    offset: info.bytes_so_far,
                                    device_block_offset,
                                    block_count,
                                    options,
                                });

                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
                            }
                        } else {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                    }
                    if flags.contains(BlockIoFlag::GROUP_LAST) {
                        group.req_id = Some(request.reqid);
                        // A failed group is completed through the error path at the end of
                        // this function, which may produce the group's response.
                        if group.status != zx::Status::OK {
                            operation = Err(group.status);
                        }
                    } else if group.status != zx::Status::OK {
                        // The group has already failed: drop this member without counting it.
                        return Err(None);
                    }
                    request_id = Some(RequestId(key));
                    group.count += 1;
                    break;
                }
            }
        }

        // True for anything that gets its own response: an individual request, or the last
        // request of a group.
        let is_single_request =
            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);

        let mut decompression_info = None;
        // Resolve the VMO the operation needs.  A failure replaces `operation` with the error.
        let vmo = match operation {
            Ok(Operation::Read {
                device_block_offset,
                mut block_count,
                options,
                vmo_offset,
                ..
            }) => match self.vmos.lock().get_mut(&request.vmoid) {
                Some((vmo, mapping)) => {
                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
                        // First request of a decompressed read.
                        let compressed_range = request.compressed_prefix_bytes as usize
                            ..request.compressed_prefix_bytes as usize
                                + request.total_compressed_bytes as usize;
                        // The compressed data is read in whole blocks.
                        let required_buffer_size =
                            compressed_range.end.next_multiple_of(self.block_size as usize);

                        // Reject an empty compressed range, an output range that wraps, a
                        // final request that does not cover the compressed data, and a
                        // non-final request that already covers all of it.
                        if compressed_range.start >= compressed_range.end
                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
                            || (is_single_request && request_bytes < compressed_range.end as u64)
                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
                        {
                            Err(zx::Status::INVALID_ARGS)
                        } else {
                            // Do not read past the blocks that hold compressed data.
                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
                                block_count =
                                    (required_buffer_size / self.block_size as usize) as u32;
                                required_buffer_size as u64
                            } else {
                                request_bytes
                            };

                            // Reuse the VMO's mapping, creating and caching it on first use.
                            match mapping {
                                Some(mapping) => Ok(mapping.clone()),
                                None => {
                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
                                }
                            }
                            .and_then(|mapping| {
                                // The decompressed output must fit inside the mapping.
                                if vmo_offset
                                    .checked_add(request.uncompressed_bytes as u64)
                                    .is_some_and(|end| end <= mapping.size as u64)
                                {
                                    Ok(mapping)
                                } else {
                                    Err(zx::Status::OUT_OF_RANGE)
                                }
                            })
                            .map(|mapping| {
                                operation = Ok(Operation::StartDecompressedRead {
                                    required_buffer_size,
                                    device_block_offset,
                                    block_count,
                                    options,
                                });
                                decompression_info = Some(DecompressionInfo {
                                    compressed_range,
                                    bytes_so_far,
                                    mapping,
                                    uncompressed_range: vmo_offset
                                        ..vmo_offset + request.uncompressed_bytes as u64,
                                    buffer: None,
                                });
                                // Decompressed reads do not hand the VMO to the operation.
                                None
                            })
                        }
                    } else {
                        Ok(Some(vmo.clone()))
                    }
                }
                // Unknown VMO id.
                None => Err(zx::Status::IO),
            },
            Ok(Operation::Write { .. }) => self
                .vmos
                .lock()
                .get(&request.vmoid)
                .cloned()
                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
            Ok(Operation::CloseVmo) => {
                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
                    let vmo_clone = vmo.clone();
                    // The final drop of this clone is deferred through the global epoch.
                    // NOTE(review): presumably so the VMO outlives requests that still hold an
                    // epoch guard; confirm.
                    Epoch::global().defer(move || drop(vmo_clone));
                    Ok(Some(vmo))
                })
            }
            _ => Ok(None),
        }
        .unwrap_or_else(|e| {
            operation = Err(e);
            None
        });

        let trace_flow_id = NonZero::new(request.trace_flow_id);
        // Requests that did not join an existing group get a fresh table entry.
        let request_id = request_id.unwrap_or_else(|| {
            RequestId(active_requests.requests.insert(ActiveRequest {
                session,
                group_or_request,
                trace_flow_id,
                _epoch_guard: Epoch::global().guard(),
                status: zx::Status::OK,
                count: 1,
                req_id: is_single_request.then_some(request.reqid),
                decompression_info,
            }))
        });

        Ok(DecodedRequest {
            request_id,
            trace_flow_id,
            // A decode error completes the request immediately; any resulting response becomes
            // the error value.
            operation: operation.map_err(|status| {
                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
            })?,
            vmo,
        })
    }

    /// Removes and returns all attached VMOs, leaving the table empty.
    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
        std::mem::take(&mut *self.vmos.lock())
    }

    /// Applies the session's offset mapping and transfer limit to `request`.
    ///
    /// Returns the adjusted request and, if it had to be split, a second request for the
    /// remainder that shares the same request id; in that case the active request's outstanding
    /// count is incremented.
    ///
    /// Fails with `BAD_STATE` if the active request has already failed, and with `OUT_OF_RANGE`
    /// if the blocks fall outside the mapping's source range.
    fn map_request(
        &self,
        mut request: DecodedRequest,
        active_request: &mut ActiveRequest<SM::Session>,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
        if active_request.status != zx::Status::OK {
            return Err(zx::Status::BAD_STATE);
        }
        let mapping = self.offset_map.mapping();
        match (mapping, request.operation.blocks()) {
            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
                return Err(zx::Status::OUT_OF_RANGE);
            }
            _ => {}
        }
        let remainder = request.operation.map(
            self.offset_map.mapping(),
            self.offset_map.max_transfer_blocks(),
            self.block_size,
        );
        if remainder.is_some() {
            active_request.count += 1;
        }
        // Cached trace category lookup; tracing is skipped when the category is not enabled.
        static CACHE: AtomicU64 = AtomicU64::new(0);
        if let Some(context) =
            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
        {
            use fuchsia_trace::ArgValue;
            let trace_args = [
                ArgValue::of("request_id", request.request_id.0),
                ArgValue::of("opcode", request.operation.trace_label()),
            ];
            let _scope =
                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
            if let Some(trace_flow_id) = active_request.trace_flow_id {
                fuchsia_trace::flow_step(
                    &context,
                    "block_server::start_transaction",
                    trace_flow_id.get().into(),
                    &[],
                );
            }
        }
        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
        Ok((request, remainder))
    }

    /// Removes every active request whose session satisfies `pred`.
    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
        self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
    }
}
1079
/// Identifies an entry in the table of active requests; wraps the slab key.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct RequestId(usize);
1083
/// A FIFO request after validation and registration in the table of active requests.
#[derive(Clone, Debug)]
struct DecodedRequest {
    /// Active request (or group) this request belongs to.
    request_id: RequestId,
    /// Trace flow id taken from the FIFO request, if non-zero.
    trace_flow_id: TraceFlowId,
    /// Operation to perform.
    operation: Operation,
    /// VMO involved in the operation: set for plain reads, writes and `CloseVmo`, `None`
    /// otherwise.
    vmo: Option<Arc<zx::Vmo>>,
}
1091
// Re-exports of the `block_protocol` option types under this module's names.
pub type WriteFlags = block_protocol::WriteFlags;
pub type WriteOptions = block_protocol::WriteOptions;
pub type ReadOptions = block_protocol::ReadOptions;
pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1097
/// A decoded block operation.
// NOTE(review): the type is `repr(C)` and carries explicit `_unused` padding fields, so its
// layout is presumably shared with the C interface; do not reorder fields without checking.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    /// Read `block_count` blocks starting at `device_block_offset` into the VMO at byte offset
    /// `vmo_offset`.
    Read {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        vmo_offset: u64,
        options: ReadOptions,
    },
    /// Write `block_count` blocks starting at `device_block_offset` from the VMO at byte offset
    /// `vmo_offset`.
    Write {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        vmo_offset: u64,
        options: WriteOptions,
    },
    /// Flush the device.
    Flush,
    /// Trim `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// Detach a VMO from the session.
    CloseVmo,
    /// First read of a request whose data is zstd-decompressed on completion.
    /// `required_buffer_size` is the end of the compressed data rounded up to a whole number of
    /// blocks.
    StartDecompressedRead {
        required_buffer_size: usize,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
    /// Follow-up read of a grouped decompressed read.  `offset` is the number of bytes already
    /// issued by earlier requests of the group.
    ContinueDecompressedRead {
        offset: u64,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
}
1139
1140impl Operation {
1141 fn trace_label(&self) -> &'static str {
1142 match self {
1143 Operation::Read { .. } => "read",
1144 Operation::Write { .. } => "write",
1145 Operation::Flush { .. } => "flush",
1146 Operation::Trim { .. } => "trim",
1147 Operation::CloseVmo { .. } => "close_vmo",
1148 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1149 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1150 }
1151 }
1152
1153 fn blocks(&self) -> Option<(u64, u32)> {
1155 match self {
1156 Operation::Read { device_block_offset, block_count, .. }
1157 | Operation::Write { device_block_offset, block_count, .. }
1158 | Operation::Trim { device_block_offset, block_count, .. } => {
1159 Some((*device_block_offset, *block_count))
1160 }
1161 _ => None,
1162 }
1163 }
1164
1165 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1167 match self {
1168 Operation::Read { device_block_offset, block_count, .. }
1169 | Operation::Write { device_block_offset, block_count, .. }
1170 | Operation::Trim { device_block_offset, block_count, .. } => {
1171 Some((device_block_offset, block_count))
1172 }
1173 _ => None,
1174 }
1175 }
1176
/// Translates this operation through `mapping` and truncates it to the allowed length.
///
/// Only reads, writes and trims are affected; other operations return `None` untouched.  The
/// transfer limit `max_blocks` applies to reads and writes only, while the end of the mapping
/// limits trims as well.  When the operation is truncated, the part that did not fit is
/// returned as a new operation.
// NOTE(review): the remainder starts at `orig_offset + max`, i.e. in unmapped (source)
// coordinates, so it presumably goes through `map` again before being issued; confirm with
// the callers.
fn map(
    &mut self,
    mapping: Option<&BlockOffsetMapping>,
    max_blocks: Option<NonZero<u32>>,
    block_size: u32,
) -> Option<Self> {
    let mut max = match self {
        Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
        _ => None,
    };
    let (offset, length) = self.blocks_mut()?;
    let orig_offset = *offset;
    if let Some(mapping) = mapping {
        // The subtraction underflows if the offset is below the mapping's source start.
        // NOTE(review): `map_request` checks the range before calling this; other callers
        // are not visible here.
        let delta = *offset - mapping.source_block_offset;
        debug_assert!(*offset - mapping.source_block_offset < mapping.length);
        *offset = mapping.target_block_offset + delta;
        // Number of blocks left between the translated offset and the end of the mapping.
        let mapping_max = mapping.target_block_offset + mapping.length - *offset;
        max = match max {
            None => Some(mapping_max),
            Some(m) => Some(std::cmp::min(m, mapping_max)),
        };
    };
    if let Some(max) = max {
        if *length as u64 > max {
            // `max < *length <= u32::MAX`, so the narrowing casts below are lossless.
            let rem = (*length as u64 - max) as u32;
            *length = max as u32;
            return Some(match self {
                Operation::Read {
                    device_block_offset: _,
                    block_count: _,
                    vmo_offset,
                    _unused,
                    options,
                } => {
                    // The remainder's DUN is advanced by the number of blocks kept in `self`.
                    let mut options = *options;
                    options.inline_crypto.dun += max as u32;
                    Operation::Read {
                        device_block_offset: orig_offset + max,
                        block_count: rem,
                        vmo_offset: *vmo_offset + max * block_size as u64,
                        _unused: *_unused,
                        options: options,
                    }
                }
                Operation::Write {
                    device_block_offset: _,
                    block_count: _,
                    _unused,
                    vmo_offset,
                    options,
                } => {
                    let mut options = *options;
                    options.inline_crypto.dun += max as u32;
                    Operation::Write {
                        device_block_offset: orig_offset + max,
                        block_count: rem,
                        _unused: *_unused,
                        vmo_offset: *vmo_offset + max * block_size as u64,
                        options: options,
                    }
                }
                Operation::Trim { device_block_offset: _, block_count: _ } => {
                    Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
                }
                // `blocks_mut` returned `Some` above, so only the three variants above remain.
                _ => unreachable!(),
            });
        }
    }
    None
}
1249
1250 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1252 if let Operation::Write { options, .. } = self {
1253 options.flags.contains(value)
1254 } else {
1255 false
1256 }
1257 }
1258
1259 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1261 if let Operation::Write { options, .. } = self {
1262 let result = options.flags.contains(value);
1263 options.flags.remove(value);
1264 result
1265 } else {
1266 false
1267 }
1268 }
1269}
1270
/// Identifies either a request group (by its 16-bit group id) or a single request (by its 32-bit
/// request id).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    Group(u16),
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        self.group_id().is_some()
    }

    /// Returns the group id, or `None` when this identifies a plain request.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self { Some(id) } else { None }
    }
}
1289
1290#[cfg(test)]
1291mod tests {
1292 use super::{
1293 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1294 TraceFlowId,
1295 };
1296 use assert_matches::assert_matches;
1297 use block_protocol::{
1298 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1299 WriteFlags, WriteOptions,
1300 };
1301 use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1302 use fuchsia_sync::Mutex;
1303 use futures::FutureExt as _;
1304 use futures::channel::oneshot;
1305 use futures::future::BoxFuture;
1306 use std::borrow::Cow;
1307 use std::future::poll_fn;
1308 use std::num::NonZero;
1309 use std::pin::pin;
1310 use std::sync::Arc;
1311 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1312 use std::task::{Context, Poll};
1313 use zx::HandleBased as _;
1314 use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1315
    /// Mock interface whose read, write and barrier behaviour is supplied by optional hooks.
    /// `Default` leaves every hook unset.
    #[derive(Default)]
    struct MockInterface {
        /// Called by `read` with (device block offset, block count, VMO, VMO offset); `read` hits
        /// `unimplemented!()` when this is `None`.
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
        /// Called by `write` with the device block offset; `write` hits `unimplemented!()` when
        /// this is `None`.
        write_hook:
            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
        /// Called by `write` (before `write_hook`) when the write carries
        /// `WriteFlags::PRE_BARRIER`.
        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
    }
1329
1330 impl super::async_interface::Interface for MockInterface {
1331 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1332 Ok(())
1333 }
1334
1335 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1336 Cow::Owned(test_device_info())
1337 }
1338
1339 async fn read(
1340 &self,
1341 device_block_offset: u64,
1342 block_count: u32,
1343 vmo: &Arc<zx::Vmo>,
1344 vmo_offset: u64,
1345 _opts: ReadOptions,
1346 _trace_flow_id: TraceFlowId,
1347 ) -> Result<(), zx::Status> {
1348 if let Some(read_hook) = &self.read_hook {
1349 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1350 } else {
1351 unimplemented!();
1352 }
1353 }
1354
1355 async fn write(
1356 &self,
1357 device_block_offset: u64,
1358 _block_count: u32,
1359 _vmo: &Arc<zx::Vmo>,
1360 _vmo_offset: u64,
1361 opts: WriteOptions,
1362 _trace_flow_id: TraceFlowId,
1363 ) -> Result<(), zx::Status> {
1364 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1365 && let Some(barrier_hook) = &self.barrier_hook
1366 {
1367 barrier_hook()?;
1368 }
1369 if let Some(write_hook) = &self.write_hook {
1370 write_hook(device_block_offset).await
1371 } else {
1372 unimplemented!();
1373 }
1374 }
1375
1376 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1377 Ok(())
1378 }
1379
1380 async fn trim(
1381 &self,
1382 _device_block_offset: u64,
1383 _block_count: u32,
1384 _trace_flow_id: TraceFlowId,
1385 ) -> Result<(), zx::Status> {
1386 unreachable!();
1387 }
1388
1389 async fn get_volume_info(
1390 &self,
1391 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1392 let () = std::future::pending().await;
1394 unreachable!();
1395 }
1396 }
1397
    /// Block size, in bytes, passed to every `BlockServer` created by these tests.
    const BLOCK_SIZE: u32 = 512;
    /// Maximum transfer size, in blocks, advertised by `test_device_info`.
    const MAX_TRANSFER_BLOCKS: u32 = 10;
1400
1401 fn test_device_info() -> DeviceInfo {
1402 DeviceInfo::Partition(PartitionInfo {
1403 device_flags: fblock::DeviceFlag::READONLY
1404 | fblock::DeviceFlag::BARRIER_SUPPORT
1405 | fblock::DeviceFlag::FUA_SUPPORT,
1406 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1407 block_range: Some(0..100),
1408 type_guid: [1; 16],
1409 instance_guid: [2; 16],
1410 name: "foo".to_string(),
1411 flags: 0xabcd,
1412 })
1413 }
1414
    /// Sends one write flagged `PRE_BARRIER` followed by ten unflagged writes and checks, from
    /// inside the mock's write hook, that the barrier hook has run by the time any write executes.
    #[fuchsia::test]
    async fn test_barriers_ordering() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        let barrier_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            // Server side.
            async move {
                let barrier_called_clone = barrier_called.clone();
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Records that the barrier was observed.
                        barrier_hook: Some(Box::new(move || {
                            barrier_called.store(true, Ordering::Relaxed);
                            Ok(())
                        })),
                        write_hook: Some(Box::new(move |device_block_offset| {
                            let barrier_called = barrier_called_clone.clone();
                            Box::pin(async move {
                                // Writes at even offsets (including the barrier write at offset
                                // 0) are delayed by 200ms so that writes do not all finish in
                                // submission order.
                                if device_block_offset % 2 == 0 {
                                    fasync::Timer::new(fasync::MonotonicInstant::after(
                                        zx::MonotonicDuration::from_millis(200),
                                    ))
                                    .await;
                                }
                                // No write may run before the barrier hook has been called.
                                assert!(barrier_called.load(Ordering::Relaxed));
                                Ok(())
                            })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            // Client side.
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // The first write carries the PRE_BARRIER flag.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: BlockIoFlag::PRE_BARRIER.bits(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        dev_offset: 0,
                        length: 5,
                        vmo_offset: 6,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // Ten more writes, at device offsets 1..=10, without any barrier flag.
                for i in 0..10 {
                    writer
                        .write_entries(&BlockFifoRequest {
                            command: BlockFifoCommand {
                                opcode: BlockOpcode::Write.into_primitive(),
                                ..Default::default()
                            },
                            vmoid: vmo_id.id,
                            dev_offset: i + 1,
                            length: 5,
                            vmo_offset: 6,
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                }
                // All eleven writes must complete successfully.
                for _ in 0..11 {
                    let mut response = BlockFifoResponse::default();
                    reader.read_entries(&mut response).await.unwrap();
                    assert_eq!(response.status, zx::sys::ZX_OK);
                }

                // Dropping the proxy lets the server future finish (presumably by closing the
                // request stream -- TODO confirm).
                std::mem::drop(proxy);
            }
        );
    }
1508
1509 #[fuchsia::test]
1510 async fn test_info() {
1511 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1512
1513 futures::join!(
1514 async {
1515 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1516 block_server.handle_requests(stream).await.unwrap();
1517 },
1518 async {
1519 let expected_info = test_device_info();
1520 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1521 info
1522 } else {
1523 unreachable!()
1524 };
1525
1526 let block_info = proxy.get_info().await.unwrap().unwrap();
1527 assert_eq!(
1528 block_info.block_count,
1529 partition_info.block_range.as_ref().unwrap().end
1530 - partition_info.block_range.as_ref().unwrap().start
1531 );
1532 assert_eq!(
1533 block_info.flags,
1534 fblock::DeviceFlag::READONLY
1535 | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1536 | fblock::DeviceFlag::BARRIER_SUPPORT
1537 | fblock::DeviceFlag::FUA_SUPPORT
1538 );
1539
1540 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1541
1542 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1543 assert_eq!(status, zx::sys::ZX_OK);
1544 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1545
1546 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1547 assert_eq!(status, zx::sys::ZX_OK);
1548 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1549
1550 let (status, name) = proxy.get_name().await.unwrap();
1551 assert_eq!(status, zx::sys::ZX_OK);
1552 assert_eq!(name.as_ref(), Some(&partition_info.name));
1553
1554 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1555 assert_eq!(metadata.name, name);
1556 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1557 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1558 assert_eq!(
1559 metadata.start_block_offset,
1560 Some(partition_info.block_range.as_ref().unwrap().start)
1561 );
1562 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1563 assert_eq!(metadata.flags, Some(partition_info.flags));
1564
1565 std::mem::drop(proxy);
1566 }
1567 );
1568 }
1569
    /// Attaches VMOs until the server reports `ZX_ERR_NO_RESOURCES`, checks how many attachments
    /// succeeded, and then verifies that closing a VMO id makes that id available again.
    #[fuchsia::test]
    async fn test_attach_vmo() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();

        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        let koid = vmo.koid().unwrap();

        futures::join!(
            async {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Every read must be handed the VMO that the client attached.
                        read_hook: Some(Box::new(move |_, _, vmo, _| {
                            assert_eq!(vmo.koid().unwrap(), koid);
                            Box::pin(async { Ok(()) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // `count` is the number of successful attachments so far (the one above is the
                // first).  Keep attaching until the server runs out of ids.
                let mut count = 1;
                loop {
                    match session_proxy
                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                        .await
                        .unwrap()
                    {
                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
                        Err(e) => {
                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
                            break;
                        }
                    }

                    // Every tenth attachment, issue a read.  Note that it uses the id of the
                    // *first* attachment (the `vmo_id` bound in the match arm above is out of
                    // scope here).
                    if count % 10 == 0 {
                        writer
                            .write_entries(&BlockFifoRequest {
                                command: BlockFifoCommand {
                                    opcode: BlockOpcode::Read.into_primitive(),
                                    ..Default::default()
                                },
                                vmoid: vmo_id.id,
                                length: 1,
                                ..Default::default()
                            })
                            .await
                            .unwrap();

                        let mut response = BlockFifoResponse::default();
                        reader.read_entries(&mut response).await.unwrap();
                        assert_eq!(response.status, zx::sys::ZX_OK);
                    }

                    count += 1;
                }

                // Exactly `u16::MAX` attachments are expected to succeed.
                assert_eq!(count, u16::MAX as u64);

                // Detach the first VMO id...
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::CloseVmo.into_primitive(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                let mut response = BlockFifoResponse::default();
                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                // ...after which a new attachment succeeds and is given the id just released.
                let new_vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_eq!(new_vmo_id.id, vmo_id.id);

                std::mem::drop(proxy);
            }
        );
    }
1676
    /// Opens a session, drops the `Block` proxy, closes the session, and expects the server
    /// future to complete.
    #[fuchsia::test]
    async fn test_close() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();

        let mut server = std::pin::pin!(
            async {
                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
                block_server.handle_requests(stream).await.unwrap();
            }
            .fuse()
        );

        let mut client = std::pin::pin!(
            async {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                // The proxy is dropped while the session is still open; the session must keep
                // working, since the `close` call below is expected to succeed.
                // NOTE(review): presumably the server should not finish until the session has
                // gone away as well -- confirm against `handle_requests`.
                std::mem::drop(proxy);

                session_proxy.close().await.unwrap().unwrap();

                // Never finish (and keep `session_proxy` alive), so that the `select!` below can
                // only be completed by the server.
                let _: () = std::future::pending().await;
            }
            .fuse()
        );

        // The client future never completes, so the server must be the one to finish.
        futures::select!(
            _ = server => {}
            _ = client => unreachable!(),
        );
    }
1712
    /// Mock interface for the I/O tests; it can either verify each operation against an expected
    /// value or fail every operation with a distinct error.
    #[derive(Default)]
    struct IoMockInterface {
        /// When true (and `return_errors` is false), each operation is checked against
        /// `expected_op`.
        do_checks: bool,
        /// The operation the test expects next; taken (left as `None`) when it is checked.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        /// When true, read, write, flush and trim each return their own error status.
        return_errors: bool,
    }
1719
    /// An operation that `IoMockInterface` expects to receive next.
    #[derive(Debug)]
    enum ExpectedOp {
        /// (device block offset, block count, VMO offset in blocks).
        Read(u64, u32, u64),
        /// (device block offset, block count, VMO offset in blocks).
        Write(u64, u32, u64),
        /// (device block offset, block count).
        Trim(u64, u32),
        Flush,
    }
1727
1728 impl super::async_interface::Interface for IoMockInterface {
1729 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1730 Ok(())
1731 }
1732
1733 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1734 Cow::Owned(test_device_info())
1735 }
1736
1737 async fn read(
1738 &self,
1739 device_block_offset: u64,
1740 block_count: u32,
1741 _vmo: &Arc<zx::Vmo>,
1742 vmo_offset: u64,
1743 _opts: ReadOptions,
1744 _trace_flow_id: TraceFlowId,
1745 ) -> Result<(), zx::Status> {
1746 if self.return_errors {
1747 Err(zx::Status::INTERNAL)
1748 } else {
1749 if self.do_checks {
1750 assert_matches!(
1751 self.expected_op.lock().take(),
1752 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1753 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1754 "Read {device_block_offset} {block_count} {vmo_offset}"
1755 );
1756 }
1757 Ok(())
1758 }
1759 }
1760
1761 async fn write(
1762 &self,
1763 device_block_offset: u64,
1764 block_count: u32,
1765 _vmo: &Arc<zx::Vmo>,
1766 vmo_offset: u64,
1767 _write_opts: WriteOptions,
1768 _trace_flow_id: TraceFlowId,
1769 ) -> Result<(), zx::Status> {
1770 if self.return_errors {
1771 Err(zx::Status::NOT_SUPPORTED)
1772 } else {
1773 if self.do_checks {
1774 assert_matches!(
1775 self.expected_op.lock().take(),
1776 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1777 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1778 "Write {device_block_offset} {block_count} {vmo_offset}"
1779 );
1780 }
1781 Ok(())
1782 }
1783 }
1784
1785 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1786 if self.return_errors {
1787 Err(zx::Status::NO_RESOURCES)
1788 } else {
1789 if self.do_checks {
1790 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1791 }
1792 Ok(())
1793 }
1794 }
1795
1796 async fn trim(
1797 &self,
1798 device_block_offset: u64,
1799 block_count: u32,
1800 _trace_flow_id: TraceFlowId,
1801 ) -> Result<(), zx::Status> {
1802 if self.return_errors {
1803 Err(zx::Status::NO_MEMORY)
1804 } else {
1805 if self.do_checks {
1806 assert_matches!(
1807 self.expected_op.lock().take(),
1808 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1809 block_count == b,
1810 "Trim {device_block_offset} {block_count}"
1811 );
1812 }
1813 Ok(())
1814 }
1815 }
1816 }
1817
1818 #[fuchsia::test]
1819 async fn test_io() {
1820 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1821
1822 let expected_op = Arc::new(Mutex::new(None));
1823 let expected_op_clone = expected_op.clone();
1824
1825 let server = async {
1826 let block_server = BlockServer::new(
1827 BLOCK_SIZE,
1828 Arc::new(IoMockInterface {
1829 return_errors: false,
1830 do_checks: true,
1831 expected_op: expected_op_clone,
1832 }),
1833 );
1834 block_server.handle_requests(stream).await.unwrap();
1835 };
1836
1837 let client = async move {
1838 let (session_proxy, server) = fidl::endpoints::create_proxy();
1839
1840 proxy.open_session(server).unwrap();
1841
1842 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1843 let vmo_id = session_proxy
1844 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1845 .await
1846 .unwrap()
1847 .unwrap();
1848
1849 let mut fifo =
1850 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1851 let (mut reader, mut writer) = fifo.async_io();
1852
1853 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1855 writer
1856 .write_entries(&BlockFifoRequest {
1857 command: BlockFifoCommand {
1858 opcode: BlockOpcode::Read.into_primitive(),
1859 ..Default::default()
1860 },
1861 vmoid: vmo_id.id,
1862 dev_offset: 1,
1863 length: 2,
1864 vmo_offset: 3,
1865 ..Default::default()
1866 })
1867 .await
1868 .unwrap();
1869
1870 let mut response = BlockFifoResponse::default();
1871 reader.read_entries(&mut response).await.unwrap();
1872 assert_eq!(response.status, zx::sys::ZX_OK);
1873
1874 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1876 writer
1877 .write_entries(&BlockFifoRequest {
1878 command: BlockFifoCommand {
1879 opcode: BlockOpcode::Write.into_primitive(),
1880 ..Default::default()
1881 },
1882 vmoid: vmo_id.id,
1883 dev_offset: 4,
1884 length: 5,
1885 vmo_offset: 6,
1886 ..Default::default()
1887 })
1888 .await
1889 .unwrap();
1890
1891 let mut response = BlockFifoResponse::default();
1892 reader.read_entries(&mut response).await.unwrap();
1893 assert_eq!(response.status, zx::sys::ZX_OK);
1894
1895 *expected_op.lock() = Some(ExpectedOp::Flush);
1897 writer
1898 .write_entries(&BlockFifoRequest {
1899 command: BlockFifoCommand {
1900 opcode: BlockOpcode::Flush.into_primitive(),
1901 ..Default::default()
1902 },
1903 ..Default::default()
1904 })
1905 .await
1906 .unwrap();
1907
1908 reader.read_entries(&mut response).await.unwrap();
1909 assert_eq!(response.status, zx::sys::ZX_OK);
1910
1911 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1913 writer
1914 .write_entries(&BlockFifoRequest {
1915 command: BlockFifoCommand {
1916 opcode: BlockOpcode::Trim.into_primitive(),
1917 ..Default::default()
1918 },
1919 dev_offset: 7,
1920 length: 8,
1921 ..Default::default()
1922 })
1923 .await
1924 .unwrap();
1925
1926 reader.read_entries(&mut response).await.unwrap();
1927 assert_eq!(response.status, zx::sys::ZX_OK);
1928
1929 std::mem::drop(proxy);
1930 };
1931
1932 futures::join!(server, client);
1933 }
1934
1935 #[fuchsia::test]
1936 async fn test_io_errors() {
1937 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1938
1939 futures::join!(
1940 async {
1941 let block_server = BlockServer::new(
1942 BLOCK_SIZE,
1943 Arc::new(IoMockInterface {
1944 return_errors: true,
1945 do_checks: false,
1946 expected_op: Arc::new(Mutex::new(None)),
1947 }),
1948 );
1949 block_server.handle_requests(stream).await.unwrap();
1950 },
1951 async move {
1952 let (session_proxy, server) = fidl::endpoints::create_proxy();
1953
1954 proxy.open_session(server).unwrap();
1955
1956 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1957 let vmo_id = session_proxy
1958 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1959 .await
1960 .unwrap()
1961 .unwrap();
1962
1963 let mut fifo =
1964 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1965 let (mut reader, mut writer) = fifo.async_io();
1966
1967 writer
1969 .write_entries(&BlockFifoRequest {
1970 command: BlockFifoCommand {
1971 opcode: BlockOpcode::Read.into_primitive(),
1972 ..Default::default()
1973 },
1974 vmoid: vmo_id.id,
1975 length: 1,
1976 reqid: 1,
1977 ..Default::default()
1978 })
1979 .await
1980 .unwrap();
1981
1982 let mut response = BlockFifoResponse::default();
1983 reader.read_entries(&mut response).await.unwrap();
1984 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1985
1986 writer
1988 .write_entries(&BlockFifoRequest {
1989 command: BlockFifoCommand {
1990 opcode: BlockOpcode::Write.into_primitive(),
1991 ..Default::default()
1992 },
1993 vmoid: vmo_id.id,
1994 length: 1,
1995 reqid: 2,
1996 ..Default::default()
1997 })
1998 .await
1999 .unwrap();
2000
2001 reader.read_entries(&mut response).await.unwrap();
2002 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2003
2004 writer
2006 .write_entries(&BlockFifoRequest {
2007 command: BlockFifoCommand {
2008 opcode: BlockOpcode::Flush.into_primitive(),
2009 ..Default::default()
2010 },
2011 reqid: 3,
2012 ..Default::default()
2013 })
2014 .await
2015 .unwrap();
2016
2017 reader.read_entries(&mut response).await.unwrap();
2018 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2019
2020 writer
2022 .write_entries(&BlockFifoRequest {
2023 command: BlockFifoCommand {
2024 opcode: BlockOpcode::Trim.into_primitive(),
2025 ..Default::default()
2026 },
2027 reqid: 4,
2028 length: 1,
2029 ..Default::default()
2030 })
2031 .await
2032 .unwrap();
2033
2034 reader.read_entries(&mut response).await.unwrap();
2035 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2036
2037 std::mem::drop(proxy);
2038 }
2039 );
2040 }
2041
2042 #[fuchsia::test]
2043 async fn test_invalid_args() {
2044 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2045
2046 futures::join!(
2047 async {
2048 let block_server = BlockServer::new(
2049 BLOCK_SIZE,
2050 Arc::new(IoMockInterface {
2051 return_errors: false,
2052 do_checks: false,
2053 expected_op: Arc::new(Mutex::new(None)),
2054 }),
2055 );
2056 block_server.handle_requests(stream).await.unwrap();
2057 },
2058 async move {
2059 let (session_proxy, server) = fidl::endpoints::create_proxy();
2060
2061 proxy.open_session(server).unwrap();
2062
2063 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2064 let vmo_id = session_proxy
2065 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2066 .await
2067 .unwrap()
2068 .unwrap();
2069
2070 let mut fifo =
2071 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2072
2073 async fn test(
2074 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2075 request: BlockFifoRequest,
2076 ) -> Result<(), zx::Status> {
2077 let (mut reader, mut writer) = fifo.async_io();
2078 writer.write_entries(&request).await.unwrap();
2079 let mut response = BlockFifoResponse::default();
2080 reader.read_entries(&mut response).await.unwrap();
2081 zx::Status::ok(response.status)
2082 }
2083
2084 let good_read_request = || BlockFifoRequest {
2087 command: BlockFifoCommand {
2088 opcode: BlockOpcode::Read.into_primitive(),
2089 ..Default::default()
2090 },
2091 length: 1,
2092 vmoid: vmo_id.id,
2093 ..Default::default()
2094 };
2095
2096 assert_eq!(
2097 test(
2098 &mut fifo,
2099 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2100 )
2101 .await,
2102 Err(zx::Status::IO)
2103 );
2104
2105 assert_eq!(
2106 test(
2107 &mut fifo,
2108 BlockFifoRequest {
2109 vmo_offset: 0xffff_ffff_ffff_ffff,
2110 ..good_read_request()
2111 }
2112 )
2113 .await,
2114 Err(zx::Status::OUT_OF_RANGE)
2115 );
2116
2117 assert_eq!(
2118 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2119 Err(zx::Status::INVALID_ARGS)
2120 );
2121
2122 let good_write_request = || BlockFifoRequest {
2125 command: BlockFifoCommand {
2126 opcode: BlockOpcode::Write.into_primitive(),
2127 ..Default::default()
2128 },
2129 length: 1,
2130 vmoid: vmo_id.id,
2131 ..Default::default()
2132 };
2133
2134 assert_eq!(
2135 test(
2136 &mut fifo,
2137 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2138 )
2139 .await,
2140 Err(zx::Status::IO)
2141 );
2142
2143 assert_eq!(
2144 test(
2145 &mut fifo,
2146 BlockFifoRequest {
2147 vmo_offset: 0xffff_ffff_ffff_ffff,
2148 ..good_write_request()
2149 }
2150 )
2151 .await,
2152 Err(zx::Status::OUT_OF_RANGE)
2153 );
2154
2155 assert_eq!(
2156 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2157 Err(zx::Status::INVALID_ARGS)
2158 );
2159
2160 assert_eq!(
2163 test(
2164 &mut fifo,
2165 BlockFifoRequest {
2166 command: BlockFifoCommand {
2167 opcode: BlockOpcode::CloseVmo.into_primitive(),
2168 ..Default::default()
2169 },
2170 vmoid: vmo_id.id + 1,
2171 ..Default::default()
2172 }
2173 )
2174 .await,
2175 Err(zx::Status::IO)
2176 );
2177
2178 std::mem::drop(proxy);
2179 }
2180 );
2181 }
2182
    /// Checks that two reads can be outstanding in the interface at the same time and that each
    /// one's response is sent only once that particular read has completed.
    #[fuchsia::test]
    async fn test_concurrent_requests() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();

        // For each read that has reached the interface: (device block offset, sender that lets
        // the read complete).
        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
        let waiting_readers_clone = waiting_readers.clone();

        futures::join!(
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Each read registers itself and then blocks until the test releases it.
                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
                            let (tx, rx) = oneshot::channel();
                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
                            Box::pin(async move {
                                let _ = rx.await;
                                Ok(())
                            })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            ..Default::default()
                        },
                        reqid: 1,
                        // `dev_offset` is set to the same value as `reqid`: the read hook only
                        // sees the device offset, and the test later matches it against the
                        // `reqid` of the response.
                        dev_offset: 1,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            ..Default::default()
                        },
                        reqid: 2,
                        dev_offset: 2,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // Wait until both reads have reached the interface.
                poll_fn(|cx: &mut Context<'_>| {
                    if waiting_readers.lock().len() == 2 {
                        Poll::Ready(())
                    } else {
                        // Wake immediately so that this future is polled again after other
                        // tasks have had a chance to run.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                })
                .await;

                // Neither read has been released, so there must be no response yet.
                let mut response = BlockFifoResponse::default();
                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());

                // Release the most recently registered read and expect its response.
                let (id, tx) = waiting_readers.lock().pop().unwrap();
                tx.send(()).unwrap();

                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, id);

                // The other read is still blocked, so no further response is available.
                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());

                // Release the remaining read and expect its response.
                let (id, tx) = waiting_readers.lock().pop().unwrap();
                tx.send(()).unwrap();

                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, id);
            }
        );
    }
2287
2288 #[fuchsia::test]
2289 async fn test_groups() {
2290 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2291
2292 futures::join!(
2293 async move {
2294 let block_server = BlockServer::new(
2295 BLOCK_SIZE,
2296 Arc::new(MockInterface {
2297 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2298 ..MockInterface::default()
2299 }),
2300 );
2301 block_server.handle_requests(stream).await.unwrap();
2302 },
2303 async move {
2304 let (session_proxy, server) = fidl::endpoints::create_proxy();
2305
2306 proxy.open_session(server).unwrap();
2307
2308 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2309 let vmo_id = session_proxy
2310 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2311 .await
2312 .unwrap()
2313 .unwrap();
2314
2315 let mut fifo =
2316 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2317 let (mut reader, mut writer) = fifo.async_io();
2318
2319 writer
2320 .write_entries(&BlockFifoRequest {
2321 command: BlockFifoCommand {
2322 opcode: BlockOpcode::Read.into_primitive(),
2323 flags: BlockIoFlag::GROUP_ITEM.bits(),
2324 ..Default::default()
2325 },
2326 group: 1,
2327 vmoid: vmo_id.id,
2328 length: 1,
2329 ..Default::default()
2330 })
2331 .await
2332 .unwrap();
2333
2334 writer
2335 .write_entries(&BlockFifoRequest {
2336 command: BlockFifoCommand {
2337 opcode: BlockOpcode::Read.into_primitive(),
2338 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2339 ..Default::default()
2340 },
2341 reqid: 2,
2342 group: 1,
2343 vmoid: vmo_id.id,
2344 length: 1,
2345 ..Default::default()
2346 })
2347 .await
2348 .unwrap();
2349
2350 let mut response = BlockFifoResponse::default();
2351 reader.read_entries(&mut response).await.unwrap();
2352 assert_eq!(response.status, zx::sys::ZX_OK);
2353 assert_eq!(response.reqid, 2);
2354 assert_eq!(response.group, 1);
2355 }
2356 );
2357 }
2358
    /// Checks that when an item of a group fails, the error is reported in the group's single
    /// response (sent once the `GROUP_LAST` item has arrived) and that later items of the group
    /// do not reach the interface.
    #[fuchsia::test]
    async fn test_group_error() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();

        // Number of times the interface's read hook has been invoked.
        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        futures::join!(
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Every read is counted and then fails with BAD_STATE.
                        read_hook: Some(Box::new(move |_, _, _, _| {
                            counter_clone.fetch_add(1, Ordering::Relaxed);
                            Box::pin(async { Err(zx::Status::BAD_STATE) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // First item of group 1; the read hook fails it.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: BlockIoFlag::GROUP_ITEM.bits(),
                            ..Default::default()
                        },
                        group: 1,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // Wait until the read hook has been called for that first item.
                poll_fn(|cx: &mut Context<'_>| {
                    if counter.load(Ordering::Relaxed) == 1 {
                        Poll::Ready(())
                    } else {
                        // Wake immediately so that this future is polled again after other
                        // tasks have had a chance to run.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                })
                .await;

                // The group has not seen its last item yet, so no response is available even
                // though an item has already failed.
                let mut response = BlockFifoResponse::default();
                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());

                // Another item of the same group.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: BlockIoFlag::GROUP_ITEM.bits(),
                            ..Default::default()
                        },
                        group: 1,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // The last item of the group.
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
                            ..Default::default()
                        },
                        reqid: 2,
                        group: 1,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // The group's response reports the failure of the first item and carries the
                // request id of the last item.
                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
                assert_eq!(response.reqid, 2);
                assert_eq!(response.group, 1);

                // Exactly one response is sent for the group.
                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());

                // The read hook ran only for the first item, i.e. the two later items were not
                // passed to the interface.
                assert_eq!(counter.load(Ordering::Relaxed), 1);
            }
        );
    }
2469
2470 #[fuchsia::test]
2471 async fn test_group_with_two_lasts() {
2472 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2473
2474 let (tx, rx) = oneshot::channel();
2475
2476 futures::join!(
2477 async move {
2478 let rx = Mutex::new(Some(rx));
2479 let block_server = BlockServer::new(
2480 BLOCK_SIZE,
2481 Arc::new(MockInterface {
2482 read_hook: Some(Box::new(move |_, _, _, _| {
2483 let rx = rx.lock().take().unwrap();
2484 Box::pin(async {
2485 let _ = rx.await;
2486 Ok(())
2487 })
2488 })),
2489 ..MockInterface::default()
2490 }),
2491 );
2492 block_server.handle_requests(stream).await.unwrap();
2493 },
2494 async move {
2495 let (session_proxy, server) = fidl::endpoints::create_proxy();
2496
2497 proxy.open_session(server).unwrap();
2498
2499 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2500 let vmo_id = session_proxy
2501 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2502 .await
2503 .unwrap()
2504 .unwrap();
2505
2506 let mut fifo =
2507 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2508 let (mut reader, mut writer) = fifo.async_io();
2509
2510 writer
2511 .write_entries(&BlockFifoRequest {
2512 command: BlockFifoCommand {
2513 opcode: BlockOpcode::Read.into_primitive(),
2514 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2515 ..Default::default()
2516 },
2517 reqid: 1,
2518 group: 1,
2519 vmoid: vmo_id.id,
2520 length: 1,
2521 ..Default::default()
2522 })
2523 .await
2524 .unwrap();
2525
2526 writer
2527 .write_entries(&BlockFifoRequest {
2528 command: BlockFifoCommand {
2529 opcode: BlockOpcode::Read.into_primitive(),
2530 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2531 ..Default::default()
2532 },
2533 reqid: 2,
2534 group: 1,
2535 vmoid: vmo_id.id,
2536 length: 1,
2537 ..Default::default()
2538 })
2539 .await
2540 .unwrap();
2541
2542 writer
2544 .write_entries(&BlockFifoRequest {
2545 command: BlockFifoCommand {
2546 opcode: BlockOpcode::CloseVmo.into_primitive(),
2547 ..Default::default()
2548 },
2549 reqid: 3,
2550 vmoid: vmo_id.id,
2551 ..Default::default()
2552 })
2553 .await
2554 .unwrap();
2555
2556 let mut response = BlockFifoResponse::default();
2558 reader.read_entries(&mut response).await.unwrap();
2559 assert_eq!(response.status, zx::sys::ZX_OK);
2560 assert_eq!(response.reqid, 3);
2561
2562 tx.send(()).unwrap();
2564
2565 let mut response = BlockFifoResponse::default();
2568 reader.read_entries(&mut response).await.unwrap();
2569 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2570 assert_eq!(response.reqid, 1);
2571 assert_eq!(response.group, 1);
2572 }
2573 );
2574 }
2575
2576 #[fuchsia::test(allow_stalls = false)]
2577 async fn test_requests_dont_block_sessions() {
2578 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2579
2580 let (tx, rx) = oneshot::channel();
2581
2582 fasync::Task::local(async move {
2583 let rx = Mutex::new(Some(rx));
2584 let block_server = BlockServer::new(
2585 BLOCK_SIZE,
2586 Arc::new(MockInterface {
2587 read_hook: Some(Box::new(move |_, _, _, _| {
2588 let rx = rx.lock().take().unwrap();
2589 Box::pin(async {
2590 let _ = rx.await;
2591 Ok(())
2592 })
2593 })),
2594 ..MockInterface::default()
2595 }),
2596 );
2597 block_server.handle_requests(stream).await.unwrap();
2598 })
2599 .detach();
2600
2601 let mut fut = pin!(async {
2602 let (session_proxy, server) = fidl::endpoints::create_proxy();
2603
2604 proxy.open_session(server).unwrap();
2605
2606 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2607 let vmo_id = session_proxy
2608 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2609 .await
2610 .unwrap()
2611 .unwrap();
2612
2613 let mut fifo =
2614 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2615 let (mut reader, mut writer) = fifo.async_io();
2616
2617 writer
2618 .write_entries(&BlockFifoRequest {
2619 command: BlockFifoCommand {
2620 opcode: BlockOpcode::Read.into_primitive(),
2621 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2622 ..Default::default()
2623 },
2624 reqid: 1,
2625 group: 1,
2626 vmoid: vmo_id.id,
2627 length: 1,
2628 ..Default::default()
2629 })
2630 .await
2631 .unwrap();
2632
2633 let mut response = BlockFifoResponse::default();
2634 reader.read_entries(&mut response).await.unwrap();
2635 assert_eq!(response.status, zx::sys::ZX_OK);
2636 });
2637
2638 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2640
2641 let mut fut2 = pin!(proxy.get_volume_info());
2642
2643 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2645
2646 let _ = tx.send(());
2649
2650 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2651 }
2652
2653 #[fuchsia::test]
2654 async fn test_request_flow_control() {
2655 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2656
2657 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2660 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2661 let event_clone = event.clone();
2662 futures::join!(
2663 async move {
2664 let block_server = BlockServer::new(
2665 BLOCK_SIZE,
2666 Arc::new(MockInterface {
2667 read_hook: Some(Box::new(move |_, _, _, _| {
2668 let event_clone = event_clone.clone();
2669 Box::pin(async move {
2670 if !event_clone.1.load(Ordering::SeqCst) {
2671 event_clone.0.listen().await;
2672 }
2673 Ok(())
2674 })
2675 })),
2676 ..MockInterface::default()
2677 }),
2678 );
2679 block_server.handle_requests(stream).await.unwrap();
2680 },
2681 async move {
2682 let (session_proxy, server) = fidl::endpoints::create_proxy();
2683
2684 proxy.open_session(server).unwrap();
2685
2686 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2687 let vmo_id = session_proxy
2688 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2689 .await
2690 .unwrap()
2691 .unwrap();
2692
2693 let mut fifo =
2694 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2695 let (mut reader, mut writer) = fifo.async_io();
2696
2697 for i in 0..MAX_REQUESTS {
2698 writer
2699 .write_entries(&BlockFifoRequest {
2700 command: BlockFifoCommand {
2701 opcode: BlockOpcode::Read.into_primitive(),
2702 ..Default::default()
2703 },
2704 reqid: (i + 1) as u32,
2705 dev_offset: i,
2706 vmoid: vmo_id.id,
2707 length: 1,
2708 ..Default::default()
2709 })
2710 .await
2711 .unwrap();
2712 }
2713 assert!(
2714 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2715 command: BlockFifoCommand {
2716 opcode: BlockOpcode::Read.into_primitive(),
2717 ..Default::default()
2718 },
2719 reqid: u32::MAX,
2720 dev_offset: MAX_REQUESTS,
2721 vmoid: vmo_id.id,
2722 length: 1,
2723 ..Default::default()
2724 })))
2725 .is_pending()
2726 );
2727 event.1.store(true, Ordering::SeqCst);
2729 event.0.notify(usize::MAX);
2730 let mut finished_reqids = vec![];
2732 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2733 let mut response = BlockFifoResponse::default();
2734 reader.read_entries(&mut response).await.unwrap();
2735 assert_eq!(response.status, zx::sys::ZX_OK);
2736 finished_reqids.push(response.reqid);
2737 writer
2738 .write_entries(&BlockFifoRequest {
2739 command: BlockFifoCommand {
2740 opcode: BlockOpcode::Read.into_primitive(),
2741 ..Default::default()
2742 },
2743 reqid: (i + 1) as u32,
2744 dev_offset: i,
2745 vmoid: vmo_id.id,
2746 length: 1,
2747 ..Default::default()
2748 })
2749 .await
2750 .unwrap();
2751 }
2752 let mut response = BlockFifoResponse::default();
2753 for _ in 0..MAX_REQUESTS {
2754 reader.read_entries(&mut response).await.unwrap();
2755 assert_eq!(response.status, zx::sys::ZX_OK);
2756 finished_reqids.push(response.reqid);
2757 }
2758 finished_reqids.sort();
2761 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2762 let mut i = 1;
2763 for reqid in finished_reqids {
2764 assert_eq!(reqid, i);
2765 i += 1;
2766 }
2767 }
2768 );
2769 }
2770
2771 #[fuchsia::test]
2772 async fn test_passthrough_io_with_fixed_map() {
2773 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2774
2775 let expected_op = Arc::new(Mutex::new(None));
2776 let expected_op_clone = expected_op.clone();
2777 futures::join!(
2778 async {
2779 let block_server = BlockServer::new(
2780 BLOCK_SIZE,
2781 Arc::new(IoMockInterface {
2782 return_errors: false,
2783 do_checks: true,
2784 expected_op: expected_op_clone,
2785 }),
2786 );
2787 block_server.handle_requests(stream).await.unwrap();
2788 },
2789 async move {
2790 let (session_proxy, server) = fidl::endpoints::create_proxy();
2791
2792 let mapping = fblock::BlockOffsetMapping {
2793 source_block_offset: 0,
2794 target_block_offset: 10,
2795 length: 20,
2796 };
2797 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2798
2799 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2800 let vmo_id = session_proxy
2801 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2802 .await
2803 .unwrap()
2804 .unwrap();
2805
2806 let mut fifo =
2807 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2808 let (mut reader, mut writer) = fifo.async_io();
2809
2810 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2812 writer
2813 .write_entries(&BlockFifoRequest {
2814 command: BlockFifoCommand {
2815 opcode: BlockOpcode::Read.into_primitive(),
2816 ..Default::default()
2817 },
2818 vmoid: vmo_id.id,
2819 dev_offset: 1,
2820 length: 2,
2821 vmo_offset: 3,
2822 ..Default::default()
2823 })
2824 .await
2825 .unwrap();
2826
2827 let mut response = BlockFifoResponse::default();
2828 reader.read_entries(&mut response).await.unwrap();
2829 assert_eq!(response.status, zx::sys::ZX_OK);
2830
2831 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2833 writer
2834 .write_entries(&BlockFifoRequest {
2835 command: BlockFifoCommand {
2836 opcode: BlockOpcode::Write.into_primitive(),
2837 ..Default::default()
2838 },
2839 vmoid: vmo_id.id,
2840 dev_offset: 4,
2841 length: 5,
2842 vmo_offset: 6,
2843 ..Default::default()
2844 })
2845 .await
2846 .unwrap();
2847
2848 reader.read_entries(&mut response).await.unwrap();
2849 assert_eq!(response.status, zx::sys::ZX_OK);
2850
2851 *expected_op.lock() = Some(ExpectedOp::Flush);
2853 writer
2854 .write_entries(&BlockFifoRequest {
2855 command: BlockFifoCommand {
2856 opcode: BlockOpcode::Flush.into_primitive(),
2857 ..Default::default()
2858 },
2859 ..Default::default()
2860 })
2861 .await
2862 .unwrap();
2863
2864 reader.read_entries(&mut response).await.unwrap();
2865 assert_eq!(response.status, zx::sys::ZX_OK);
2866
2867 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2869 writer
2870 .write_entries(&BlockFifoRequest {
2871 command: BlockFifoCommand {
2872 opcode: BlockOpcode::Trim.into_primitive(),
2873 ..Default::default()
2874 },
2875 dev_offset: 7,
2876 length: 3,
2877 ..Default::default()
2878 })
2879 .await
2880 .unwrap();
2881
2882 reader.read_entries(&mut response).await.unwrap();
2883 assert_eq!(response.status, zx::sys::ZX_OK);
2884
2885 *expected_op.lock() = None;
2887 writer
2888 .write_entries(&BlockFifoRequest {
2889 command: BlockFifoCommand {
2890 opcode: BlockOpcode::Read.into_primitive(),
2891 ..Default::default()
2892 },
2893 vmoid: vmo_id.id,
2894 dev_offset: 19,
2895 length: 2,
2896 vmo_offset: 3,
2897 ..Default::default()
2898 })
2899 .await
2900 .unwrap();
2901
2902 reader.read_entries(&mut response).await.unwrap();
2903 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2904
2905 std::mem::drop(proxy);
2906 }
2907 );
2908 }
2909
2910 #[fuchsia::test]
2911 fn operation_map() {
2912 const BLOCK_SIZE: u32 = 512;
2913
2914 #[track_caller]
2915 fn expect_map_result(
2916 mut operation: Operation,
2917 mapping: Option<BlockOffsetMapping>,
2918 max_blocks: Option<NonZero<u32>>,
2919 expected_operations: Vec<Operation>,
2920 ) {
2921 let mut ops = vec![];
2922 while let Some(remainder) =
2923 operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2924 {
2925 ops.push(operation);
2926 operation = remainder;
2927 }
2928 ops.push(operation);
2929 assert_eq!(ops, expected_operations);
2930 }
2931
2932 expect_map_result(
2934 Operation::Read {
2935 device_block_offset: 10,
2936 block_count: 200,
2937 _unused: 0,
2938 vmo_offset: 0,
2939 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2940 },
2941 None,
2942 None,
2943 vec![Operation::Read {
2944 device_block_offset: 10,
2945 block_count: 200,
2946 _unused: 0,
2947 vmo_offset: 0,
2948 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2949 }],
2950 );
2951
2952 expect_map_result(
2954 Operation::Read {
2955 device_block_offset: 10,
2956 block_count: 200,
2957 _unused: 0,
2958 vmo_offset: 0,
2959 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2960 },
2961 None,
2962 NonZero::new(120),
2963 vec![
2964 Operation::Read {
2965 device_block_offset: 10,
2966 block_count: 120,
2967 _unused: 0,
2968 vmo_offset: 0,
2969 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2970 },
2971 Operation::Read {
2972 device_block_offset: 130,
2973 block_count: 80,
2974 _unused: 0,
2975 vmo_offset: 120 * BLOCK_SIZE as u64,
2976 options: ReadOptions {
2977 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
2979 },
2980 },
2981 ],
2982 );
2983 expect_map_result(
2984 Operation::Trim { device_block_offset: 10, block_count: 200 },
2985 None,
2986 NonZero::new(120),
2987 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2988 );
2989
2990 expect_map_result(
2992 Operation::Read {
2993 device_block_offset: 10,
2994 block_count: 200,
2995 _unused: 0,
2996 vmo_offset: 0,
2997 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2998 },
2999 Some(BlockOffsetMapping {
3000 source_block_offset: 10,
3001 target_block_offset: 100,
3002 length: 200,
3003 }),
3004 NonZero::new(120),
3005 vec![
3006 Operation::Read {
3007 device_block_offset: 100,
3008 block_count: 120,
3009 _unused: 0,
3010 vmo_offset: 0,
3011 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3012 },
3013 Operation::Read {
3014 device_block_offset: 220,
3015 block_count: 80,
3016 _unused: 0,
3017 vmo_offset: 120 * BLOCK_SIZE as u64,
3018 options: ReadOptions {
3019 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3020 },
3021 },
3022 ],
3023 );
3024 expect_map_result(
3025 Operation::Trim { device_block_offset: 10, block_count: 200 },
3026 Some(BlockOffsetMapping {
3027 source_block_offset: 10,
3028 target_block_offset: 100,
3029 length: 200,
3030 }),
3031 NonZero::new(120),
3032 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3033 );
3034 }
3035
3036 #[fuchsia::test]
3038 async fn test_pre_barrier_flush_failure() {
3039 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3040
3041 struct NoBarrierInterface;
3042 impl super::async_interface::Interface for NoBarrierInterface {
3043 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3044 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3045 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3047 block_range: Some(0..100),
3048 type_guid: [0; 16],
3049 instance_guid: [0; 16],
3050 name: "test".to_string(),
3051 flags: 0,
3052 }))
3053 }
3054 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3055 Ok(())
3056 }
3057 async fn read(
3058 &self,
3059 _: u64,
3060 _: u32,
3061 _: &Arc<zx::Vmo>,
3062 _: u64,
3063 _: ReadOptions,
3064 _: TraceFlowId,
3065 ) -> Result<(), zx::Status> {
3066 unreachable!()
3067 }
3068 async fn write(
3069 &self,
3070 _: u64,
3071 _: u32,
3072 _: &Arc<zx::Vmo>,
3073 _: u64,
3074 _: WriteOptions,
3075 _: TraceFlowId,
3076 ) -> Result<(), zx::Status> {
3077 panic!("Write should not be called");
3078 }
3079 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3080 Err(zx::Status::IO)
3081 }
3082 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3083 unreachable!()
3084 }
3085 }
3086
3087 futures::join!(
3088 async move {
3089 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3090 block_server.handle_requests(stream).await.unwrap();
3091 },
3092 async move {
3093 let (session_proxy, server) = fidl::endpoints::create_proxy();
3094 proxy.open_session(server).unwrap();
3095 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3096 let vmo_id = session_proxy
3097 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3098 .await
3099 .unwrap()
3100 .unwrap();
3101
3102 let mut fifo =
3103 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3104 let (mut reader, mut writer) = fifo.async_io();
3105
3106 writer
3107 .write_entries(&BlockFifoRequest {
3108 command: BlockFifoCommand {
3109 opcode: BlockOpcode::Write.into_primitive(),
3110 flags: BlockIoFlag::PRE_BARRIER.bits(),
3111 ..Default::default()
3112 },
3113 vmoid: vmo_id.id,
3114 length: 1,
3115 ..Default::default()
3116 })
3117 .await
3118 .unwrap();
3119
3120 let mut response = BlockFifoResponse::default();
3121 reader.read_entries(&mut response).await.unwrap();
3122 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3123 }
3124 );
3125 }
3126
3127 #[fuchsia::test]
3130 async fn test_post_barrier_write_failure() {
3131 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3132
3133 struct NoBarrierInterface;
3134 impl super::async_interface::Interface for NoBarrierInterface {
3135 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3136 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3137 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3139 block_range: Some(0..100),
3140 type_guid: [0; 16],
3141 instance_guid: [0; 16],
3142 name: "test".to_string(),
3143 flags: 0,
3144 }))
3145 }
3146 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3147 Ok(())
3148 }
3149 async fn read(
3150 &self,
3151 _: u64,
3152 _: u32,
3153 _: &Arc<zx::Vmo>,
3154 _: u64,
3155 _: ReadOptions,
3156 _: TraceFlowId,
3157 ) -> Result<(), zx::Status> {
3158 unreachable!()
3159 }
3160 async fn write(
3161 &self,
3162 _: u64,
3163 _: u32,
3164 _: &Arc<zx::Vmo>,
3165 _: u64,
3166 _: WriteOptions,
3167 _: TraceFlowId,
3168 ) -> Result<(), zx::Status> {
3169 Err(zx::Status::IO)
3170 }
3171 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3172 panic!("Flush should not be called")
3173 }
3174 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3175 unreachable!()
3176 }
3177 }
3178
3179 futures::join!(
3180 async move {
3181 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3182 block_server.handle_requests(stream).await.unwrap();
3183 },
3184 async move {
3185 let (session_proxy, server) = fidl::endpoints::create_proxy();
3186 proxy.open_session(server).unwrap();
3187 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3188 let vmo_id = session_proxy
3189 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3190 .await
3191 .unwrap()
3192 .unwrap();
3193
3194 let mut fifo =
3195 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3196 let (mut reader, mut writer) = fifo.async_io();
3197
3198 writer
3199 .write_entries(&BlockFifoRequest {
3200 command: BlockFifoCommand {
3201 opcode: BlockOpcode::Write.into_primitive(),
3202 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3203 ..Default::default()
3204 },
3205 vmoid: vmo_id.id,
3206 length: 1,
3207 ..Default::default()
3208 })
3209 .await
3210 .unwrap();
3211
3212 let mut response = BlockFifoResponse::default();
3213 reader.read_entries(&mut response).await.unwrap();
3214 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3215 }
3216 );
3217 }
3218}