1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23
24#[cfg(test)]
25mod decompression_tests;
26
27pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
28
29type TraceFlowId = Option<NonZero<u64>>;
30
31#[derive(Clone)]
32pub enum DeviceInfo {
33 Block(BlockInfo),
34 Partition(PartitionInfo),
35}
36
37impl DeviceInfo {
38 pub fn device_flags(&self) -> fblock::DeviceFlag {
39 match self {
40 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
41 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
42 }
43 }
44
45 pub fn block_count(&self) -> Option<u64> {
46 match self {
47 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
48 Self::Partition(PartitionInfo { block_range, .. }) => {
49 block_range.as_ref().map(|range| range.end - range.start)
50 }
51 }
52 }
53
54 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
55 match self {
56 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
57 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
58 max_transfer_blocks.clone()
59 }
60 }
61 }
62
63 fn max_transfer_size(&self, block_size: u32) -> u32 {
64 if let Some(max_blocks) = self.max_transfer_blocks() {
65 max_blocks.get() * block_size
66 } else {
67 MAX_TRANSFER_UNBOUNDED
68 }
69 }
70}
71
72#[derive(Clone, Default)]
74pub struct BlockInfo {
75 pub device_flags: fblock::DeviceFlag,
76 pub block_count: u64,
77 pub max_transfer_blocks: Option<NonZero<u32>>,
78}
79
80#[derive(Clone, Default)]
82pub struct PartitionInfo {
83 pub device_flags: fblock::DeviceFlag,
85 pub max_transfer_blocks: Option<NonZero<u32>>,
86 pub block_range: Option<Range<u64>>,
90 pub type_guid: [u8; 16],
91 pub instance_guid: [u8; 16],
92 pub name: String,
93 pub flags: u64,
94}
95
96struct ActiveRequest<S> {
99 session: S,
100 group_or_request: GroupOrRequest,
101 trace_flow_id: TraceFlowId,
102 _epoch_guard: EpochGuard<'static>,
103 status: zx::Status,
104 count: u32,
105 req_id: Option<u32>,
106 decompression_info: Option<DecompressionInfo>,
107}
108
109struct DecompressionInfo {
110 compressed_range: Range<usize>,
112
113 uncompressed_range: Range<u64>,
115
116 bytes_so_far: u64,
117 mapping: Arc<VmoMapping>,
118 buffer: Option<Buffer<'static>>,
119}
120
121impl DecompressionInfo {
122 fn uncompressed_slice(&self) -> *mut [u8] {
124 std::ptr::slice_from_raw_parts_mut(
125 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
126 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
127 )
128 }
129}
130
131pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
132
133impl<S> Default for ActiveRequests<S> {
134 fn default() -> Self {
135 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
136 }
137}
138
139impl<S> ActiveRequests<S> {
140 fn complete_and_take_response(
141 &self,
142 request_id: RequestId,
143 status: zx::Status,
144 ) -> Option<(S, BlockFifoResponse)> {
145 self.0.lock().complete_and_take_response(request_id, status)
146 }
147
148 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
149 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
150 }
151}
152
153struct ActiveRequestsInner<S> {
154 requests: Slab<ActiveRequest<S>>,
155}
156
157impl<S> ActiveRequestsInner<S> {
159 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
161 let group = &mut self.requests[request_id.0];
162
163 group.count = group.count.checked_sub(1).unwrap();
164 if status != zx::Status::OK && group.status == zx::Status::OK {
165 group.status = status
166 }
167
168 fuchsia_trace::duration!(
169 "storage",
170 "block_server::finish_transaction",
171 "request_id" => request_id.0,
172 "group_completed" => group.count == 0,
173 "status" => status.into_raw());
174 if let Some(trace_flow_id) = group.trace_flow_id {
175 fuchsia_trace::flow_step!(
176 "storage",
177 "block_server::finish_request",
178 trace_flow_id.get().into()
179 );
180 }
181
182 if group.count == 0
183 && group.status == zx::Status::OK
184 && let Some(info) = &mut group.decompression_info
185 {
186 thread_local! {
187 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
188 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
189 }
190 DECOMPRESSOR.with(|decompressor| {
191 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
193 let mut decompressor = decompressor.borrow_mut();
194 if let Err(error) = decompressor.decompress_to_buffer(
195 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
196 target_slice,
197 ) {
198 log::warn!(error:?; "Decompression error");
199 group.status = zx::Status::IO_DATA_INTEGRITY;
200 };
201 });
202 }
203 }
204
205 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
207 let group = &self.requests[request_id.0];
208 match group.req_id {
209 Some(reqid) if group.count == 0 => {
210 let group = self.requests.remove(request_id.0);
211 Some((
212 group.session,
213 BlockFifoResponse {
214 status: group.status.into_raw(),
215 reqid,
216 group: group.group_or_request.group_id().unwrap_or(0),
217 ..Default::default()
218 },
219 ))
220 }
221 _ => None,
222 }
223 }
224
225 fn complete_and_take_response(
227 &mut self,
228 request_id: RequestId,
229 status: zx::Status,
230 ) -> Option<(S, BlockFifoResponse)> {
231 self.complete(request_id, status);
232 self.take_response(request_id)
233 }
234}
235
236pub struct BlockServer<SM> {
239 block_size: u32,
240 session_manager: Arc<SM>,
241}
242
243#[derive(Debug)]
245pub struct BlockOffsetMapping {
246 source_block_offset: u64,
247 target_block_offset: u64,
248 length: u64,
249}
250
251impl BlockOffsetMapping {
252 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
253 blocks.0 >= self.source_block_offset
254 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
255 }
256}
257
258impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
259 type Error = zx::Status;
260
261 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
262 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
263 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
264 Ok(Self {
265 source_block_offset: wire.source_block_offset,
266 target_block_offset: wire.target_block_offset,
267 length: wire.length,
268 })
269 }
270}
271
272pub struct OffsetMap {
274 mapping: Option<BlockOffsetMapping>,
275 max_transfer_blocks: Option<NonZero<u32>>,
276}
277
278impl OffsetMap {
279 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
281 Self { mapping: Some(mapping), max_transfer_blocks }
282 }
283
284 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
286 Self { mapping: None, max_transfer_blocks }
287 }
288
289 pub fn is_empty(&self) -> bool {
290 self.mapping.is_none()
291 }
292
293 fn mapping(&self) -> Option<&BlockOffsetMapping> {
294 self.mapping.as_ref()
295 }
296
297 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
298 self.max_transfer_blocks
299 }
300}
301
302pub trait SessionManager: 'static {
305 const SUPPORTS_DECOMPRESSION: bool;
306
307 type Session;
308
309 fn on_attach_vmo(
310 self: Arc<Self>,
311 vmo: &Arc<zx::Vmo>,
312 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
313
314 fn open_session(
319 self: Arc<Self>,
320 stream: fblock::SessionRequestStream,
321 offset_map: OffsetMap,
322 block_size: u32,
323 ) -> impl Future<Output = Result<(), Error>> + Send;
324
325 fn get_info(&self) -> Cow<'_, DeviceInfo>;
327
328 fn get_volume_info(
330 &self,
331 ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
332 {
333 async { Err(zx::Status::NOT_SUPPORTED) }
334 }
335
336 fn query_slices(
338 &self,
339 _start_slices: &[u64],
340 ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
341 async { Err(zx::Status::NOT_SUPPORTED) }
342 }
343
344 fn extend(
346 &self,
347 _start_slice: u64,
348 _slice_count: u64,
349 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
350 async { Err(zx::Status::NOT_SUPPORTED) }
351 }
352
353 fn shrink(
355 &self,
356 _start_slice: u64,
357 _slice_count: u64,
358 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
359 async { Err(zx::Status::NOT_SUPPORTED) }
360 }
361
362 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
364}
365
366pub trait IntoSessionManager {
367 type SM: SessionManager;
368
369 fn into_session_manager(self) -> Arc<Self::SM>;
370}
371
372impl<SM: SessionManager> BlockServer<SM> {
373 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
374 Self { block_size, session_manager: session_manager.into_session_manager() }
375 }
376
377 pub fn session_manager(&self) -> &SM {
378 self.session_manager.as_ref()
379 }
380
381 pub async fn handle_requests(
383 &self,
384 mut requests: fblock::BlockRequestStream,
385 ) -> Result<(), Error> {
386 let scope = fasync::Scope::new();
387 loop {
388 match requests.try_next().await {
389 Ok(Some(request)) => {
390 if let Some(session) = self.handle_request(request).await? {
391 scope.spawn(session.map(|_| ()));
392 }
393 }
394 Ok(None) => break,
395 Err(err) => log::warn!(err:?; "Invalid request"),
396 }
397 }
398 scope.await;
399 Ok(())
400 }
401
402 async fn handle_request(
405 &self,
406 request: fblock::BlockRequest,
407 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
408 match request {
409 fblock::BlockRequest::GetInfo { responder } => {
410 let info = self.device_info();
411 let max_transfer_size = info.max_transfer_size(self.block_size);
412 let (block_count, mut flags) = match info.as_ref() {
413 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
414 (*block_count, *device_flags)
415 }
416 DeviceInfo::Partition(partition_info) => {
417 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
418 range.end - range.start
419 } else {
420 let volume_info = self.session_manager.get_volume_info().await?;
421 volume_info.0.slice_size * volume_info.1.partition_slice_count
422 / self.block_size as u64
423 };
424 (block_count, partition_info.device_flags)
425 }
426 };
427 if SM::SUPPORTS_DECOMPRESSION {
428 flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
429 }
430 responder.send(Ok(&fblock::BlockInfo {
431 block_count,
432 block_size: self.block_size,
433 max_transfer_size,
434 flags,
435 }))?;
436 }
437 fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
438 let info = self.device_info();
439 return Ok(Some(self.session_manager.clone().open_session(
440 session.into_stream(),
441 OffsetMap::empty(info.max_transfer_blocks()),
442 self.block_size,
443 )));
444 }
445 fblock::BlockRequest::OpenSessionWithOffsetMap {
446 session,
447 mapping,
448 control_handle: _,
449 } => {
450 let info = self.device_info();
451 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
452 Ok(m) => m,
453 Err(status) => {
454 session.close_with_epitaph(status)?;
455 return Ok(None);
456 }
457 };
458 if let Some(max) = info.block_count() {
459 if initial_mapping.target_block_offset + initial_mapping.length > max {
460 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
461 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
462 return Ok(None);
463 }
464 }
465 return Ok(Some(self.session_manager.clone().open_session(
466 session.into_stream(),
467 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
468 self.block_size,
469 )));
470 }
471 fblock::BlockRequest::GetTypeGuid { responder } => {
472 let info = self.device_info();
473 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
474 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
475 guid.value.copy_from_slice(&partition_info.type_guid);
476 responder.send(zx::sys::ZX_OK, Some(&guid))?;
477 } else {
478 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
479 }
480 }
481 fblock::BlockRequest::GetInstanceGuid { responder } => {
482 let info = self.device_info();
483 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
484 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
485 guid.value.copy_from_slice(&partition_info.instance_guid);
486 responder.send(zx::sys::ZX_OK, Some(&guid))?;
487 } else {
488 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
489 }
490 }
491 fblock::BlockRequest::GetName { responder } => {
492 let info = self.device_info();
493 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
494 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
495 } else {
496 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497 }
498 }
499 fblock::BlockRequest::GetMetadata { responder } => {
500 let info = self.device_info();
501 if let DeviceInfo::Partition(info) = info.as_ref() {
502 let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503 type_guid.value.copy_from_slice(&info.type_guid);
504 let mut instance_guid =
505 fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
506 instance_guid.value.copy_from_slice(&info.instance_guid);
507 responder.send(Ok(&fblock::BlockGetMetadataResponse {
508 name: Some(info.name.clone()),
509 type_guid: Some(type_guid),
510 instance_guid: Some(instance_guid),
511 start_block_offset: info.block_range.as_ref().map(|range| range.start),
512 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
513 flags: Some(info.flags),
514 ..Default::default()
515 }))?;
516 } else {
517 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
518 }
519 }
520 fblock::BlockRequest::QuerySlices { responder, start_slices } => {
521 match self.session_manager.query_slices(&start_slices).await {
522 Ok(mut results) => {
523 let results_len = results.len();
524 assert!(results_len <= 16);
525 results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
526 responder.send(
527 zx::sys::ZX_OK,
528 &results.try_into().unwrap(),
529 results_len as u64,
530 )?;
531 }
532 Err(s) => {
533 responder.send(
534 s.into_raw(),
535 &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
536 0,
537 )?;
538 }
539 }
540 }
541 fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
542 match self.session_manager.get_volume_info().await {
543 Ok((manager_info, volume_info)) => {
544 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
545 }
546 Err(s) => responder.send(s.into_raw(), None, None)?,
547 }
548 }
549 fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
550 responder.send(
551 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
552 .into_raw(),
553 )?;
554 }
555 fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
556 responder.send(
557 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
558 .into_raw(),
559 )?;
560 }
561 fblock::BlockRequest::Destroy { responder, .. } => {
562 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
563 }
564 }
565 Ok(None)
566 }
567
568 fn device_info(&self) -> Cow<'_, DeviceInfo> {
569 self.session_manager.get_info()
570 }
571}
572
573struct SessionHelper<SM: SessionManager> {
574 session_manager: Arc<SM>,
575 offset_map: OffsetMap,
576 block_size: u32,
577 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
578 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
579}
580
581struct VmoMapping {
582 base: usize,
583 size: usize,
584}
585
586impl VmoMapping {
587 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
588 let size = vmo.get_size().unwrap() as usize;
589 Ok(Arc::new(Self {
590 base: fuchsia_runtime::vmar_root_self()
591 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
592 .inspect_err(|error| {
593 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
594 })?,
595 size,
596 }))
597 }
598}
599
600impl Drop for VmoMapping {
601 fn drop(&mut self) {
602 unsafe {
604 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
605 }
606 }
607}
608
609impl<SM: SessionManager> SessionHelper<SM> {
610 fn new(
611 session_manager: Arc<SM>,
612 offset_map: OffsetMap,
613 block_size: u32,
614 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
615 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
616 Ok((
617 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
618 fifo,
619 ))
620 }
621
622 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
623 match request {
624 fblock::SessionRequest::GetFifo { responder } => {
625 let rights = zx::Rights::TRANSFER
626 | zx::Rights::READ
627 | zx::Rights::WRITE
628 | zx::Rights::SIGNAL
629 | zx::Rights::WAIT;
630 match self.peer_fifo.duplicate_handle(rights) {
631 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
632 Err(s) => responder.send(Err(s.into_raw()))?,
633 }
634 Ok(())
635 }
636 fblock::SessionRequest::AttachVmo { vmo, responder } => {
637 let vmo = Arc::new(vmo);
638 let vmo_id = {
639 let mut vmos = self.vmos.lock();
640 if vmos.len() == u16::MAX as usize {
641 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
642 return Ok(());
643 } else {
644 let vmo_id = match vmos.last_entry() {
645 None => 1,
646 Some(o) => {
647 o.key().checked_add(1).unwrap_or_else(|| {
648 let mut vmo_id = 1;
649 for (&id, _) in &*vmos {
651 if id > vmo_id {
652 break;
653 }
654 vmo_id = id + 1;
655 }
656 vmo_id
657 })
658 }
659 };
660 vmos.insert(vmo_id, (vmo.clone(), None));
661 vmo_id
662 }
663 };
664 self.session_manager.clone().on_attach_vmo(&vmo).await?;
665 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
666 Ok(())
667 }
668 fblock::SessionRequest::Close { responder } => {
669 responder.send(Ok(()))?;
670 Err(anyhow!("Closed"))
671 }
672 }
673 }
674
675 fn decode_fifo_request(
677 &self,
678 session: SM::Session,
679 request: &BlockFifoRequest,
680 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
681 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
682
683 let request_bytes = request.length as u64 * self.block_size as u64;
684
685 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
686 .ok_or(zx::Status::INVALID_ARGS)
687 .and_then(|code| {
688 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
689 if code != BlockOpcode::Read {
690 return Err(zx::Status::INVALID_ARGS);
691 }
692 if !SM::SUPPORTS_DECOMPRESSION {
693 return Err(zx::Status::NOT_SUPPORTED);
694 }
695 }
696 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
697 if request.length == 0 {
698 return Err(zx::Status::INVALID_ARGS);
699 }
700 if request.dev_offset.checked_add(request.length as u64).is_none()
702 || (code != BlockOpcode::Trim
703 && request_bytes.checked_add(request.vmo_offset).is_none())
704 {
705 return Err(zx::Status::OUT_OF_RANGE);
706 }
707 }
708 Ok(match code {
709 BlockOpcode::Read => Operation::Read {
710 device_block_offset: request.dev_offset,
711 block_count: request.length,
712 _unused: 0,
713 vmo_offset: request
714 .vmo_offset
715 .checked_mul(self.block_size as u64)
716 .ok_or(zx::Status::OUT_OF_RANGE)?,
717 options: ReadOptions {
718 inline_crypto: InlineCryptoOptions {
719 is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
720 dun: request.dun,
721 slot: request.slot,
722 },
723 },
724 },
725 BlockOpcode::Write => {
726 let mut options = WriteOptions {
727 inline_crypto: InlineCryptoOptions {
728 is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
729 dun: request.dun,
730 slot: request.slot,
731 },
732 ..WriteOptions::default()
733 };
734 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
735 options.flags |= WriteFlags::FORCE_ACCESS;
736 }
737 if flags.contains(BlockIoFlag::PRE_BARRIER) {
738 options.flags |= WriteFlags::PRE_BARRIER;
739 }
740 Operation::Write {
741 device_block_offset: request.dev_offset,
742 block_count: request.length,
743 _unused: 0,
744 options,
745 vmo_offset: request
746 .vmo_offset
747 .checked_mul(self.block_size as u64)
748 .ok_or(zx::Status::OUT_OF_RANGE)?,
749 }
750 }
751 BlockOpcode::Flush => Operation::Flush,
752 BlockOpcode::Trim => Operation::Trim {
753 device_block_offset: request.dev_offset,
754 block_count: request.length,
755 },
756 BlockOpcode::CloseVmo => Operation::CloseVmo,
757 })
758 });
759
760 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
761 GroupOrRequest::Group(request.group)
762 } else {
763 GroupOrRequest::Request(request.reqid)
764 };
765
766 let mut active_requests = self.session_manager.active_requests().0.lock();
767 let mut request_id = None;
768
769 if group_or_request.is_group() {
780 for (key, group) in &mut active_requests.requests {
784 if group.group_or_request == group_or_request {
785 if group.req_id.is_some() {
786 if group.status == zx::Status::OK {
788 group.status = zx::Status::INVALID_ARGS;
789 }
790 return Err(None);
792 }
793 if group.status == zx::Status::OK
795 && let Some(info) = &mut group.decompression_info
796 {
797 if let Ok(Operation::Read {
798 device_block_offset,
799 mut block_count,
800 options,
801 vmo_offset: 0,
802 ..
803 }) = operation
804 {
805 let remaining_bytes = info
806 .compressed_range
807 .end
808 .next_multiple_of(self.block_size as usize)
809 as u64
810 - info.bytes_so_far;
811 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
812 || request.total_compressed_bytes != 0
813 || request.uncompressed_bytes != 0
814 || request.compressed_prefix_bytes != 0
815 || (flags.contains(BlockIoFlag::GROUP_LAST)
816 && info.bytes_so_far + request_bytes
817 < info.compressed_range.end as u64)
818 || (!flags.contains(BlockIoFlag::GROUP_LAST)
819 && request_bytes >= remaining_bytes)
820 {
821 group.status = zx::Status::INVALID_ARGS;
822 } else {
823 if request_bytes > remaining_bytes {
832 block_count = (remaining_bytes / self.block_size as u64) as u32;
833 }
834
835 operation = Ok(Operation::ContinueDecompressedRead {
836 offset: info.bytes_so_far,
837 device_block_offset,
838 block_count,
839 options,
840 });
841
842 info.bytes_so_far += block_count as u64 * self.block_size as u64;
843 }
844 } else {
845 group.status = zx::Status::INVALID_ARGS;
846 }
847 }
848 if flags.contains(BlockIoFlag::GROUP_LAST) {
849 group.req_id = Some(request.reqid);
850 if group.status != zx::Status::OK {
853 operation = Err(group.status);
854 }
855 } else if group.status != zx::Status::OK {
856 return Err(None);
859 }
860 request_id = Some(RequestId(key));
861 group.count += 1;
862 break;
863 }
864 }
865 }
866
867 let is_single_request =
868 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
869
870 let mut decompression_info = None;
871 let vmo = match operation {
872 Ok(Operation::Read {
873 device_block_offset,
874 mut block_count,
875 options,
876 vmo_offset,
877 ..
878 }) => match self.vmos.lock().get_mut(&request.vmoid) {
879 Some((vmo, mapping)) => {
880 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
881 let compressed_range = request.compressed_prefix_bytes as usize
882 ..request.compressed_prefix_bytes as usize
883 + request.total_compressed_bytes as usize;
884 let required_buffer_size =
885 compressed_range.end.next_multiple_of(self.block_size as usize);
886
887 if compressed_range.start >= compressed_range.end
889 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
890 || (is_single_request && request_bytes < compressed_range.end as u64)
891 || (!is_single_request && request_bytes >= required_buffer_size as u64)
892 {
893 Err(zx::Status::INVALID_ARGS)
894 } else {
895 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
902 block_count =
903 (required_buffer_size / self.block_size as usize) as u32;
904 required_buffer_size as u64
905 } else {
906 request_bytes
907 };
908
909 match mapping {
911 Some(mapping) => Ok(mapping.clone()),
912 None => {
913 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
914 }
915 }
916 .and_then(|mapping| {
917 if vmo_offset
920 .checked_add(request.uncompressed_bytes as u64)
921 .is_some_and(|end| end <= mapping.size as u64)
922 {
923 Ok(mapping)
924 } else {
925 Err(zx::Status::OUT_OF_RANGE)
926 }
927 })
928 .map(|mapping| {
929 operation = Ok(Operation::StartDecompressedRead {
934 required_buffer_size,
935 device_block_offset,
936 block_count,
937 options,
938 });
939 decompression_info = Some(DecompressionInfo {
942 compressed_range,
943 bytes_so_far,
944 mapping,
945 uncompressed_range: vmo_offset
946 ..vmo_offset + request.uncompressed_bytes as u64,
947 buffer: None,
948 });
949 None
950 })
951 }
952 } else {
953 Ok(Some(vmo.clone()))
954 }
955 }
956 None => Err(zx::Status::IO),
957 },
958 Ok(Operation::Write { .. }) => self
959 .vmos
960 .lock()
961 .get(&request.vmoid)
962 .cloned()
963 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
964 Ok(Operation::CloseVmo) => {
965 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
966 let vmo_clone = vmo.clone();
967 Epoch::global().defer(move || drop(vmo_clone));
970 Ok(Some(vmo))
971 })
972 }
973 _ => Ok(None),
974 }
975 .unwrap_or_else(|e| {
976 operation = Err(e);
977 None
978 });
979
980 let trace_flow_id = NonZero::new(request.trace_flow_id);
981 let request_id = request_id.unwrap_or_else(|| {
982 RequestId(active_requests.requests.insert(ActiveRequest {
983 session,
984 group_or_request,
985 trace_flow_id,
986 _epoch_guard: Epoch::global().guard(),
987 status: zx::Status::OK,
988 count: 1,
989 req_id: is_single_request.then_some(request.reqid),
990 decompression_info,
991 }))
992 });
993
994 Ok(DecodedRequest {
995 request_id,
996 trace_flow_id,
997 operation: operation.map_err(|status| {
998 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
999 })?,
1000 vmo,
1001 })
1002 }
1003
1004 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1005 std::mem::take(&mut *self.vmos.lock())
1006 }
1007
1008 fn map_request(
1010 &self,
1011 mut request: DecodedRequest,
1012 active_request: &mut ActiveRequest<SM::Session>,
1013 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1014 if active_request.status != zx::Status::OK {
1015 return Err(zx::Status::BAD_STATE);
1016 }
1017 let mapping = self.offset_map.mapping();
1018 match (mapping, request.operation.blocks()) {
1019 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1020 return Err(zx::Status::OUT_OF_RANGE);
1021 }
1022 _ => {}
1023 }
1024 let remainder = request.operation.map(
1025 self.offset_map.mapping(),
1026 self.offset_map.max_transfer_blocks(),
1027 self.block_size,
1028 );
1029 if remainder.is_some() {
1030 active_request.count += 1;
1031 }
1032 static CACHE: AtomicU64 = AtomicU64::new(0);
1033 if let Some(context) =
1034 fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1035 {
1036 use fuchsia_trace::ArgValue;
1037 let trace_args = [
1038 ArgValue::of("request_id", request.request_id.0),
1039 ArgValue::of("opcode", request.operation.trace_label()),
1040 ];
1041 let _scope =
1042 fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1043 if let Some(trace_flow_id) = active_request.trace_flow_id {
1044 fuchsia_trace::flow_step(
1045 &context,
1046 "block_server::start_transaction",
1047 trace_flow_id.get().into(),
1048 &[],
1049 );
1050 }
1051 }
1052 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1053 Ok((request, remainder))
1054 }
1055
1056 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1057 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1058 }
1059}
1060
1061#[repr(transparent)]
1062#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1063pub struct RequestId(usize);
1064
1065#[derive(Clone, Debug)]
1066struct DecodedRequest {
1067 request_id: RequestId,
1068 trace_flow_id: TraceFlowId,
1069 operation: Operation,
1070 vmo: Option<Arc<zx::Vmo>>,
1071}
1072
1073pub type WriteFlags = block_protocol::WriteFlags;
1075pub type WriteOptions = block_protocol::WriteOptions;
1076pub type ReadOptions = block_protocol::ReadOptions;
1077pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1078
1079#[repr(C)]
1080#[derive(Clone, Debug, PartialEq, Eq)]
1081pub enum Operation {
1082 Read {
1087 device_block_offset: u64,
1088 block_count: u32,
1089 _unused: u32,
1090 vmo_offset: u64,
1091 options: ReadOptions,
1092 },
1093 Write {
1094 device_block_offset: u64,
1095 block_count: u32,
1096 _unused: u32,
1097 vmo_offset: u64,
1098 options: WriteOptions,
1099 },
1100 Flush,
1101 Trim {
1102 device_block_offset: u64,
1103 block_count: u32,
1104 },
1105 CloseVmo,
1107 StartDecompressedRead {
1108 required_buffer_size: usize,
1109 device_block_offset: u64,
1110 block_count: u32,
1111 options: ReadOptions,
1112 },
1113 ContinueDecompressedRead {
1114 offset: u64,
1115 device_block_offset: u64,
1116 block_count: u32,
1117 options: ReadOptions,
1118 },
1119}
1120
1121impl Operation {
1122 fn trace_label(&self) -> &'static str {
1123 match self {
1124 Operation::Read { .. } => "read",
1125 Operation::Write { .. } => "write",
1126 Operation::Flush { .. } => "flush",
1127 Operation::Trim { .. } => "trim",
1128 Operation::CloseVmo { .. } => "close_vmo",
1129 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1130 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1131 }
1132 }
1133
1134 fn blocks(&self) -> Option<(u64, u32)> {
1136 match self {
1137 Operation::Read { device_block_offset, block_count, .. }
1138 | Operation::Write { device_block_offset, block_count, .. }
1139 | Operation::Trim { device_block_offset, block_count, .. } => {
1140 Some((*device_block_offset, *block_count))
1141 }
1142 _ => None,
1143 }
1144 }
1145
1146 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1148 match self {
1149 Operation::Read { device_block_offset, block_count, .. }
1150 | Operation::Write { device_block_offset, block_count, .. }
1151 | Operation::Trim { device_block_offset, block_count, .. } => {
1152 Some((device_block_offset, block_count))
1153 }
1154 _ => None,
1155 }
1156 }
1157
1158 fn map(
1161 &mut self,
1162 mapping: Option<&BlockOffsetMapping>,
1163 max_blocks: Option<NonZero<u32>>,
1164 block_size: u32,
1165 ) -> Option<Self> {
1166 let mut max = match self {
1167 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1168 _ => None,
1169 };
1170 let (offset, length) = self.blocks_mut()?;
1171 let orig_offset = *offset;
1172 if let Some(mapping) = mapping {
1173 let delta = *offset - mapping.source_block_offset;
1174 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1175 *offset = mapping.target_block_offset + delta;
1176 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1177 max = match max {
1178 None => Some(mapping_max),
1179 Some(m) => Some(std::cmp::min(m, mapping_max)),
1180 };
1181 };
1182 if let Some(max) = max {
1183 if *length as u64 > max {
1184 let rem = (*length as u64 - max) as u32;
1185 *length = max as u32;
1186 return Some(match self {
1187 Operation::Read {
1188 device_block_offset: _,
1189 block_count: _,
1190 vmo_offset,
1191 _unused,
1192 options,
1193 } => {
1194 let mut options = *options;
1195 options.inline_crypto.dun += max as u32;
1196 Operation::Read {
1197 device_block_offset: orig_offset + max,
1198 block_count: rem,
1199 vmo_offset: *vmo_offset + max * block_size as u64,
1200 _unused: *_unused,
1201 options: options,
1202 }
1203 }
1204 Operation::Write {
1205 device_block_offset: _,
1206 block_count: _,
1207 _unused,
1208 vmo_offset,
1209 options,
1210 } => {
1211 let mut options = *options;
1212 options.inline_crypto.dun += max as u32;
1213 Operation::Write {
1214 device_block_offset: orig_offset + max,
1215 block_count: rem,
1216 _unused: *_unused,
1217 vmo_offset: *vmo_offset + max * block_size as u64,
1218 options: options,
1219 }
1220 }
1221 Operation::Trim { device_block_offset: _, block_count: _ } => {
1222 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1223 }
1224 _ => unreachable!(),
1225 });
1226 }
1227 }
1228 None
1229 }
1230
1231 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1233 if let Operation::Write { options, .. } = self {
1234 options.flags.contains(value)
1235 } else {
1236 false
1237 }
1238 }
1239
1240 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1242 if let Operation::Write { options, .. } = self {
1243 let result = options.flags.contains(value);
1244 options.flags.remove(value);
1245 result
1246 } else {
1247 false
1248 }
1249 }
1250}
1251
1252#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1253pub enum GroupOrRequest {
1254 Group(u16),
1255 Request(u32),
1256}
1257
1258impl GroupOrRequest {
1259 fn is_group(&self) -> bool {
1260 matches!(self, Self::Group(_))
1261 }
1262
1263 fn group_id(&self) -> Option<u16> {
1264 match self {
1265 Self::Group(id) => Some(*id),
1266 Self::Request(_) => None,
1267 }
1268 }
1269}
1270
1271#[cfg(test)]
1272mod tests {
1273 use super::{
1274 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1275 TraceFlowId,
1276 };
1277 use assert_matches::assert_matches;
1278 use block_protocol::{
1279 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1280 WriteFlags, WriteOptions,
1281 };
1282 use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1283 use fuchsia_sync::Mutex;
1284 use futures::FutureExt as _;
1285 use futures::channel::oneshot;
1286 use futures::future::BoxFuture;
1287 use std::borrow::Cow;
1288 use std::future::poll_fn;
1289 use std::num::NonZero;
1290 use std::pin::pin;
1291 use std::sync::Arc;
1292 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1293 use std::task::{Context, Poll};
1294 use zx::HandleBased as _;
1295 use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1296
1297 #[derive(Default)]
1298 struct MockInterface {
1299 read_hook: Option<
1300 Box<
1301 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1302 + Send
1303 + Sync,
1304 >,
1305 >,
1306 write_hook:
1307 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1308 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1309 }
1310
1311 impl super::async_interface::Interface for MockInterface {
1312 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1313 Ok(())
1314 }
1315
1316 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1317 Cow::Owned(test_device_info())
1318 }
1319
1320 async fn read(
1321 &self,
1322 device_block_offset: u64,
1323 block_count: u32,
1324 vmo: &Arc<zx::Vmo>,
1325 vmo_offset: u64,
1326 _opts: ReadOptions,
1327 _trace_flow_id: TraceFlowId,
1328 ) -> Result<(), zx::Status> {
1329 if let Some(read_hook) = &self.read_hook {
1330 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1331 } else {
1332 unimplemented!();
1333 }
1334 }
1335
1336 async fn write(
1337 &self,
1338 device_block_offset: u64,
1339 _block_count: u32,
1340 _vmo: &Arc<zx::Vmo>,
1341 _vmo_offset: u64,
1342 opts: WriteOptions,
1343 _trace_flow_id: TraceFlowId,
1344 ) -> Result<(), zx::Status> {
1345 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1346 && let Some(barrier_hook) = &self.barrier_hook
1347 {
1348 barrier_hook()?;
1349 }
1350 if let Some(write_hook) = &self.write_hook {
1351 write_hook(device_block_offset).await
1352 } else {
1353 unimplemented!();
1354 }
1355 }
1356
1357 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1358 Ok(())
1359 }
1360
1361 async fn trim(
1362 &self,
1363 _device_block_offset: u64,
1364 _block_count: u32,
1365 _trace_flow_id: TraceFlowId,
1366 ) -> Result<(), zx::Status> {
1367 unreachable!();
1368 }
1369
1370 async fn get_volume_info(
1371 &self,
1372 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1373 let () = std::future::pending().await;
1375 unreachable!();
1376 }
1377 }
1378
1379 const BLOCK_SIZE: u32 = 512;
1380 const MAX_TRANSFER_BLOCKS: u32 = 10;
1381
1382 fn test_device_info() -> DeviceInfo {
1383 DeviceInfo::Partition(PartitionInfo {
1384 device_flags: fblock::DeviceFlag::READONLY
1385 | fblock::DeviceFlag::BARRIER_SUPPORT
1386 | fblock::DeviceFlag::FUA_SUPPORT,
1387 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1388 block_range: Some(0..100),
1389 type_guid: [1; 16],
1390 instance_guid: [2; 16],
1391 name: "foo".to_string(),
1392 flags: 0xabcd,
1393 })
1394 }
1395
1396 #[fuchsia::test]
1397 async fn test_barriers_ordering() {
1398 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1399 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1400 let barrier_called = Arc::new(AtomicBool::new(false));
1401
1402 futures::join!(
1403 async move {
1404 let barrier_called_clone = barrier_called.clone();
1405 let block_server = BlockServer::new(
1406 BLOCK_SIZE,
1407 Arc::new(MockInterface {
1408 barrier_hook: Some(Box::new(move || {
1409 barrier_called.store(true, Ordering::Relaxed);
1410 Ok(())
1411 })),
1412 write_hook: Some(Box::new(move |device_block_offset| {
1413 let barrier_called = barrier_called_clone.clone();
1414 Box::pin(async move {
1415 if device_block_offset % 2 == 0 {
1417 fasync::Timer::new(fasync::MonotonicInstant::after(
1418 zx::MonotonicDuration::from_millis(200),
1419 ))
1420 .await;
1421 }
1422 assert!(barrier_called.load(Ordering::Relaxed));
1423 Ok(())
1424 })
1425 })),
1426 ..MockInterface::default()
1427 }),
1428 );
1429 block_server.handle_requests(stream).await.unwrap();
1430 },
1431 async move {
1432 let (session_proxy, server) = fidl::endpoints::create_proxy();
1433
1434 proxy.open_session(server).unwrap();
1435
1436 let vmo_id = session_proxy
1437 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1438 .await
1439 .unwrap()
1440 .unwrap();
1441 assert_ne!(vmo_id.id, 0);
1442
1443 let mut fifo =
1444 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1445 let (mut reader, mut writer) = fifo.async_io();
1446
1447 writer
1448 .write_entries(&BlockFifoRequest {
1449 command: BlockFifoCommand {
1450 opcode: BlockOpcode::Write.into_primitive(),
1451 flags: BlockIoFlag::PRE_BARRIER.bits(),
1452 ..Default::default()
1453 },
1454 vmoid: vmo_id.id,
1455 dev_offset: 0,
1456 length: 5,
1457 vmo_offset: 6,
1458 ..Default::default()
1459 })
1460 .await
1461 .unwrap();
1462
1463 for i in 0..10 {
1464 writer
1465 .write_entries(&BlockFifoRequest {
1466 command: BlockFifoCommand {
1467 opcode: BlockOpcode::Write.into_primitive(),
1468 ..Default::default()
1469 },
1470 vmoid: vmo_id.id,
1471 dev_offset: i + 1,
1472 length: 5,
1473 vmo_offset: 6,
1474 ..Default::default()
1475 })
1476 .await
1477 .unwrap();
1478 }
1479 for _ in 0..11 {
1480 let mut response = BlockFifoResponse::default();
1481 reader.read_entries(&mut response).await.unwrap();
1482 assert_eq!(response.status, zx::sys::ZX_OK);
1483 }
1484
1485 std::mem::drop(proxy);
1486 }
1487 );
1488 }
1489
1490 #[fuchsia::test]
1491 async fn test_info() {
1492 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1493
1494 futures::join!(
1495 async {
1496 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1497 block_server.handle_requests(stream).await.unwrap();
1498 },
1499 async {
1500 let expected_info = test_device_info();
1501 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1502 info
1503 } else {
1504 unreachable!()
1505 };
1506
1507 let block_info = proxy.get_info().await.unwrap().unwrap();
1508 assert_eq!(
1509 block_info.block_count,
1510 partition_info.block_range.as_ref().unwrap().end
1511 - partition_info.block_range.as_ref().unwrap().start
1512 );
1513 assert_eq!(
1514 block_info.flags,
1515 fblock::DeviceFlag::READONLY
1516 | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1517 | fblock::DeviceFlag::BARRIER_SUPPORT
1518 | fblock::DeviceFlag::FUA_SUPPORT
1519 );
1520
1521 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1522
1523 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1524 assert_eq!(status, zx::sys::ZX_OK);
1525 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1526
1527 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1528 assert_eq!(status, zx::sys::ZX_OK);
1529 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1530
1531 let (status, name) = proxy.get_name().await.unwrap();
1532 assert_eq!(status, zx::sys::ZX_OK);
1533 assert_eq!(name.as_ref(), Some(&partition_info.name));
1534
1535 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1536 assert_eq!(metadata.name, name);
1537 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1538 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1539 assert_eq!(
1540 metadata.start_block_offset,
1541 Some(partition_info.block_range.as_ref().unwrap().start)
1542 );
1543 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1544 assert_eq!(metadata.flags, Some(partition_info.flags));
1545
1546 std::mem::drop(proxy);
1547 }
1548 );
1549 }
1550
1551 #[fuchsia::test]
1552 async fn test_attach_vmo() {
1553 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1554
1555 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1556 let koid = vmo.koid().unwrap();
1557
1558 futures::join!(
1559 async {
1560 let block_server = BlockServer::new(
1561 BLOCK_SIZE,
1562 Arc::new(MockInterface {
1563 read_hook: Some(Box::new(move |_, _, vmo, _| {
1564 assert_eq!(vmo.koid().unwrap(), koid);
1565 Box::pin(async { Ok(()) })
1566 })),
1567 ..MockInterface::default()
1568 }),
1569 );
1570 block_server.handle_requests(stream).await.unwrap();
1571 },
1572 async move {
1573 let (session_proxy, server) = fidl::endpoints::create_proxy();
1574
1575 proxy.open_session(server).unwrap();
1576
1577 let vmo_id = session_proxy
1578 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1579 .await
1580 .unwrap()
1581 .unwrap();
1582 assert_ne!(vmo_id.id, 0);
1583
1584 let mut fifo =
1585 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1586 let (mut reader, mut writer) = fifo.async_io();
1587
1588 let mut count = 1;
1590 loop {
1591 match session_proxy
1592 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1593 .await
1594 .unwrap()
1595 {
1596 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1597 Err(e) => {
1598 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1599 break;
1600 }
1601 }
1602
1603 if count % 10 == 0 {
1605 writer
1606 .write_entries(&BlockFifoRequest {
1607 command: BlockFifoCommand {
1608 opcode: BlockOpcode::Read.into_primitive(),
1609 ..Default::default()
1610 },
1611 vmoid: vmo_id.id,
1612 length: 1,
1613 ..Default::default()
1614 })
1615 .await
1616 .unwrap();
1617
1618 let mut response = BlockFifoResponse::default();
1619 reader.read_entries(&mut response).await.unwrap();
1620 assert_eq!(response.status, zx::sys::ZX_OK);
1621 }
1622
1623 count += 1;
1624 }
1625
1626 assert_eq!(count, u16::MAX as u64);
1627
1628 writer
1630 .write_entries(&BlockFifoRequest {
1631 command: BlockFifoCommand {
1632 opcode: BlockOpcode::CloseVmo.into_primitive(),
1633 ..Default::default()
1634 },
1635 vmoid: vmo_id.id,
1636 ..Default::default()
1637 })
1638 .await
1639 .unwrap();
1640
1641 let mut response = BlockFifoResponse::default();
1642 reader.read_entries(&mut response).await.unwrap();
1643 assert_eq!(response.status, zx::sys::ZX_OK);
1644
1645 let new_vmo_id = session_proxy
1646 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1647 .await
1648 .unwrap()
1649 .unwrap();
1650 assert_eq!(new_vmo_id.id, vmo_id.id);
1652
1653 std::mem::drop(proxy);
1654 }
1655 );
1656 }
1657
1658 #[fuchsia::test]
1659 async fn test_close() {
1660 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1661
1662 let mut server = std::pin::pin!(
1663 async {
1664 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1665 block_server.handle_requests(stream).await.unwrap();
1666 }
1667 .fuse()
1668 );
1669
1670 let mut client = std::pin::pin!(
1671 async {
1672 let (session_proxy, server) = fidl::endpoints::create_proxy();
1673
1674 proxy.open_session(server).unwrap();
1675
1676 std::mem::drop(proxy);
1679
1680 session_proxy.close().await.unwrap().unwrap();
1681
1682 let _: () = std::future::pending().await;
1684 }
1685 .fuse()
1686 );
1687
1688 futures::select!(
1689 _ = server => {}
1690 _ = client => unreachable!(),
1691 );
1692 }
1693
1694 #[derive(Default)]
1695 struct IoMockInterface {
1696 do_checks: bool,
1697 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1698 return_errors: bool,
1699 }
1700
1701 #[derive(Debug)]
1702 enum ExpectedOp {
1703 Read(u64, u32, u64),
1704 Write(u64, u32, u64),
1705 Trim(u64, u32),
1706 Flush,
1707 }
1708
1709 impl super::async_interface::Interface for IoMockInterface {
1710 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1711 Ok(())
1712 }
1713
1714 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1715 Cow::Owned(test_device_info())
1716 }
1717
1718 async fn read(
1719 &self,
1720 device_block_offset: u64,
1721 block_count: u32,
1722 _vmo: &Arc<zx::Vmo>,
1723 vmo_offset: u64,
1724 _opts: ReadOptions,
1725 _trace_flow_id: TraceFlowId,
1726 ) -> Result<(), zx::Status> {
1727 if self.return_errors {
1728 Err(zx::Status::INTERNAL)
1729 } else {
1730 if self.do_checks {
1731 assert_matches!(
1732 self.expected_op.lock().take(),
1733 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1734 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1735 "Read {device_block_offset} {block_count} {vmo_offset}"
1736 );
1737 }
1738 Ok(())
1739 }
1740 }
1741
1742 async fn write(
1743 &self,
1744 device_block_offset: u64,
1745 block_count: u32,
1746 _vmo: &Arc<zx::Vmo>,
1747 vmo_offset: u64,
1748 _write_opts: WriteOptions,
1749 _trace_flow_id: TraceFlowId,
1750 ) -> Result<(), zx::Status> {
1751 if self.return_errors {
1752 Err(zx::Status::NOT_SUPPORTED)
1753 } else {
1754 if self.do_checks {
1755 assert_matches!(
1756 self.expected_op.lock().take(),
1757 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1758 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1759 "Write {device_block_offset} {block_count} {vmo_offset}"
1760 );
1761 }
1762 Ok(())
1763 }
1764 }
1765
1766 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1767 if self.return_errors {
1768 Err(zx::Status::NO_RESOURCES)
1769 } else {
1770 if self.do_checks {
1771 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1772 }
1773 Ok(())
1774 }
1775 }
1776
1777 async fn trim(
1778 &self,
1779 device_block_offset: u64,
1780 block_count: u32,
1781 _trace_flow_id: TraceFlowId,
1782 ) -> Result<(), zx::Status> {
1783 if self.return_errors {
1784 Err(zx::Status::NO_MEMORY)
1785 } else {
1786 if self.do_checks {
1787 assert_matches!(
1788 self.expected_op.lock().take(),
1789 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1790 block_count == b,
1791 "Trim {device_block_offset} {block_count}"
1792 );
1793 }
1794 Ok(())
1795 }
1796 }
1797 }
1798
1799 #[fuchsia::test]
1800 async fn test_io() {
1801 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1802
1803 let expected_op = Arc::new(Mutex::new(None));
1804 let expected_op_clone = expected_op.clone();
1805
1806 let server = async {
1807 let block_server = BlockServer::new(
1808 BLOCK_SIZE,
1809 Arc::new(IoMockInterface {
1810 return_errors: false,
1811 do_checks: true,
1812 expected_op: expected_op_clone,
1813 }),
1814 );
1815 block_server.handle_requests(stream).await.unwrap();
1816 };
1817
1818 let client = async move {
1819 let (session_proxy, server) = fidl::endpoints::create_proxy();
1820
1821 proxy.open_session(server).unwrap();
1822
1823 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1824 let vmo_id = session_proxy
1825 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1826 .await
1827 .unwrap()
1828 .unwrap();
1829
1830 let mut fifo =
1831 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1832 let (mut reader, mut writer) = fifo.async_io();
1833
1834 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1836 writer
1837 .write_entries(&BlockFifoRequest {
1838 command: BlockFifoCommand {
1839 opcode: BlockOpcode::Read.into_primitive(),
1840 ..Default::default()
1841 },
1842 vmoid: vmo_id.id,
1843 dev_offset: 1,
1844 length: 2,
1845 vmo_offset: 3,
1846 ..Default::default()
1847 })
1848 .await
1849 .unwrap();
1850
1851 let mut response = BlockFifoResponse::default();
1852 reader.read_entries(&mut response).await.unwrap();
1853 assert_eq!(response.status, zx::sys::ZX_OK);
1854
1855 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1857 writer
1858 .write_entries(&BlockFifoRequest {
1859 command: BlockFifoCommand {
1860 opcode: BlockOpcode::Write.into_primitive(),
1861 ..Default::default()
1862 },
1863 vmoid: vmo_id.id,
1864 dev_offset: 4,
1865 length: 5,
1866 vmo_offset: 6,
1867 ..Default::default()
1868 })
1869 .await
1870 .unwrap();
1871
1872 let mut response = BlockFifoResponse::default();
1873 reader.read_entries(&mut response).await.unwrap();
1874 assert_eq!(response.status, zx::sys::ZX_OK);
1875
1876 *expected_op.lock() = Some(ExpectedOp::Flush);
1878 writer
1879 .write_entries(&BlockFifoRequest {
1880 command: BlockFifoCommand {
1881 opcode: BlockOpcode::Flush.into_primitive(),
1882 ..Default::default()
1883 },
1884 ..Default::default()
1885 })
1886 .await
1887 .unwrap();
1888
1889 reader.read_entries(&mut response).await.unwrap();
1890 assert_eq!(response.status, zx::sys::ZX_OK);
1891
1892 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1894 writer
1895 .write_entries(&BlockFifoRequest {
1896 command: BlockFifoCommand {
1897 opcode: BlockOpcode::Trim.into_primitive(),
1898 ..Default::default()
1899 },
1900 dev_offset: 7,
1901 length: 8,
1902 ..Default::default()
1903 })
1904 .await
1905 .unwrap();
1906
1907 reader.read_entries(&mut response).await.unwrap();
1908 assert_eq!(response.status, zx::sys::ZX_OK);
1909
1910 std::mem::drop(proxy);
1911 };
1912
1913 futures::join!(server, client);
1914 }
1915
1916 #[fuchsia::test]
1917 async fn test_io_errors() {
1918 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1919
1920 futures::join!(
1921 async {
1922 let block_server = BlockServer::new(
1923 BLOCK_SIZE,
1924 Arc::new(IoMockInterface {
1925 return_errors: true,
1926 do_checks: false,
1927 expected_op: Arc::new(Mutex::new(None)),
1928 }),
1929 );
1930 block_server.handle_requests(stream).await.unwrap();
1931 },
1932 async move {
1933 let (session_proxy, server) = fidl::endpoints::create_proxy();
1934
1935 proxy.open_session(server).unwrap();
1936
1937 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1938 let vmo_id = session_proxy
1939 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1940 .await
1941 .unwrap()
1942 .unwrap();
1943
1944 let mut fifo =
1945 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1946 let (mut reader, mut writer) = fifo.async_io();
1947
1948 writer
1950 .write_entries(&BlockFifoRequest {
1951 command: BlockFifoCommand {
1952 opcode: BlockOpcode::Read.into_primitive(),
1953 ..Default::default()
1954 },
1955 vmoid: vmo_id.id,
1956 length: 1,
1957 reqid: 1,
1958 ..Default::default()
1959 })
1960 .await
1961 .unwrap();
1962
1963 let mut response = BlockFifoResponse::default();
1964 reader.read_entries(&mut response).await.unwrap();
1965 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1966
1967 writer
1969 .write_entries(&BlockFifoRequest {
1970 command: BlockFifoCommand {
1971 opcode: BlockOpcode::Write.into_primitive(),
1972 ..Default::default()
1973 },
1974 vmoid: vmo_id.id,
1975 length: 1,
1976 reqid: 2,
1977 ..Default::default()
1978 })
1979 .await
1980 .unwrap();
1981
1982 reader.read_entries(&mut response).await.unwrap();
1983 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1984
1985 writer
1987 .write_entries(&BlockFifoRequest {
1988 command: BlockFifoCommand {
1989 opcode: BlockOpcode::Flush.into_primitive(),
1990 ..Default::default()
1991 },
1992 reqid: 3,
1993 ..Default::default()
1994 })
1995 .await
1996 .unwrap();
1997
1998 reader.read_entries(&mut response).await.unwrap();
1999 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2000
2001 writer
2003 .write_entries(&BlockFifoRequest {
2004 command: BlockFifoCommand {
2005 opcode: BlockOpcode::Trim.into_primitive(),
2006 ..Default::default()
2007 },
2008 reqid: 4,
2009 length: 1,
2010 ..Default::default()
2011 })
2012 .await
2013 .unwrap();
2014
2015 reader.read_entries(&mut response).await.unwrap();
2016 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2017
2018 std::mem::drop(proxy);
2019 }
2020 );
2021 }
2022
2023 #[fuchsia::test]
2024 async fn test_invalid_args() {
2025 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2026
2027 futures::join!(
2028 async {
2029 let block_server = BlockServer::new(
2030 BLOCK_SIZE,
2031 Arc::new(IoMockInterface {
2032 return_errors: false,
2033 do_checks: false,
2034 expected_op: Arc::new(Mutex::new(None)),
2035 }),
2036 );
2037 block_server.handle_requests(stream).await.unwrap();
2038 },
2039 async move {
2040 let (session_proxy, server) = fidl::endpoints::create_proxy();
2041
2042 proxy.open_session(server).unwrap();
2043
2044 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2045 let vmo_id = session_proxy
2046 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2047 .await
2048 .unwrap()
2049 .unwrap();
2050
2051 let mut fifo =
2052 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2053
2054 async fn test(
2055 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2056 request: BlockFifoRequest,
2057 ) -> Result<(), zx::Status> {
2058 let (mut reader, mut writer) = fifo.async_io();
2059 writer.write_entries(&request).await.unwrap();
2060 let mut response = BlockFifoResponse::default();
2061 reader.read_entries(&mut response).await.unwrap();
2062 zx::Status::ok(response.status)
2063 }
2064
2065 let good_read_request = || BlockFifoRequest {
2068 command: BlockFifoCommand {
2069 opcode: BlockOpcode::Read.into_primitive(),
2070 ..Default::default()
2071 },
2072 length: 1,
2073 vmoid: vmo_id.id,
2074 ..Default::default()
2075 };
2076
2077 assert_eq!(
2078 test(
2079 &mut fifo,
2080 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2081 )
2082 .await,
2083 Err(zx::Status::IO)
2084 );
2085
2086 assert_eq!(
2087 test(
2088 &mut fifo,
2089 BlockFifoRequest {
2090 vmo_offset: 0xffff_ffff_ffff_ffff,
2091 ..good_read_request()
2092 }
2093 )
2094 .await,
2095 Err(zx::Status::OUT_OF_RANGE)
2096 );
2097
2098 assert_eq!(
2099 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2100 Err(zx::Status::INVALID_ARGS)
2101 );
2102
2103 let good_write_request = || BlockFifoRequest {
2106 command: BlockFifoCommand {
2107 opcode: BlockOpcode::Write.into_primitive(),
2108 ..Default::default()
2109 },
2110 length: 1,
2111 vmoid: vmo_id.id,
2112 ..Default::default()
2113 };
2114
2115 assert_eq!(
2116 test(
2117 &mut fifo,
2118 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2119 )
2120 .await,
2121 Err(zx::Status::IO)
2122 );
2123
2124 assert_eq!(
2125 test(
2126 &mut fifo,
2127 BlockFifoRequest {
2128 vmo_offset: 0xffff_ffff_ffff_ffff,
2129 ..good_write_request()
2130 }
2131 )
2132 .await,
2133 Err(zx::Status::OUT_OF_RANGE)
2134 );
2135
2136 assert_eq!(
2137 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2138 Err(zx::Status::INVALID_ARGS)
2139 );
2140
2141 assert_eq!(
2144 test(
2145 &mut fifo,
2146 BlockFifoRequest {
2147 command: BlockFifoCommand {
2148 opcode: BlockOpcode::CloseVmo.into_primitive(),
2149 ..Default::default()
2150 },
2151 vmoid: vmo_id.id + 1,
2152 ..Default::default()
2153 }
2154 )
2155 .await,
2156 Err(zx::Status::IO)
2157 );
2158
2159 std::mem::drop(proxy);
2160 }
2161 );
2162 }
2163
2164 #[fuchsia::test]
2165 async fn test_concurrent_requests() {
2166 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2167
2168 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2169 let waiting_readers_clone = waiting_readers.clone();
2170
2171 futures::join!(
2172 async move {
2173 let block_server = BlockServer::new(
2174 BLOCK_SIZE,
2175 Arc::new(MockInterface {
2176 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2177 let (tx, rx) = oneshot::channel();
2178 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2179 Box::pin(async move {
2180 let _ = rx.await;
2181 Ok(())
2182 })
2183 })),
2184 ..MockInterface::default()
2185 }),
2186 );
2187 block_server.handle_requests(stream).await.unwrap();
2188 },
2189 async move {
2190 let (session_proxy, server) = fidl::endpoints::create_proxy();
2191
2192 proxy.open_session(server).unwrap();
2193
2194 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2195 let vmo_id = session_proxy
2196 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2197 .await
2198 .unwrap()
2199 .unwrap();
2200
2201 let mut fifo =
2202 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2203 let (mut reader, mut writer) = fifo.async_io();
2204
2205 writer
2206 .write_entries(&BlockFifoRequest {
2207 command: BlockFifoCommand {
2208 opcode: BlockOpcode::Read.into_primitive(),
2209 ..Default::default()
2210 },
2211 reqid: 1,
2212 dev_offset: 1, vmoid: vmo_id.id,
2214 length: 1,
2215 ..Default::default()
2216 })
2217 .await
2218 .unwrap();
2219
2220 writer
2221 .write_entries(&BlockFifoRequest {
2222 command: BlockFifoCommand {
2223 opcode: BlockOpcode::Read.into_primitive(),
2224 ..Default::default()
2225 },
2226 reqid: 2,
2227 dev_offset: 2,
2228 vmoid: vmo_id.id,
2229 length: 1,
2230 ..Default::default()
2231 })
2232 .await
2233 .unwrap();
2234
2235 poll_fn(|cx: &mut Context<'_>| {
2237 if waiting_readers.lock().len() == 2 {
2238 Poll::Ready(())
2239 } else {
2240 cx.waker().wake_by_ref();
2242 Poll::Pending
2243 }
2244 })
2245 .await;
2246
2247 let mut response = BlockFifoResponse::default();
2248 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2249
2250 let (id, tx) = waiting_readers.lock().pop().unwrap();
2251 tx.send(()).unwrap();
2252
2253 reader.read_entries(&mut response).await.unwrap();
2254 assert_eq!(response.status, zx::sys::ZX_OK);
2255 assert_eq!(response.reqid, id);
2256
2257 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2258
2259 let (id, tx) = waiting_readers.lock().pop().unwrap();
2260 tx.send(()).unwrap();
2261
2262 reader.read_entries(&mut response).await.unwrap();
2263 assert_eq!(response.status, zx::sys::ZX_OK);
2264 assert_eq!(response.reqid, id);
2265 }
2266 );
2267 }
2268
2269 #[fuchsia::test]
2270 async fn test_groups() {
2271 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2272
2273 futures::join!(
2274 async move {
2275 let block_server = BlockServer::new(
2276 BLOCK_SIZE,
2277 Arc::new(MockInterface {
2278 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2279 ..MockInterface::default()
2280 }),
2281 );
2282 block_server.handle_requests(stream).await.unwrap();
2283 },
2284 async move {
2285 let (session_proxy, server) = fidl::endpoints::create_proxy();
2286
2287 proxy.open_session(server).unwrap();
2288
2289 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2290 let vmo_id = session_proxy
2291 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2292 .await
2293 .unwrap()
2294 .unwrap();
2295
2296 let mut fifo =
2297 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2298 let (mut reader, mut writer) = fifo.async_io();
2299
2300 writer
2301 .write_entries(&BlockFifoRequest {
2302 command: BlockFifoCommand {
2303 opcode: BlockOpcode::Read.into_primitive(),
2304 flags: BlockIoFlag::GROUP_ITEM.bits(),
2305 ..Default::default()
2306 },
2307 group: 1,
2308 vmoid: vmo_id.id,
2309 length: 1,
2310 ..Default::default()
2311 })
2312 .await
2313 .unwrap();
2314
2315 writer
2316 .write_entries(&BlockFifoRequest {
2317 command: BlockFifoCommand {
2318 opcode: BlockOpcode::Read.into_primitive(),
2319 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2320 ..Default::default()
2321 },
2322 reqid: 2,
2323 group: 1,
2324 vmoid: vmo_id.id,
2325 length: 1,
2326 ..Default::default()
2327 })
2328 .await
2329 .unwrap();
2330
2331 let mut response = BlockFifoResponse::default();
2332 reader.read_entries(&mut response).await.unwrap();
2333 assert_eq!(response.status, zx::sys::ZX_OK);
2334 assert_eq!(response.reqid, 2);
2335 assert_eq!(response.group, 1);
2336 }
2337 );
2338 }
2339
2340 #[fuchsia::test]
2341 async fn test_group_error() {
2342 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2343
2344 let counter = Arc::new(AtomicU64::new(0));
2345 let counter_clone = counter.clone();
2346
2347 futures::join!(
2348 async move {
2349 let block_server = BlockServer::new(
2350 BLOCK_SIZE,
2351 Arc::new(MockInterface {
2352 read_hook: Some(Box::new(move |_, _, _, _| {
2353 counter_clone.fetch_add(1, Ordering::Relaxed);
2354 Box::pin(async { Err(zx::Status::BAD_STATE) })
2355 })),
2356 ..MockInterface::default()
2357 }),
2358 );
2359 block_server.handle_requests(stream).await.unwrap();
2360 },
2361 async move {
2362 let (session_proxy, server) = fidl::endpoints::create_proxy();
2363
2364 proxy.open_session(server).unwrap();
2365
2366 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2367 let vmo_id = session_proxy
2368 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2369 .await
2370 .unwrap()
2371 .unwrap();
2372
2373 let mut fifo =
2374 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2375 let (mut reader, mut writer) = fifo.async_io();
2376
2377 writer
2378 .write_entries(&BlockFifoRequest {
2379 command: BlockFifoCommand {
2380 opcode: BlockOpcode::Read.into_primitive(),
2381 flags: BlockIoFlag::GROUP_ITEM.bits(),
2382 ..Default::default()
2383 },
2384 group: 1,
2385 vmoid: vmo_id.id,
2386 length: 1,
2387 ..Default::default()
2388 })
2389 .await
2390 .unwrap();
2391
2392 poll_fn(|cx: &mut Context<'_>| {
2394 if counter.load(Ordering::Relaxed) == 1 {
2395 Poll::Ready(())
2396 } else {
2397 cx.waker().wake_by_ref();
2399 Poll::Pending
2400 }
2401 })
2402 .await;
2403
2404 let mut response = BlockFifoResponse::default();
2405 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2406
2407 writer
2408 .write_entries(&BlockFifoRequest {
2409 command: BlockFifoCommand {
2410 opcode: BlockOpcode::Read.into_primitive(),
2411 flags: BlockIoFlag::GROUP_ITEM.bits(),
2412 ..Default::default()
2413 },
2414 group: 1,
2415 vmoid: vmo_id.id,
2416 length: 1,
2417 ..Default::default()
2418 })
2419 .await
2420 .unwrap();
2421
2422 writer
2423 .write_entries(&BlockFifoRequest {
2424 command: BlockFifoCommand {
2425 opcode: BlockOpcode::Read.into_primitive(),
2426 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2427 ..Default::default()
2428 },
2429 reqid: 2,
2430 group: 1,
2431 vmoid: vmo_id.id,
2432 length: 1,
2433 ..Default::default()
2434 })
2435 .await
2436 .unwrap();
2437
2438 reader.read_entries(&mut response).await.unwrap();
2439 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2440 assert_eq!(response.reqid, 2);
2441 assert_eq!(response.group, 1);
2442
2443 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2444
2445 assert_eq!(counter.load(Ordering::Relaxed), 1);
2447 }
2448 );
2449 }
2450
2451 #[fuchsia::test]
2452 async fn test_group_with_two_lasts() {
2453 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2454
2455 let (tx, rx) = oneshot::channel();
2456
2457 futures::join!(
2458 async move {
2459 let rx = Mutex::new(Some(rx));
2460 let block_server = BlockServer::new(
2461 BLOCK_SIZE,
2462 Arc::new(MockInterface {
2463 read_hook: Some(Box::new(move |_, _, _, _| {
2464 let rx = rx.lock().take().unwrap();
2465 Box::pin(async {
2466 let _ = rx.await;
2467 Ok(())
2468 })
2469 })),
2470 ..MockInterface::default()
2471 }),
2472 );
2473 block_server.handle_requests(stream).await.unwrap();
2474 },
2475 async move {
2476 let (session_proxy, server) = fidl::endpoints::create_proxy();
2477
2478 proxy.open_session(server).unwrap();
2479
2480 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2481 let vmo_id = session_proxy
2482 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2483 .await
2484 .unwrap()
2485 .unwrap();
2486
2487 let mut fifo =
2488 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2489 let (mut reader, mut writer) = fifo.async_io();
2490
2491 writer
2492 .write_entries(&BlockFifoRequest {
2493 command: BlockFifoCommand {
2494 opcode: BlockOpcode::Read.into_primitive(),
2495 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2496 ..Default::default()
2497 },
2498 reqid: 1,
2499 group: 1,
2500 vmoid: vmo_id.id,
2501 length: 1,
2502 ..Default::default()
2503 })
2504 .await
2505 .unwrap();
2506
2507 writer
2508 .write_entries(&BlockFifoRequest {
2509 command: BlockFifoCommand {
2510 opcode: BlockOpcode::Read.into_primitive(),
2511 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2512 ..Default::default()
2513 },
2514 reqid: 2,
2515 group: 1,
2516 vmoid: vmo_id.id,
2517 length: 1,
2518 ..Default::default()
2519 })
2520 .await
2521 .unwrap();
2522
2523 writer
2525 .write_entries(&BlockFifoRequest {
2526 command: BlockFifoCommand {
2527 opcode: BlockOpcode::CloseVmo.into_primitive(),
2528 ..Default::default()
2529 },
2530 reqid: 3,
2531 vmoid: vmo_id.id,
2532 ..Default::default()
2533 })
2534 .await
2535 .unwrap();
2536
2537 let mut response = BlockFifoResponse::default();
2539 reader.read_entries(&mut response).await.unwrap();
2540 assert_eq!(response.status, zx::sys::ZX_OK);
2541 assert_eq!(response.reqid, 3);
2542
2543 tx.send(()).unwrap();
2545
2546 let mut response = BlockFifoResponse::default();
2549 reader.read_entries(&mut response).await.unwrap();
2550 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2551 assert_eq!(response.reqid, 1);
2552 assert_eq!(response.group, 1);
2553 }
2554 );
2555 }
2556
2557 #[fuchsia::test(allow_stalls = false)]
2558 async fn test_requests_dont_block_sessions() {
2559 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2560
2561 let (tx, rx) = oneshot::channel();
2562
2563 fasync::Task::local(async move {
2564 let rx = Mutex::new(Some(rx));
2565 let block_server = BlockServer::new(
2566 BLOCK_SIZE,
2567 Arc::new(MockInterface {
2568 read_hook: Some(Box::new(move |_, _, _, _| {
2569 let rx = rx.lock().take().unwrap();
2570 Box::pin(async {
2571 let _ = rx.await;
2572 Ok(())
2573 })
2574 })),
2575 ..MockInterface::default()
2576 }),
2577 );
2578 block_server.handle_requests(stream).await.unwrap();
2579 })
2580 .detach();
2581
2582 let mut fut = pin!(async {
2583 let (session_proxy, server) = fidl::endpoints::create_proxy();
2584
2585 proxy.open_session(server).unwrap();
2586
2587 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2588 let vmo_id = session_proxy
2589 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2590 .await
2591 .unwrap()
2592 .unwrap();
2593
2594 let mut fifo =
2595 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2596 let (mut reader, mut writer) = fifo.async_io();
2597
2598 writer
2599 .write_entries(&BlockFifoRequest {
2600 command: BlockFifoCommand {
2601 opcode: BlockOpcode::Read.into_primitive(),
2602 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2603 ..Default::default()
2604 },
2605 reqid: 1,
2606 group: 1,
2607 vmoid: vmo_id.id,
2608 length: 1,
2609 ..Default::default()
2610 })
2611 .await
2612 .unwrap();
2613
2614 let mut response = BlockFifoResponse::default();
2615 reader.read_entries(&mut response).await.unwrap();
2616 assert_eq!(response.status, zx::sys::ZX_OK);
2617 });
2618
2619 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2621
2622 let mut fut2 = pin!(proxy.get_volume_info());
2623
2624 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2626
2627 let _ = tx.send(());
2630
2631 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2632 }
2633
2634 #[fuchsia::test]
2635 async fn test_request_flow_control() {
2636 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2637
2638 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2641 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2642 let event_clone = event.clone();
2643 futures::join!(
2644 async move {
2645 let block_server = BlockServer::new(
2646 BLOCK_SIZE,
2647 Arc::new(MockInterface {
2648 read_hook: Some(Box::new(move |_, _, _, _| {
2649 let event_clone = event_clone.clone();
2650 Box::pin(async move {
2651 if !event_clone.1.load(Ordering::SeqCst) {
2652 event_clone.0.listen().await;
2653 }
2654 Ok(())
2655 })
2656 })),
2657 ..MockInterface::default()
2658 }),
2659 );
2660 block_server.handle_requests(stream).await.unwrap();
2661 },
2662 async move {
2663 let (session_proxy, server) = fidl::endpoints::create_proxy();
2664
2665 proxy.open_session(server).unwrap();
2666
2667 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2668 let vmo_id = session_proxy
2669 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2670 .await
2671 .unwrap()
2672 .unwrap();
2673
2674 let mut fifo =
2675 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2676 let (mut reader, mut writer) = fifo.async_io();
2677
2678 for i in 0..MAX_REQUESTS {
2679 writer
2680 .write_entries(&BlockFifoRequest {
2681 command: BlockFifoCommand {
2682 opcode: BlockOpcode::Read.into_primitive(),
2683 ..Default::default()
2684 },
2685 reqid: (i + 1) as u32,
2686 dev_offset: i,
2687 vmoid: vmo_id.id,
2688 length: 1,
2689 ..Default::default()
2690 })
2691 .await
2692 .unwrap();
2693 }
2694 assert!(
2695 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2696 command: BlockFifoCommand {
2697 opcode: BlockOpcode::Read.into_primitive(),
2698 ..Default::default()
2699 },
2700 reqid: u32::MAX,
2701 dev_offset: MAX_REQUESTS,
2702 vmoid: vmo_id.id,
2703 length: 1,
2704 ..Default::default()
2705 })))
2706 .is_pending()
2707 );
2708 event.1.store(true, Ordering::SeqCst);
2710 event.0.notify(usize::MAX);
2711 let mut finished_reqids = vec![];
2713 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2714 let mut response = BlockFifoResponse::default();
2715 reader.read_entries(&mut response).await.unwrap();
2716 assert_eq!(response.status, zx::sys::ZX_OK);
2717 finished_reqids.push(response.reqid);
2718 writer
2719 .write_entries(&BlockFifoRequest {
2720 command: BlockFifoCommand {
2721 opcode: BlockOpcode::Read.into_primitive(),
2722 ..Default::default()
2723 },
2724 reqid: (i + 1) as u32,
2725 dev_offset: i,
2726 vmoid: vmo_id.id,
2727 length: 1,
2728 ..Default::default()
2729 })
2730 .await
2731 .unwrap();
2732 }
2733 let mut response = BlockFifoResponse::default();
2734 for _ in 0..MAX_REQUESTS {
2735 reader.read_entries(&mut response).await.unwrap();
2736 assert_eq!(response.status, zx::sys::ZX_OK);
2737 finished_reqids.push(response.reqid);
2738 }
2739 finished_reqids.sort();
2742 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2743 let mut i = 1;
2744 for reqid in finished_reqids {
2745 assert_eq!(reqid, i);
2746 i += 1;
2747 }
2748 }
2749 );
2750 }
2751
2752 #[fuchsia::test]
2753 async fn test_passthrough_io_with_fixed_map() {
2754 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2755
2756 let expected_op = Arc::new(Mutex::new(None));
2757 let expected_op_clone = expected_op.clone();
2758 futures::join!(
2759 async {
2760 let block_server = BlockServer::new(
2761 BLOCK_SIZE,
2762 Arc::new(IoMockInterface {
2763 return_errors: false,
2764 do_checks: true,
2765 expected_op: expected_op_clone,
2766 }),
2767 );
2768 block_server.handle_requests(stream).await.unwrap();
2769 },
2770 async move {
2771 let (session_proxy, server) = fidl::endpoints::create_proxy();
2772
2773 let mapping = fblock::BlockOffsetMapping {
2774 source_block_offset: 0,
2775 target_block_offset: 10,
2776 length: 20,
2777 };
2778 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2779
2780 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2781 let vmo_id = session_proxy
2782 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2783 .await
2784 .unwrap()
2785 .unwrap();
2786
2787 let mut fifo =
2788 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2789 let (mut reader, mut writer) = fifo.async_io();
2790
2791 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2793 writer
2794 .write_entries(&BlockFifoRequest {
2795 command: BlockFifoCommand {
2796 opcode: BlockOpcode::Read.into_primitive(),
2797 ..Default::default()
2798 },
2799 vmoid: vmo_id.id,
2800 dev_offset: 1,
2801 length: 2,
2802 vmo_offset: 3,
2803 ..Default::default()
2804 })
2805 .await
2806 .unwrap();
2807
2808 let mut response = BlockFifoResponse::default();
2809 reader.read_entries(&mut response).await.unwrap();
2810 assert_eq!(response.status, zx::sys::ZX_OK);
2811
2812 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2814 writer
2815 .write_entries(&BlockFifoRequest {
2816 command: BlockFifoCommand {
2817 opcode: BlockOpcode::Write.into_primitive(),
2818 ..Default::default()
2819 },
2820 vmoid: vmo_id.id,
2821 dev_offset: 4,
2822 length: 5,
2823 vmo_offset: 6,
2824 ..Default::default()
2825 })
2826 .await
2827 .unwrap();
2828
2829 reader.read_entries(&mut response).await.unwrap();
2830 assert_eq!(response.status, zx::sys::ZX_OK);
2831
2832 *expected_op.lock() = Some(ExpectedOp::Flush);
2834 writer
2835 .write_entries(&BlockFifoRequest {
2836 command: BlockFifoCommand {
2837 opcode: BlockOpcode::Flush.into_primitive(),
2838 ..Default::default()
2839 },
2840 ..Default::default()
2841 })
2842 .await
2843 .unwrap();
2844
2845 reader.read_entries(&mut response).await.unwrap();
2846 assert_eq!(response.status, zx::sys::ZX_OK);
2847
2848 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2850 writer
2851 .write_entries(&BlockFifoRequest {
2852 command: BlockFifoCommand {
2853 opcode: BlockOpcode::Trim.into_primitive(),
2854 ..Default::default()
2855 },
2856 dev_offset: 7,
2857 length: 3,
2858 ..Default::default()
2859 })
2860 .await
2861 .unwrap();
2862
2863 reader.read_entries(&mut response).await.unwrap();
2864 assert_eq!(response.status, zx::sys::ZX_OK);
2865
2866 *expected_op.lock() = None;
2868 writer
2869 .write_entries(&BlockFifoRequest {
2870 command: BlockFifoCommand {
2871 opcode: BlockOpcode::Read.into_primitive(),
2872 ..Default::default()
2873 },
2874 vmoid: vmo_id.id,
2875 dev_offset: 19,
2876 length: 2,
2877 vmo_offset: 3,
2878 ..Default::default()
2879 })
2880 .await
2881 .unwrap();
2882
2883 reader.read_entries(&mut response).await.unwrap();
2884 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2885
2886 std::mem::drop(proxy);
2887 }
2888 );
2889 }
2890
2891 #[fuchsia::test]
2892 fn operation_map() {
2893 const BLOCK_SIZE: u32 = 512;
2894
2895 #[track_caller]
2896 fn expect_map_result(
2897 mut operation: Operation,
2898 mapping: Option<BlockOffsetMapping>,
2899 max_blocks: Option<NonZero<u32>>,
2900 expected_operations: Vec<Operation>,
2901 ) {
2902 let mut ops = vec![];
2903 while let Some(remainder) =
2904 operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2905 {
2906 ops.push(operation);
2907 operation = remainder;
2908 }
2909 ops.push(operation);
2910 assert_eq!(ops, expected_operations);
2911 }
2912
2913 expect_map_result(
2915 Operation::Read {
2916 device_block_offset: 10,
2917 block_count: 200,
2918 _unused: 0,
2919 vmo_offset: 0,
2920 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2921 },
2922 None,
2923 None,
2924 vec![Operation::Read {
2925 device_block_offset: 10,
2926 block_count: 200,
2927 _unused: 0,
2928 vmo_offset: 0,
2929 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2930 }],
2931 );
2932
2933 expect_map_result(
2935 Operation::Read {
2936 device_block_offset: 10,
2937 block_count: 200,
2938 _unused: 0,
2939 vmo_offset: 0,
2940 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2941 },
2942 None,
2943 NonZero::new(120),
2944 vec![
2945 Operation::Read {
2946 device_block_offset: 10,
2947 block_count: 120,
2948 _unused: 0,
2949 vmo_offset: 0,
2950 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2951 },
2952 Operation::Read {
2953 device_block_offset: 130,
2954 block_count: 80,
2955 _unused: 0,
2956 vmo_offset: 120 * BLOCK_SIZE as u64,
2957 options: ReadOptions {
2958 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
2960 },
2961 },
2962 ],
2963 );
2964 expect_map_result(
2965 Operation::Trim { device_block_offset: 10, block_count: 200 },
2966 None,
2967 NonZero::new(120),
2968 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2969 );
2970
2971 expect_map_result(
2973 Operation::Read {
2974 device_block_offset: 10,
2975 block_count: 200,
2976 _unused: 0,
2977 vmo_offset: 0,
2978 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2979 },
2980 Some(BlockOffsetMapping {
2981 source_block_offset: 10,
2982 target_block_offset: 100,
2983 length: 200,
2984 }),
2985 NonZero::new(120),
2986 vec![
2987 Operation::Read {
2988 device_block_offset: 100,
2989 block_count: 120,
2990 _unused: 0,
2991 vmo_offset: 0,
2992 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2993 },
2994 Operation::Read {
2995 device_block_offset: 220,
2996 block_count: 80,
2997 _unused: 0,
2998 vmo_offset: 120 * BLOCK_SIZE as u64,
2999 options: ReadOptions {
3000 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3001 },
3002 },
3003 ],
3004 );
3005 expect_map_result(
3006 Operation::Trim { device_block_offset: 10, block_count: 200 },
3007 Some(BlockOffsetMapping {
3008 source_block_offset: 10,
3009 target_block_offset: 100,
3010 length: 200,
3011 }),
3012 NonZero::new(120),
3013 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3014 );
3015 }
3016
3017 #[fuchsia::test]
3019 async fn test_pre_barrier_flush_failure() {
3020 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3021
3022 struct NoBarrierInterface;
3023 impl super::async_interface::Interface for NoBarrierInterface {
3024 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3025 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3026 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3028 block_range: Some(0..100),
3029 type_guid: [0; 16],
3030 instance_guid: [0; 16],
3031 name: "test".to_string(),
3032 flags: 0,
3033 }))
3034 }
3035 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3036 Ok(())
3037 }
3038 async fn read(
3039 &self,
3040 _: u64,
3041 _: u32,
3042 _: &Arc<zx::Vmo>,
3043 _: u64,
3044 _: ReadOptions,
3045 _: TraceFlowId,
3046 ) -> Result<(), zx::Status> {
3047 unreachable!()
3048 }
3049 async fn write(
3050 &self,
3051 _: u64,
3052 _: u32,
3053 _: &Arc<zx::Vmo>,
3054 _: u64,
3055 _: WriteOptions,
3056 _: TraceFlowId,
3057 ) -> Result<(), zx::Status> {
3058 panic!("Write should not be called");
3059 }
3060 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3061 Err(zx::Status::IO)
3062 }
3063 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3064 unreachable!()
3065 }
3066 }
3067
3068 futures::join!(
3069 async move {
3070 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3071 block_server.handle_requests(stream).await.unwrap();
3072 },
3073 async move {
3074 let (session_proxy, server) = fidl::endpoints::create_proxy();
3075 proxy.open_session(server).unwrap();
3076 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3077 let vmo_id = session_proxy
3078 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3079 .await
3080 .unwrap()
3081 .unwrap();
3082
3083 let mut fifo =
3084 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3085 let (mut reader, mut writer) = fifo.async_io();
3086
3087 writer
3088 .write_entries(&BlockFifoRequest {
3089 command: BlockFifoCommand {
3090 opcode: BlockOpcode::Write.into_primitive(),
3091 flags: BlockIoFlag::PRE_BARRIER.bits(),
3092 ..Default::default()
3093 },
3094 vmoid: vmo_id.id,
3095 length: 1,
3096 ..Default::default()
3097 })
3098 .await
3099 .unwrap();
3100
3101 let mut response = BlockFifoResponse::default();
3102 reader.read_entries(&mut response).await.unwrap();
3103 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3104 }
3105 );
3106 }
3107
3108 #[fuchsia::test]
3111 async fn test_post_barrier_write_failure() {
3112 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3113
3114 struct NoBarrierInterface;
3115 impl super::async_interface::Interface for NoBarrierInterface {
3116 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3117 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3118 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3120 block_range: Some(0..100),
3121 type_guid: [0; 16],
3122 instance_guid: [0; 16],
3123 name: "test".to_string(),
3124 flags: 0,
3125 }))
3126 }
3127 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3128 Ok(())
3129 }
3130 async fn read(
3131 &self,
3132 _: u64,
3133 _: u32,
3134 _: &Arc<zx::Vmo>,
3135 _: u64,
3136 _: ReadOptions,
3137 _: TraceFlowId,
3138 ) -> Result<(), zx::Status> {
3139 unreachable!()
3140 }
3141 async fn write(
3142 &self,
3143 _: u64,
3144 _: u32,
3145 _: &Arc<zx::Vmo>,
3146 _: u64,
3147 _: WriteOptions,
3148 _: TraceFlowId,
3149 ) -> Result<(), zx::Status> {
3150 Err(zx::Status::IO)
3151 }
3152 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3153 panic!("Flush should not be called")
3154 }
3155 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3156 unreachable!()
3157 }
3158 }
3159
3160 futures::join!(
3161 async move {
3162 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3163 block_server.handle_requests(stream).await.unwrap();
3164 },
3165 async move {
3166 let (session_proxy, server) = fidl::endpoints::create_proxy();
3167 proxy.open_session(server).unwrap();
3168 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3169 let vmo_id = session_proxy
3170 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3171 .await
3172 .unwrap()
3173 .unwrap();
3174
3175 let mut fifo =
3176 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3177 let (mut reader, mut writer) = fifo.async_io();
3178
3179 writer
3180 .write_entries(&BlockFifoRequest {
3181 command: BlockFifoCommand {
3182 opcode: BlockOpcode::Write.into_primitive(),
3183 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3184 ..Default::default()
3185 },
3186 vmoid: vmo_id.id,
3187 length: 1,
3188 ..Default::default()
3189 })
3190 .await
3191 .unwrap();
3192
3193 let mut response = BlockFifoResponse::default();
3194 reader.read_entries(&mut response).await.unwrap();
3195 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3196 }
3197 );
3198 }
3199}