1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23
24#[cfg(test)]
25mod decompression_tests;
26
27pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
28
29type TraceFlowId = Option<NonZero<u64>>;
30
31#[derive(Clone)]
32pub enum DeviceInfo {
33 Block(BlockInfo),
34 Partition(PartitionInfo),
35}
36
37impl DeviceInfo {
38 pub fn device_flags(&self) -> fblock::DeviceFlag {
39 match self {
40 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
41 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
42 }
43 }
44
45 pub fn block_count(&self) -> Option<u64> {
46 match self {
47 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
48 Self::Partition(PartitionInfo { block_range, .. }) => {
49 block_range.as_ref().map(|range| range.end - range.start)
50 }
51 }
52 }
53
54 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
55 match self {
56 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
57 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
58 max_transfer_blocks.clone()
59 }
60 }
61 }
62
63 fn max_transfer_size(&self, block_size: u32) -> u32 {
64 if let Some(max_blocks) = self.max_transfer_blocks() {
65 max_blocks.get() * block_size
66 } else {
67 MAX_TRANSFER_UNBOUNDED
68 }
69 }
70}
71
72#[derive(Clone, Default)]
74pub struct BlockInfo {
75 pub device_flags: fblock::DeviceFlag,
76 pub block_count: u64,
77 pub max_transfer_blocks: Option<NonZero<u32>>,
78}
79
80#[derive(Clone, Default)]
82pub struct PartitionInfo {
83 pub device_flags: fblock::DeviceFlag,
85 pub max_transfer_blocks: Option<NonZero<u32>>,
86 pub block_range: Option<Range<u64>>,
90 pub type_guid: [u8; 16],
91 pub instance_guid: [u8; 16],
92 pub name: String,
93 pub flags: u64,
94}
95
96struct ActiveRequest<S> {
99 session: S,
100 group_or_request: GroupOrRequest,
101 trace_flow_id: TraceFlowId,
102 _epoch_guard: EpochGuard<'static>,
103 status: zx::Status,
104 count: u32,
105 req_id: Option<u32>,
106 decompression_info: Option<DecompressionInfo>,
107}
108
109struct DecompressionInfo {
110 compressed_range: Range<usize>,
112
113 uncompressed_range: Range<u64>,
115
116 bytes_so_far: u64,
117 mapping: Arc<VmoMapping>,
118 buffer: Option<Buffer<'static>>,
119}
120
121impl DecompressionInfo {
122 fn uncompressed_slice(&self) -> *mut [u8] {
124 std::ptr::slice_from_raw_parts_mut(
125 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
126 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
127 )
128 }
129}
130
131pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
132
133impl<S> Default for ActiveRequests<S> {
134 fn default() -> Self {
135 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
136 }
137}
138
139impl<S> ActiveRequests<S> {
140 fn complete_and_take_response(
141 &self,
142 request_id: RequestId,
143 status: zx::Status,
144 ) -> Option<(S, BlockFifoResponse)> {
145 self.0.lock().complete_and_take_response(request_id, status)
146 }
147
148 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
149 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
150 }
151}
152
153struct ActiveRequestsInner<S> {
154 requests: Slab<ActiveRequest<S>>,
155}
156
157impl<S> ActiveRequestsInner<S> {
159 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
161 let group = &mut self.requests[request_id.0];
162
163 group.count = group.count.checked_sub(1).unwrap();
164 if status != zx::Status::OK && group.status == zx::Status::OK {
165 group.status = status
166 }
167
168 fuchsia_trace::duration!(
169 "storage",
170 "block_server::finish_transaction",
171 "request_id" => request_id.0,
172 "group_completed" => group.count == 0,
173 "status" => status.into_raw());
174 if let Some(trace_flow_id) = group.trace_flow_id {
175 fuchsia_trace::flow_step!(
176 "storage",
177 "block_server::finish_request",
178 trace_flow_id.get().into()
179 );
180 }
181
182 if group.count == 0
183 && group.status == zx::Status::OK
184 && let Some(info) = &mut group.decompression_info
185 {
186 thread_local! {
187 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
188 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
189 }
190 DECOMPRESSOR.with(|decompressor| {
191 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
193 let mut decompressor = decompressor.borrow_mut();
194 if let Err(error) = decompressor.decompress_to_buffer(
195 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
196 target_slice,
197 ) {
198 log::warn!(error:?; "Decompression error");
199 group.status = zx::Status::IO_DATA_INTEGRITY;
200 };
201 });
202 }
203 }
204
205 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
207 let group = &self.requests[request_id.0];
208 match group.req_id {
209 Some(reqid) if group.count == 0 => {
210 let group = self.requests.remove(request_id.0);
211 Some((
212 group.session,
213 BlockFifoResponse {
214 status: group.status.into_raw(),
215 reqid,
216 group: group.group_or_request.group_id().unwrap_or(0),
217 ..Default::default()
218 },
219 ))
220 }
221 _ => None,
222 }
223 }
224
225 fn complete_and_take_response(
227 &mut self,
228 request_id: RequestId,
229 status: zx::Status,
230 ) -> Option<(S, BlockFifoResponse)> {
231 self.complete(request_id, status);
232 self.take_response(request_id)
233 }
234}
235
236pub struct BlockServer<SM> {
239 block_size: u32,
240 session_manager: Arc<SM>,
241}
242
243#[derive(Debug)]
245pub struct BlockOffsetMapping {
246 source_block_offset: u64,
247 target_block_offset: u64,
248 length: u64,
249}
250
251impl BlockOffsetMapping {
252 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
253 blocks.0 >= self.source_block_offset
254 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
255 }
256}
257
258impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
259 type Error = zx::Status;
260
261 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
262 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
263 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
264 Ok(Self {
265 source_block_offset: wire.source_block_offset,
266 target_block_offset: wire.target_block_offset,
267 length: wire.length,
268 })
269 }
270}
271
272pub struct OffsetMap {
274 mapping: Option<BlockOffsetMapping>,
275 max_transfer_blocks: Option<NonZero<u32>>,
276}
277
278impl OffsetMap {
279 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
281 Self { mapping: Some(mapping), max_transfer_blocks }
282 }
283
284 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
286 Self { mapping: None, max_transfer_blocks }
287 }
288
289 pub fn is_empty(&self) -> bool {
290 self.mapping.is_none()
291 }
292
293 fn mapping(&self) -> Option<&BlockOffsetMapping> {
294 self.mapping.as_ref()
295 }
296
297 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
298 self.max_transfer_blocks
299 }
300}
301
302pub trait SessionManager: 'static {
305 const SUPPORTS_DECOMPRESSION: bool;
306
307 type Session;
308
309 fn on_attach_vmo(
310 self: Arc<Self>,
311 vmo: &Arc<zx::Vmo>,
312 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
313
314 fn open_session(
319 self: Arc<Self>,
320 stream: fblock::SessionRequestStream,
321 offset_map: OffsetMap,
322 block_size: u32,
323 ) -> impl Future<Output = Result<(), Error>> + Send;
324
325 fn get_info(&self) -> Cow<'_, DeviceInfo>;
327
328 fn get_volume_info(
330 &self,
331 ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
332 {
333 async { Err(zx::Status::NOT_SUPPORTED) }
334 }
335
336 fn query_slices(
338 &self,
339 _start_slices: &[u64],
340 ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
341 async { Err(zx::Status::NOT_SUPPORTED) }
342 }
343
344 fn extend(
346 &self,
347 _start_slice: u64,
348 _slice_count: u64,
349 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
350 async { Err(zx::Status::NOT_SUPPORTED) }
351 }
352
353 fn shrink(
355 &self,
356 _start_slice: u64,
357 _slice_count: u64,
358 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
359 async { Err(zx::Status::NOT_SUPPORTED) }
360 }
361
362 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
364}
365
366pub trait IntoSessionManager {
367 type SM: SessionManager;
368
369 fn into_session_manager(self) -> Arc<Self::SM>;
370}
371
372impl<SM: SessionManager> BlockServer<SM> {
373 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
374 Self { block_size, session_manager: session_manager.into_session_manager() }
375 }
376
377 pub fn session_manager(&self) -> &SM {
378 self.session_manager.as_ref()
379 }
380
381 pub async fn handle_requests(
383 &self,
384 mut requests: fblock::BlockRequestStream,
385 ) -> Result<(), Error> {
386 let scope = fasync::Scope::new();
387 loop {
388 match requests.try_next().await {
389 Ok(Some(request)) => {
390 if let Some(session) = self.handle_request(request).await? {
391 scope.spawn(session.map(|_| ()));
392 }
393 }
394 Ok(None) => break,
395 Err(err) => log::warn!(err:?; "Invalid request"),
396 }
397 }
398 scope.await;
399 Ok(())
400 }
401
402 async fn handle_request(
405 &self,
406 request: fblock::BlockRequest,
407 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
408 match request {
409 fblock::BlockRequest::GetInfo { responder } => {
410 let info = self.device_info();
411 let max_transfer_size = info.max_transfer_size(self.block_size);
412 let (block_count, mut flags) = match info.as_ref() {
413 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
414 (*block_count, *device_flags)
415 }
416 DeviceInfo::Partition(partition_info) => {
417 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
418 range.end - range.start
419 } else {
420 let volume_info = self.session_manager.get_volume_info().await?;
421 volume_info.0.slice_size * volume_info.1.partition_slice_count
422 / self.block_size as u64
423 };
424 (block_count, partition_info.device_flags)
425 }
426 };
427 if SM::SUPPORTS_DECOMPRESSION {
428 flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
429 }
430 responder.send(Ok(&fblock::BlockInfo {
431 block_count,
432 block_size: self.block_size,
433 max_transfer_size,
434 flags,
435 }))?;
436 }
437 fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
438 let info = self.device_info();
439 return Ok(Some(self.session_manager.clone().open_session(
440 session.into_stream(),
441 OffsetMap::empty(info.max_transfer_blocks()),
442 self.block_size,
443 )));
444 }
445 fblock::BlockRequest::OpenSessionWithOffsetMap {
446 session,
447 mapping,
448 control_handle: _,
449 } => {
450 let info = self.device_info();
451 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
452 Ok(m) => m,
453 Err(status) => {
454 session.close_with_epitaph(status)?;
455 return Ok(None);
456 }
457 };
458 if let Some(max) = info.block_count() {
459 if initial_mapping.target_block_offset + initial_mapping.length > max {
460 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
461 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
462 return Ok(None);
463 }
464 }
465 return Ok(Some(self.session_manager.clone().open_session(
466 session.into_stream(),
467 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
468 self.block_size,
469 )));
470 }
471 fblock::BlockRequest::GetTypeGuid { responder } => {
472 let info = self.device_info();
473 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
474 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
475 guid.value.copy_from_slice(&partition_info.type_guid);
476 responder.send(zx::sys::ZX_OK, Some(&guid))?;
477 } else {
478 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
479 }
480 }
481 fblock::BlockRequest::GetInstanceGuid { responder } => {
482 let info = self.device_info();
483 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
484 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
485 guid.value.copy_from_slice(&partition_info.instance_guid);
486 responder.send(zx::sys::ZX_OK, Some(&guid))?;
487 } else {
488 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
489 }
490 }
491 fblock::BlockRequest::GetName { responder } => {
492 let info = self.device_info();
493 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
494 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
495 } else {
496 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497 }
498 }
499 fblock::BlockRequest::GetMetadata { responder } => {
500 let info = self.device_info();
501 if let DeviceInfo::Partition(info) = info.as_ref() {
502 let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503 type_guid.value.copy_from_slice(&info.type_guid);
504 let mut instance_guid =
505 fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
506 instance_guid.value.copy_from_slice(&info.instance_guid);
507 responder.send(Ok(&fblock::BlockGetMetadataResponse {
508 name: Some(info.name.clone()),
509 type_guid: Some(type_guid),
510 instance_guid: Some(instance_guid),
511 start_block_offset: info.block_range.as_ref().map(|range| range.start),
512 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
513 flags: Some(info.flags),
514 ..Default::default()
515 }))?;
516 } else {
517 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
518 }
519 }
520 fblock::BlockRequest::QuerySlices { responder, start_slices } => {
521 match self.session_manager.query_slices(&start_slices).await {
522 Ok(mut results) => {
523 let results_len = results.len();
524 assert!(results_len <= 16);
525 results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
526 responder.send(
527 zx::sys::ZX_OK,
528 &results.try_into().unwrap(),
529 results_len as u64,
530 )?;
531 }
532 Err(s) => {
533 responder.send(
534 s.into_raw(),
535 &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
536 0,
537 )?;
538 }
539 }
540 }
541 fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
542 match self.session_manager.get_volume_info().await {
543 Ok((manager_info, volume_info)) => {
544 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
545 }
546 Err(s) => responder.send(s.into_raw(), None, None)?,
547 }
548 }
549 fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
550 responder.send(
551 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
552 .into_raw(),
553 )?;
554 }
555 fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
556 responder.send(
557 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
558 .into_raw(),
559 )?;
560 }
561 fblock::BlockRequest::Destroy { responder, .. } => {
562 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
563 }
564 }
565 Ok(None)
566 }
567
568 fn device_info(&self) -> Cow<'_, DeviceInfo> {
569 self.session_manager.get_info()
570 }
571}
572
573struct SessionHelper<SM: SessionManager> {
574 session_manager: Arc<SM>,
575 offset_map: OffsetMap,
576 block_size: u32,
577 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
578 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
579}
580
581struct VmoMapping {
582 base: usize,
583 size: usize,
584}
585
586impl VmoMapping {
587 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
588 let size = vmo.get_size().unwrap() as usize;
589 Ok(Arc::new(Self {
590 base: fuchsia_runtime::vmar_root_self()
591 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
592 .inspect_err(|error| {
593 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
594 })?,
595 size,
596 }))
597 }
598}
599
600impl Drop for VmoMapping {
601 fn drop(&mut self) {
602 unsafe {
604 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
605 }
606 }
607}
608
609impl<SM: SessionManager> SessionHelper<SM> {
610 fn new(
611 session_manager: Arc<SM>,
612 offset_map: OffsetMap,
613 block_size: u32,
614 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
615 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
616 Ok((
617 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
618 fifo,
619 ))
620 }
621
622 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
623 match request {
624 fblock::SessionRequest::GetFifo { responder } => {
625 let rights = zx::Rights::TRANSFER
626 | zx::Rights::READ
627 | zx::Rights::WRITE
628 | zx::Rights::SIGNAL
629 | zx::Rights::WAIT;
630 match self.peer_fifo.duplicate_handle(rights) {
631 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
632 Err(s) => responder.send(Err(s.into_raw()))?,
633 }
634 Ok(())
635 }
636 fblock::SessionRequest::AttachVmo { vmo, responder } => {
637 let vmo = Arc::new(vmo);
638 let vmo_id = {
639 let mut vmos = self.vmos.lock();
640 if vmos.len() == u16::MAX as usize {
641 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
642 return Ok(());
643 } else {
644 let vmo_id = match vmos.last_entry() {
645 None => 1,
646 Some(o) => {
647 o.key().checked_add(1).unwrap_or_else(|| {
648 let mut vmo_id = 1;
649 for (&id, _) in &*vmos {
651 if id > vmo_id {
652 break;
653 }
654 vmo_id = id + 1;
655 }
656 vmo_id
657 })
658 }
659 };
660 vmos.insert(vmo_id, (vmo.clone(), None));
661 vmo_id
662 }
663 };
664 self.session_manager.clone().on_attach_vmo(&vmo).await?;
665 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
666 Ok(())
667 }
668 fblock::SessionRequest::Close { responder } => {
669 responder.send(Ok(()))?;
670 Err(anyhow!("Closed"))
671 }
672 }
673 }
674
675 fn decode_fifo_request(
677 &self,
678 session: SM::Session,
679 request: &BlockFifoRequest,
680 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
681 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
682
683 let request_bytes = request.length as u64 * self.block_size as u64;
684
685 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
686 .ok_or(zx::Status::INVALID_ARGS)
687 .and_then(|code| {
688 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
689 if code != BlockOpcode::Read {
690 return Err(zx::Status::INVALID_ARGS);
691 }
692 if !SM::SUPPORTS_DECOMPRESSION {
693 return Err(zx::Status::NOT_SUPPORTED);
694 }
695 }
696 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
697 if request.length == 0 {
698 return Err(zx::Status::INVALID_ARGS);
699 }
700 if request.dev_offset.checked_add(request.length as u64).is_none()
702 || (code != BlockOpcode::Trim
703 && request_bytes.checked_add(request.vmo_offset).is_none())
704 {
705 return Err(zx::Status::OUT_OF_RANGE);
706 }
707 }
708 Ok(match code {
709 BlockOpcode::Read => Operation::Read {
710 device_block_offset: request.dev_offset,
711 block_count: request.length,
712 _unused: 0,
713 vmo_offset: request
714 .vmo_offset
715 .checked_mul(self.block_size as u64)
716 .ok_or(zx::Status::OUT_OF_RANGE)?,
717 options: ReadOptions {
718 inline_crypto: InlineCryptoOptions {
719 dun: request.dun,
720 slot: request.slot,
721 },
722 },
723 },
724 BlockOpcode::Write => {
725 let mut options = WriteOptions {
726 inline_crypto: InlineCryptoOptions {
727 dun: request.dun,
728 slot: request.slot,
729 },
730 ..WriteOptions::default()
731 };
732 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
733 options.flags |= WriteFlags::FORCE_ACCESS;
734 }
735 if flags.contains(BlockIoFlag::PRE_BARRIER) {
736 options.flags |= WriteFlags::PRE_BARRIER;
737 }
738 Operation::Write {
739 device_block_offset: request.dev_offset,
740 block_count: request.length,
741 _unused: 0,
742 options,
743 vmo_offset: request
744 .vmo_offset
745 .checked_mul(self.block_size as u64)
746 .ok_or(zx::Status::OUT_OF_RANGE)?,
747 }
748 }
749 BlockOpcode::Flush => Operation::Flush,
750 BlockOpcode::Trim => Operation::Trim {
751 device_block_offset: request.dev_offset,
752 block_count: request.length,
753 },
754 BlockOpcode::CloseVmo => Operation::CloseVmo,
755 })
756 });
757
758 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
759 GroupOrRequest::Group(request.group)
760 } else {
761 GroupOrRequest::Request(request.reqid)
762 };
763
764 let mut active_requests = self.session_manager.active_requests().0.lock();
765 let mut request_id = None;
766
767 if group_or_request.is_group() {
778 for (key, group) in &mut active_requests.requests {
782 if group.group_or_request == group_or_request {
783 if group.req_id.is_some() {
784 if group.status == zx::Status::OK {
786 group.status = zx::Status::INVALID_ARGS;
787 }
788 return Err(None);
790 }
791 if group.status == zx::Status::OK
793 && let Some(info) = &mut group.decompression_info
794 {
795 if let Ok(Operation::Read {
796 device_block_offset,
797 mut block_count,
798 options,
799 vmo_offset: 0,
800 ..
801 }) = operation
802 {
803 let remaining_bytes = info
804 .compressed_range
805 .end
806 .next_multiple_of(self.block_size as usize)
807 as u64
808 - info.bytes_so_far;
809 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
810 || request.total_compressed_bytes != 0
811 || request.uncompressed_bytes != 0
812 || request.compressed_prefix_bytes != 0
813 || (flags.contains(BlockIoFlag::GROUP_LAST)
814 && info.bytes_so_far + request_bytes
815 < info.compressed_range.end as u64)
816 || (!flags.contains(BlockIoFlag::GROUP_LAST)
817 && request_bytes >= remaining_bytes)
818 {
819 group.status = zx::Status::INVALID_ARGS;
820 } else {
821 if request_bytes > remaining_bytes {
830 block_count = (remaining_bytes / self.block_size as u64) as u32;
831 }
832
833 operation = Ok(Operation::ContinueDecompressedRead {
834 offset: info.bytes_so_far,
835 device_block_offset,
836 block_count,
837 options,
838 });
839
840 info.bytes_so_far += block_count as u64 * self.block_size as u64;
841 }
842 } else {
843 group.status = zx::Status::INVALID_ARGS;
844 }
845 }
846 if flags.contains(BlockIoFlag::GROUP_LAST) {
847 group.req_id = Some(request.reqid);
848 if group.status != zx::Status::OK {
851 operation = Err(group.status);
852 }
853 } else if group.status != zx::Status::OK {
854 return Err(None);
857 }
858 request_id = Some(RequestId(key));
859 group.count += 1;
860 break;
861 }
862 }
863 }
864
865 let is_single_request =
866 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
867
868 let mut decompression_info = None;
869 let vmo = match operation {
870 Ok(Operation::Read {
871 device_block_offset,
872 mut block_count,
873 options,
874 vmo_offset,
875 ..
876 }) => match self.vmos.lock().get_mut(&request.vmoid) {
877 Some((vmo, mapping)) => {
878 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
879 let compressed_range = request.compressed_prefix_bytes as usize
880 ..request.compressed_prefix_bytes as usize
881 + request.total_compressed_bytes as usize;
882 let required_buffer_size =
883 compressed_range.end.next_multiple_of(self.block_size as usize);
884
885 if compressed_range.start >= compressed_range.end
887 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
888 || (is_single_request && request_bytes < compressed_range.end as u64)
889 || (!is_single_request && request_bytes >= required_buffer_size as u64)
890 {
891 Err(zx::Status::INVALID_ARGS)
892 } else {
893 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
900 block_count =
901 (required_buffer_size / self.block_size as usize) as u32;
902 required_buffer_size as u64
903 } else {
904 request_bytes
905 };
906
907 match mapping {
909 Some(mapping) => Ok(mapping.clone()),
910 None => {
911 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
912 }
913 }
914 .and_then(|mapping| {
915 if vmo_offset
918 .checked_add(request.uncompressed_bytes as u64)
919 .is_some_and(|end| end <= mapping.size as u64)
920 {
921 Ok(mapping)
922 } else {
923 Err(zx::Status::OUT_OF_RANGE)
924 }
925 })
926 .map(|mapping| {
927 operation = Ok(Operation::StartDecompressedRead {
932 required_buffer_size,
933 device_block_offset,
934 block_count,
935 options,
936 });
937 decompression_info = Some(DecompressionInfo {
940 compressed_range,
941 bytes_so_far,
942 mapping,
943 uncompressed_range: vmo_offset
944 ..vmo_offset + request.uncompressed_bytes as u64,
945 buffer: None,
946 });
947 None
948 })
949 }
950 } else {
951 Ok(Some(vmo.clone()))
952 }
953 }
954 None => Err(zx::Status::IO),
955 },
956 Ok(Operation::Write { .. }) => self
957 .vmos
958 .lock()
959 .get(&request.vmoid)
960 .cloned()
961 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
962 Ok(Operation::CloseVmo) => {
963 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
964 let vmo_clone = vmo.clone();
965 Epoch::global().defer(move || drop(vmo_clone));
968 Ok(Some(vmo))
969 })
970 }
971 _ => Ok(None),
972 }
973 .unwrap_or_else(|e| {
974 operation = Err(e);
975 None
976 });
977
978 let trace_flow_id = NonZero::new(request.trace_flow_id);
979 let request_id = request_id.unwrap_or_else(|| {
980 RequestId(active_requests.requests.insert(ActiveRequest {
981 session,
982 group_or_request,
983 trace_flow_id,
984 _epoch_guard: Epoch::global().guard(),
985 status: zx::Status::OK,
986 count: 1,
987 req_id: is_single_request.then_some(request.reqid),
988 decompression_info,
989 }))
990 });
991
992 Ok(DecodedRequest {
993 request_id,
994 trace_flow_id,
995 operation: operation.map_err(|status| {
996 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
997 })?,
998 vmo,
999 })
1000 }
1001
1002 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1003 std::mem::take(&mut *self.vmos.lock())
1004 }
1005
1006 fn map_request(
1008 &self,
1009 mut request: DecodedRequest,
1010 active_request: &mut ActiveRequest<SM::Session>,
1011 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1012 if active_request.status != zx::Status::OK {
1013 return Err(zx::Status::BAD_STATE);
1014 }
1015 let mapping = self.offset_map.mapping();
1016 match (mapping, request.operation.blocks()) {
1017 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1018 return Err(zx::Status::OUT_OF_RANGE);
1019 }
1020 _ => {}
1021 }
1022 let remainder = request.operation.map(
1023 self.offset_map.mapping(),
1024 self.offset_map.max_transfer_blocks(),
1025 self.block_size,
1026 );
1027 if remainder.is_some() {
1028 active_request.count += 1;
1029 }
1030 static CACHE: AtomicU64 = AtomicU64::new(0);
1031 if let Some(context) =
1032 fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1033 {
1034 use fuchsia_trace::ArgValue;
1035 let trace_args = [
1036 ArgValue::of("request_id", request.request_id.0),
1037 ArgValue::of("opcode", request.operation.trace_label()),
1038 ];
1039 let _scope =
1040 fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1041 if let Some(trace_flow_id) = active_request.trace_flow_id {
1042 fuchsia_trace::flow_step(
1043 &context,
1044 "block_server::start_transaction",
1045 trace_flow_id.get().into(),
1046 &[],
1047 );
1048 }
1049 }
1050 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1051 Ok((request, remainder))
1052 }
1053
1054 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1055 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1056 }
1057}
1058
1059#[repr(transparent)]
1060#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1061pub struct RequestId(usize);
1062
1063#[derive(Clone, Debug)]
1064struct DecodedRequest {
1065 request_id: RequestId,
1066 trace_flow_id: TraceFlowId,
1067 operation: Operation,
1068 vmo: Option<Arc<zx::Vmo>>,
1069}
1070
1071pub type WriteFlags = block_protocol::WriteFlags;
1073pub type WriteOptions = block_protocol::WriteOptions;
1074pub type ReadOptions = block_protocol::ReadOptions;
1075
1076#[repr(C)]
1077#[derive(Clone, Debug, PartialEq, Eq)]
1078pub enum Operation {
1079 Read {
1084 device_block_offset: u64,
1085 block_count: u32,
1086 _unused: u32,
1087 vmo_offset: u64,
1088 options: ReadOptions,
1089 },
1090 Write {
1091 device_block_offset: u64,
1092 block_count: u32,
1093 _unused: u32,
1094 vmo_offset: u64,
1095 options: WriteOptions,
1096 },
1097 Flush,
1098 Trim {
1099 device_block_offset: u64,
1100 block_count: u32,
1101 },
1102 CloseVmo,
1104 StartDecompressedRead {
1105 required_buffer_size: usize,
1106 device_block_offset: u64,
1107 block_count: u32,
1108 options: ReadOptions,
1109 },
1110 ContinueDecompressedRead {
1111 offset: u64,
1112 device_block_offset: u64,
1113 block_count: u32,
1114 options: ReadOptions,
1115 },
1116}
1117
1118impl Operation {
1119 fn trace_label(&self) -> &'static str {
1120 match self {
1121 Operation::Read { .. } => "read",
1122 Operation::Write { .. } => "write",
1123 Operation::Flush { .. } => "flush",
1124 Operation::Trim { .. } => "trim",
1125 Operation::CloseVmo { .. } => "close_vmo",
1126 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1127 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1128 }
1129 }
1130
1131 fn blocks(&self) -> Option<(u64, u32)> {
1133 match self {
1134 Operation::Read { device_block_offset, block_count, .. }
1135 | Operation::Write { device_block_offset, block_count, .. }
1136 | Operation::Trim { device_block_offset, block_count, .. } => {
1137 Some((*device_block_offset, *block_count))
1138 }
1139 _ => None,
1140 }
1141 }
1142
1143 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1145 match self {
1146 Operation::Read { device_block_offset, block_count, .. }
1147 | Operation::Write { device_block_offset, block_count, .. }
1148 | Operation::Trim { device_block_offset, block_count, .. } => {
1149 Some((device_block_offset, block_count))
1150 }
1151 _ => None,
1152 }
1153 }
1154
1155 fn map(
1158 &mut self,
1159 mapping: Option<&BlockOffsetMapping>,
1160 max_blocks: Option<NonZero<u32>>,
1161 block_size: u32,
1162 ) -> Option<Self> {
1163 let mut max = match self {
1164 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1165 _ => None,
1166 };
1167 let (offset, length) = self.blocks_mut()?;
1168 let orig_offset = *offset;
1169 if let Some(mapping) = mapping {
1170 let delta = *offset - mapping.source_block_offset;
1171 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1172 *offset = mapping.target_block_offset + delta;
1173 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1174 max = match max {
1175 None => Some(mapping_max),
1176 Some(m) => Some(std::cmp::min(m, mapping_max)),
1177 };
1178 };
1179 if let Some(max) = max {
1180 if *length as u64 > max {
1181 let rem = (*length as u64 - max) as u32;
1182 *length = max as u32;
1183 return Some(match self {
1184 Operation::Read {
1185 device_block_offset: _,
1186 block_count: _,
1187 vmo_offset,
1188 _unused,
1189 options,
1190 } => {
1191 let mut options = *options;
1192 options.inline_crypto.dun += max as u32;
1193 Operation::Read {
1194 device_block_offset: orig_offset + max,
1195 block_count: rem,
1196 vmo_offset: *vmo_offset + max * block_size as u64,
1197 _unused: *_unused,
1198 options: options,
1199 }
1200 }
1201 Operation::Write {
1202 device_block_offset: _,
1203 block_count: _,
1204 _unused,
1205 vmo_offset,
1206 options,
1207 } => {
1208 let mut options = *options;
1209 options.inline_crypto.dun += max as u32;
1210 Operation::Write {
1211 device_block_offset: orig_offset + max,
1212 block_count: rem,
1213 _unused: *_unused,
1214 vmo_offset: *vmo_offset + max * block_size as u64,
1215 options: options,
1216 }
1217 }
1218 Operation::Trim { device_block_offset: _, block_count: _ } => {
1219 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1220 }
1221 _ => unreachable!(),
1222 });
1223 }
1224 }
1225 None
1226 }
1227
1228 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1230 if let Operation::Write { options, .. } = self {
1231 options.flags.contains(value)
1232 } else {
1233 false
1234 }
1235 }
1236
1237 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1239 if let Operation::Write { options, .. } = self {
1240 let result = options.flags.contains(value);
1241 options.flags.remove(value);
1242 result
1243 } else {
1244 false
1245 }
1246 }
1247}
1248
1249#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1250pub enum GroupOrRequest {
1251 Group(u16),
1252 Request(u32),
1253}
1254
1255impl GroupOrRequest {
1256 fn is_group(&self) -> bool {
1257 matches!(self, Self::Group(_))
1258 }
1259
1260 fn group_id(&self) -> Option<u16> {
1261 match self {
1262 Self::Group(id) => Some(*id),
1263 Self::Request(_) => None,
1264 }
1265 }
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270 use super::{
1271 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1272 TraceFlowId,
1273 };
1274 use assert_matches::assert_matches;
1275 use block_protocol::{
1276 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1277 WriteFlags, WriteOptions,
1278 };
1279 use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1280 use fuchsia_sync::Mutex;
1281 use futures::FutureExt as _;
1282 use futures::channel::oneshot;
1283 use futures::future::BoxFuture;
1284 use std::borrow::Cow;
1285 use std::future::poll_fn;
1286 use std::num::NonZero;
1287 use std::pin::pin;
1288 use std::sync::Arc;
1289 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1290 use std::task::{Context, Poll};
1291 use zx::HandleBased as _;
1292 use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1293
1294 #[derive(Default)]
1295 struct MockInterface {
1296 read_hook: Option<
1297 Box<
1298 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1299 + Send
1300 + Sync,
1301 >,
1302 >,
1303 write_hook:
1304 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1305 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1306 }
1307
1308 impl super::async_interface::Interface for MockInterface {
1309 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1310 Ok(())
1311 }
1312
1313 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1314 Cow::Owned(test_device_info())
1315 }
1316
1317 async fn read(
1318 &self,
1319 device_block_offset: u64,
1320 block_count: u32,
1321 vmo: &Arc<zx::Vmo>,
1322 vmo_offset: u64,
1323 _opts: ReadOptions,
1324 _trace_flow_id: TraceFlowId,
1325 ) -> Result<(), zx::Status> {
1326 if let Some(read_hook) = &self.read_hook {
1327 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1328 } else {
1329 unimplemented!();
1330 }
1331 }
1332
1333 async fn write(
1334 &self,
1335 device_block_offset: u64,
1336 _block_count: u32,
1337 _vmo: &Arc<zx::Vmo>,
1338 _vmo_offset: u64,
1339 opts: WriteOptions,
1340 _trace_flow_id: TraceFlowId,
1341 ) -> Result<(), zx::Status> {
1342 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1343 && let Some(barrier_hook) = &self.barrier_hook
1344 {
1345 barrier_hook()?;
1346 }
1347 if let Some(write_hook) = &self.write_hook {
1348 write_hook(device_block_offset).await
1349 } else {
1350 unimplemented!();
1351 }
1352 }
1353
1354 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1355 Ok(())
1356 }
1357
1358 async fn trim(
1359 &self,
1360 _device_block_offset: u64,
1361 _block_count: u32,
1362 _trace_flow_id: TraceFlowId,
1363 ) -> Result<(), zx::Status> {
1364 unreachable!();
1365 }
1366
1367 async fn get_volume_info(
1368 &self,
1369 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1370 let () = std::future::pending().await;
1372 unreachable!();
1373 }
1374 }
1375
1376 const BLOCK_SIZE: u32 = 512;
1377 const MAX_TRANSFER_BLOCKS: u32 = 10;
1378
1379 fn test_device_info() -> DeviceInfo {
1380 DeviceInfo::Partition(PartitionInfo {
1381 device_flags: fblock::DeviceFlag::READONLY
1382 | fblock::DeviceFlag::BARRIER_SUPPORT
1383 | fblock::DeviceFlag::FUA_SUPPORT,
1384 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1385 block_range: Some(0..100),
1386 type_guid: [1; 16],
1387 instance_guid: [2; 16],
1388 name: "foo".to_string(),
1389 flags: 0xabcd,
1390 })
1391 }
1392
1393 #[fuchsia::test]
1394 async fn test_barriers_ordering() {
1395 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1396 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1397 let barrier_called = Arc::new(AtomicBool::new(false));
1398
1399 futures::join!(
1400 async move {
1401 let barrier_called_clone = barrier_called.clone();
1402 let block_server = BlockServer::new(
1403 BLOCK_SIZE,
1404 Arc::new(MockInterface {
1405 barrier_hook: Some(Box::new(move || {
1406 barrier_called.store(true, Ordering::Relaxed);
1407 Ok(())
1408 })),
1409 write_hook: Some(Box::new(move |device_block_offset| {
1410 let barrier_called = barrier_called_clone.clone();
1411 Box::pin(async move {
1412 if device_block_offset % 2 == 0 {
1414 fasync::Timer::new(fasync::MonotonicInstant::after(
1415 zx::MonotonicDuration::from_millis(200),
1416 ))
1417 .await;
1418 }
1419 assert!(barrier_called.load(Ordering::Relaxed));
1420 Ok(())
1421 })
1422 })),
1423 ..MockInterface::default()
1424 }),
1425 );
1426 block_server.handle_requests(stream).await.unwrap();
1427 },
1428 async move {
1429 let (session_proxy, server) = fidl::endpoints::create_proxy();
1430
1431 proxy.open_session(server).unwrap();
1432
1433 let vmo_id = session_proxy
1434 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1435 .await
1436 .unwrap()
1437 .unwrap();
1438 assert_ne!(vmo_id.id, 0);
1439
1440 let mut fifo =
1441 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1442 let (mut reader, mut writer) = fifo.async_io();
1443
1444 writer
1445 .write_entries(&BlockFifoRequest {
1446 command: BlockFifoCommand {
1447 opcode: BlockOpcode::Write.into_primitive(),
1448 flags: BlockIoFlag::PRE_BARRIER.bits(),
1449 ..Default::default()
1450 },
1451 vmoid: vmo_id.id,
1452 dev_offset: 0,
1453 length: 5,
1454 vmo_offset: 6,
1455 ..Default::default()
1456 })
1457 .await
1458 .unwrap();
1459
1460 for i in 0..10 {
1461 writer
1462 .write_entries(&BlockFifoRequest {
1463 command: BlockFifoCommand {
1464 opcode: BlockOpcode::Write.into_primitive(),
1465 ..Default::default()
1466 },
1467 vmoid: vmo_id.id,
1468 dev_offset: i + 1,
1469 length: 5,
1470 vmo_offset: 6,
1471 ..Default::default()
1472 })
1473 .await
1474 .unwrap();
1475 }
1476 for _ in 0..11 {
1477 let mut response = BlockFifoResponse::default();
1478 reader.read_entries(&mut response).await.unwrap();
1479 assert_eq!(response.status, zx::sys::ZX_OK);
1480 }
1481
1482 std::mem::drop(proxy);
1483 }
1484 );
1485 }
1486
1487 #[fuchsia::test]
1488 async fn test_info() {
1489 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1490
1491 futures::join!(
1492 async {
1493 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1494 block_server.handle_requests(stream).await.unwrap();
1495 },
1496 async {
1497 let expected_info = test_device_info();
1498 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1499 info
1500 } else {
1501 unreachable!()
1502 };
1503
1504 let block_info = proxy.get_info().await.unwrap().unwrap();
1505 assert_eq!(
1506 block_info.block_count,
1507 partition_info.block_range.as_ref().unwrap().end
1508 - partition_info.block_range.as_ref().unwrap().start
1509 );
1510 assert_eq!(
1511 block_info.flags,
1512 fblock::DeviceFlag::READONLY
1513 | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1514 | fblock::DeviceFlag::BARRIER_SUPPORT
1515 | fblock::DeviceFlag::FUA_SUPPORT
1516 );
1517
1518 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1519
1520 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1521 assert_eq!(status, zx::sys::ZX_OK);
1522 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1523
1524 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1525 assert_eq!(status, zx::sys::ZX_OK);
1526 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1527
1528 let (status, name) = proxy.get_name().await.unwrap();
1529 assert_eq!(status, zx::sys::ZX_OK);
1530 assert_eq!(name.as_ref(), Some(&partition_info.name));
1531
1532 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1533 assert_eq!(metadata.name, name);
1534 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1535 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1536 assert_eq!(
1537 metadata.start_block_offset,
1538 Some(partition_info.block_range.as_ref().unwrap().start)
1539 );
1540 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1541 assert_eq!(metadata.flags, Some(partition_info.flags));
1542
1543 std::mem::drop(proxy);
1544 }
1545 );
1546 }
1547
1548 #[fuchsia::test]
1549 async fn test_attach_vmo() {
1550 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1551
1552 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1553 let koid = vmo.koid().unwrap();
1554
1555 futures::join!(
1556 async {
1557 let block_server = BlockServer::new(
1558 BLOCK_SIZE,
1559 Arc::new(MockInterface {
1560 read_hook: Some(Box::new(move |_, _, vmo, _| {
1561 assert_eq!(vmo.koid().unwrap(), koid);
1562 Box::pin(async { Ok(()) })
1563 })),
1564 ..MockInterface::default()
1565 }),
1566 );
1567 block_server.handle_requests(stream).await.unwrap();
1568 },
1569 async move {
1570 let (session_proxy, server) = fidl::endpoints::create_proxy();
1571
1572 proxy.open_session(server).unwrap();
1573
1574 let vmo_id = session_proxy
1575 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1576 .await
1577 .unwrap()
1578 .unwrap();
1579 assert_ne!(vmo_id.id, 0);
1580
1581 let mut fifo =
1582 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1583 let (mut reader, mut writer) = fifo.async_io();
1584
1585 let mut count = 1;
1587 loop {
1588 match session_proxy
1589 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1590 .await
1591 .unwrap()
1592 {
1593 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1594 Err(e) => {
1595 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1596 break;
1597 }
1598 }
1599
1600 if count % 10 == 0 {
1602 writer
1603 .write_entries(&BlockFifoRequest {
1604 command: BlockFifoCommand {
1605 opcode: BlockOpcode::Read.into_primitive(),
1606 ..Default::default()
1607 },
1608 vmoid: vmo_id.id,
1609 length: 1,
1610 ..Default::default()
1611 })
1612 .await
1613 .unwrap();
1614
1615 let mut response = BlockFifoResponse::default();
1616 reader.read_entries(&mut response).await.unwrap();
1617 assert_eq!(response.status, zx::sys::ZX_OK);
1618 }
1619
1620 count += 1;
1621 }
1622
1623 assert_eq!(count, u16::MAX as u64);
1624
1625 writer
1627 .write_entries(&BlockFifoRequest {
1628 command: BlockFifoCommand {
1629 opcode: BlockOpcode::CloseVmo.into_primitive(),
1630 ..Default::default()
1631 },
1632 vmoid: vmo_id.id,
1633 ..Default::default()
1634 })
1635 .await
1636 .unwrap();
1637
1638 let mut response = BlockFifoResponse::default();
1639 reader.read_entries(&mut response).await.unwrap();
1640 assert_eq!(response.status, zx::sys::ZX_OK);
1641
1642 let new_vmo_id = session_proxy
1643 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1644 .await
1645 .unwrap()
1646 .unwrap();
1647 assert_eq!(new_vmo_id.id, vmo_id.id);
1649
1650 std::mem::drop(proxy);
1651 }
1652 );
1653 }
1654
1655 #[fuchsia::test]
1656 async fn test_close() {
1657 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1658
1659 let mut server = std::pin::pin!(
1660 async {
1661 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1662 block_server.handle_requests(stream).await.unwrap();
1663 }
1664 .fuse()
1665 );
1666
1667 let mut client = std::pin::pin!(
1668 async {
1669 let (session_proxy, server) = fidl::endpoints::create_proxy();
1670
1671 proxy.open_session(server).unwrap();
1672
1673 std::mem::drop(proxy);
1676
1677 session_proxy.close().await.unwrap().unwrap();
1678
1679 let _: () = std::future::pending().await;
1681 }
1682 .fuse()
1683 );
1684
1685 futures::select!(
1686 _ = server => {}
1687 _ = client => unreachable!(),
1688 );
1689 }
1690
1691 #[derive(Default)]
1692 struct IoMockInterface {
1693 do_checks: bool,
1694 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1695 return_errors: bool,
1696 }
1697
1698 #[derive(Debug)]
1699 enum ExpectedOp {
1700 Read(u64, u32, u64),
1701 Write(u64, u32, u64),
1702 Trim(u64, u32),
1703 Flush,
1704 }
1705
1706 impl super::async_interface::Interface for IoMockInterface {
1707 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1708 Ok(())
1709 }
1710
1711 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1712 Cow::Owned(test_device_info())
1713 }
1714
1715 async fn read(
1716 &self,
1717 device_block_offset: u64,
1718 block_count: u32,
1719 _vmo: &Arc<zx::Vmo>,
1720 vmo_offset: u64,
1721 _opts: ReadOptions,
1722 _trace_flow_id: TraceFlowId,
1723 ) -> Result<(), zx::Status> {
1724 if self.return_errors {
1725 Err(zx::Status::INTERNAL)
1726 } else {
1727 if self.do_checks {
1728 assert_matches!(
1729 self.expected_op.lock().take(),
1730 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1731 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1732 "Read {device_block_offset} {block_count} {vmo_offset}"
1733 );
1734 }
1735 Ok(())
1736 }
1737 }
1738
1739 async fn write(
1740 &self,
1741 device_block_offset: u64,
1742 block_count: u32,
1743 _vmo: &Arc<zx::Vmo>,
1744 vmo_offset: u64,
1745 _write_opts: WriteOptions,
1746 _trace_flow_id: TraceFlowId,
1747 ) -> Result<(), zx::Status> {
1748 if self.return_errors {
1749 Err(zx::Status::NOT_SUPPORTED)
1750 } else {
1751 if self.do_checks {
1752 assert_matches!(
1753 self.expected_op.lock().take(),
1754 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1755 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1756 "Write {device_block_offset} {block_count} {vmo_offset}"
1757 );
1758 }
1759 Ok(())
1760 }
1761 }
1762
1763 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1764 if self.return_errors {
1765 Err(zx::Status::NO_RESOURCES)
1766 } else {
1767 if self.do_checks {
1768 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1769 }
1770 Ok(())
1771 }
1772 }
1773
1774 async fn trim(
1775 &self,
1776 device_block_offset: u64,
1777 block_count: u32,
1778 _trace_flow_id: TraceFlowId,
1779 ) -> Result<(), zx::Status> {
1780 if self.return_errors {
1781 Err(zx::Status::NO_MEMORY)
1782 } else {
1783 if self.do_checks {
1784 assert_matches!(
1785 self.expected_op.lock().take(),
1786 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1787 block_count == b,
1788 "Trim {device_block_offset} {block_count}"
1789 );
1790 }
1791 Ok(())
1792 }
1793 }
1794 }
1795
1796 #[fuchsia::test]
1797 async fn test_io() {
1798 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1799
1800 let expected_op = Arc::new(Mutex::new(None));
1801 let expected_op_clone = expected_op.clone();
1802
1803 let server = async {
1804 let block_server = BlockServer::new(
1805 BLOCK_SIZE,
1806 Arc::new(IoMockInterface {
1807 return_errors: false,
1808 do_checks: true,
1809 expected_op: expected_op_clone,
1810 }),
1811 );
1812 block_server.handle_requests(stream).await.unwrap();
1813 };
1814
1815 let client = async move {
1816 let (session_proxy, server) = fidl::endpoints::create_proxy();
1817
1818 proxy.open_session(server).unwrap();
1819
1820 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1821 let vmo_id = session_proxy
1822 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1823 .await
1824 .unwrap()
1825 .unwrap();
1826
1827 let mut fifo =
1828 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1829 let (mut reader, mut writer) = fifo.async_io();
1830
1831 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1833 writer
1834 .write_entries(&BlockFifoRequest {
1835 command: BlockFifoCommand {
1836 opcode: BlockOpcode::Read.into_primitive(),
1837 ..Default::default()
1838 },
1839 vmoid: vmo_id.id,
1840 dev_offset: 1,
1841 length: 2,
1842 vmo_offset: 3,
1843 ..Default::default()
1844 })
1845 .await
1846 .unwrap();
1847
1848 let mut response = BlockFifoResponse::default();
1849 reader.read_entries(&mut response).await.unwrap();
1850 assert_eq!(response.status, zx::sys::ZX_OK);
1851
1852 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1854 writer
1855 .write_entries(&BlockFifoRequest {
1856 command: BlockFifoCommand {
1857 opcode: BlockOpcode::Write.into_primitive(),
1858 ..Default::default()
1859 },
1860 vmoid: vmo_id.id,
1861 dev_offset: 4,
1862 length: 5,
1863 vmo_offset: 6,
1864 ..Default::default()
1865 })
1866 .await
1867 .unwrap();
1868
1869 let mut response = BlockFifoResponse::default();
1870 reader.read_entries(&mut response).await.unwrap();
1871 assert_eq!(response.status, zx::sys::ZX_OK);
1872
1873 *expected_op.lock() = Some(ExpectedOp::Flush);
1875 writer
1876 .write_entries(&BlockFifoRequest {
1877 command: BlockFifoCommand {
1878 opcode: BlockOpcode::Flush.into_primitive(),
1879 ..Default::default()
1880 },
1881 ..Default::default()
1882 })
1883 .await
1884 .unwrap();
1885
1886 reader.read_entries(&mut response).await.unwrap();
1887 assert_eq!(response.status, zx::sys::ZX_OK);
1888
1889 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1891 writer
1892 .write_entries(&BlockFifoRequest {
1893 command: BlockFifoCommand {
1894 opcode: BlockOpcode::Trim.into_primitive(),
1895 ..Default::default()
1896 },
1897 dev_offset: 7,
1898 length: 8,
1899 ..Default::default()
1900 })
1901 .await
1902 .unwrap();
1903
1904 reader.read_entries(&mut response).await.unwrap();
1905 assert_eq!(response.status, zx::sys::ZX_OK);
1906
1907 std::mem::drop(proxy);
1908 };
1909
1910 futures::join!(server, client);
1911 }
1912
1913 #[fuchsia::test]
1914 async fn test_io_errors() {
1915 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1916
1917 futures::join!(
1918 async {
1919 let block_server = BlockServer::new(
1920 BLOCK_SIZE,
1921 Arc::new(IoMockInterface {
1922 return_errors: true,
1923 do_checks: false,
1924 expected_op: Arc::new(Mutex::new(None)),
1925 }),
1926 );
1927 block_server.handle_requests(stream).await.unwrap();
1928 },
1929 async move {
1930 let (session_proxy, server) = fidl::endpoints::create_proxy();
1931
1932 proxy.open_session(server).unwrap();
1933
1934 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1935 let vmo_id = session_proxy
1936 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1937 .await
1938 .unwrap()
1939 .unwrap();
1940
1941 let mut fifo =
1942 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1943 let (mut reader, mut writer) = fifo.async_io();
1944
1945 writer
1947 .write_entries(&BlockFifoRequest {
1948 command: BlockFifoCommand {
1949 opcode: BlockOpcode::Read.into_primitive(),
1950 ..Default::default()
1951 },
1952 vmoid: vmo_id.id,
1953 length: 1,
1954 reqid: 1,
1955 ..Default::default()
1956 })
1957 .await
1958 .unwrap();
1959
1960 let mut response = BlockFifoResponse::default();
1961 reader.read_entries(&mut response).await.unwrap();
1962 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1963
1964 writer
1966 .write_entries(&BlockFifoRequest {
1967 command: BlockFifoCommand {
1968 opcode: BlockOpcode::Write.into_primitive(),
1969 ..Default::default()
1970 },
1971 vmoid: vmo_id.id,
1972 length: 1,
1973 reqid: 2,
1974 ..Default::default()
1975 })
1976 .await
1977 .unwrap();
1978
1979 reader.read_entries(&mut response).await.unwrap();
1980 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1981
1982 writer
1984 .write_entries(&BlockFifoRequest {
1985 command: BlockFifoCommand {
1986 opcode: BlockOpcode::Flush.into_primitive(),
1987 ..Default::default()
1988 },
1989 reqid: 3,
1990 ..Default::default()
1991 })
1992 .await
1993 .unwrap();
1994
1995 reader.read_entries(&mut response).await.unwrap();
1996 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1997
1998 writer
2000 .write_entries(&BlockFifoRequest {
2001 command: BlockFifoCommand {
2002 opcode: BlockOpcode::Trim.into_primitive(),
2003 ..Default::default()
2004 },
2005 reqid: 4,
2006 length: 1,
2007 ..Default::default()
2008 })
2009 .await
2010 .unwrap();
2011
2012 reader.read_entries(&mut response).await.unwrap();
2013 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2014
2015 std::mem::drop(proxy);
2016 }
2017 );
2018 }
2019
2020 #[fuchsia::test]
2021 async fn test_invalid_args() {
2022 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2023
2024 futures::join!(
2025 async {
2026 let block_server = BlockServer::new(
2027 BLOCK_SIZE,
2028 Arc::new(IoMockInterface {
2029 return_errors: false,
2030 do_checks: false,
2031 expected_op: Arc::new(Mutex::new(None)),
2032 }),
2033 );
2034 block_server.handle_requests(stream).await.unwrap();
2035 },
2036 async move {
2037 let (session_proxy, server) = fidl::endpoints::create_proxy();
2038
2039 proxy.open_session(server).unwrap();
2040
2041 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2042 let vmo_id = session_proxy
2043 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2044 .await
2045 .unwrap()
2046 .unwrap();
2047
2048 let mut fifo =
2049 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2050
2051 async fn test(
2052 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2053 request: BlockFifoRequest,
2054 ) -> Result<(), zx::Status> {
2055 let (mut reader, mut writer) = fifo.async_io();
2056 writer.write_entries(&request).await.unwrap();
2057 let mut response = BlockFifoResponse::default();
2058 reader.read_entries(&mut response).await.unwrap();
2059 zx::Status::ok(response.status)
2060 }
2061
2062 let good_read_request = || BlockFifoRequest {
2065 command: BlockFifoCommand {
2066 opcode: BlockOpcode::Read.into_primitive(),
2067 ..Default::default()
2068 },
2069 length: 1,
2070 vmoid: vmo_id.id,
2071 ..Default::default()
2072 };
2073
2074 assert_eq!(
2075 test(
2076 &mut fifo,
2077 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2078 )
2079 .await,
2080 Err(zx::Status::IO)
2081 );
2082
2083 assert_eq!(
2084 test(
2085 &mut fifo,
2086 BlockFifoRequest {
2087 vmo_offset: 0xffff_ffff_ffff_ffff,
2088 ..good_read_request()
2089 }
2090 )
2091 .await,
2092 Err(zx::Status::OUT_OF_RANGE)
2093 );
2094
2095 assert_eq!(
2096 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2097 Err(zx::Status::INVALID_ARGS)
2098 );
2099
2100 let good_write_request = || BlockFifoRequest {
2103 command: BlockFifoCommand {
2104 opcode: BlockOpcode::Write.into_primitive(),
2105 ..Default::default()
2106 },
2107 length: 1,
2108 vmoid: vmo_id.id,
2109 ..Default::default()
2110 };
2111
2112 assert_eq!(
2113 test(
2114 &mut fifo,
2115 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2116 )
2117 .await,
2118 Err(zx::Status::IO)
2119 );
2120
2121 assert_eq!(
2122 test(
2123 &mut fifo,
2124 BlockFifoRequest {
2125 vmo_offset: 0xffff_ffff_ffff_ffff,
2126 ..good_write_request()
2127 }
2128 )
2129 .await,
2130 Err(zx::Status::OUT_OF_RANGE)
2131 );
2132
2133 assert_eq!(
2134 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2135 Err(zx::Status::INVALID_ARGS)
2136 );
2137
2138 assert_eq!(
2141 test(
2142 &mut fifo,
2143 BlockFifoRequest {
2144 command: BlockFifoCommand {
2145 opcode: BlockOpcode::CloseVmo.into_primitive(),
2146 ..Default::default()
2147 },
2148 vmoid: vmo_id.id + 1,
2149 ..Default::default()
2150 }
2151 )
2152 .await,
2153 Err(zx::Status::IO)
2154 );
2155
2156 std::mem::drop(proxy);
2157 }
2158 );
2159 }
2160
2161 #[fuchsia::test]
2162 async fn test_concurrent_requests() {
2163 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2164
2165 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2166 let waiting_readers_clone = waiting_readers.clone();
2167
2168 futures::join!(
2169 async move {
2170 let block_server = BlockServer::new(
2171 BLOCK_SIZE,
2172 Arc::new(MockInterface {
2173 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2174 let (tx, rx) = oneshot::channel();
2175 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2176 Box::pin(async move {
2177 let _ = rx.await;
2178 Ok(())
2179 })
2180 })),
2181 ..MockInterface::default()
2182 }),
2183 );
2184 block_server.handle_requests(stream).await.unwrap();
2185 },
2186 async move {
2187 let (session_proxy, server) = fidl::endpoints::create_proxy();
2188
2189 proxy.open_session(server).unwrap();
2190
2191 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2192 let vmo_id = session_proxy
2193 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2194 .await
2195 .unwrap()
2196 .unwrap();
2197
2198 let mut fifo =
2199 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2200 let (mut reader, mut writer) = fifo.async_io();
2201
2202 writer
2203 .write_entries(&BlockFifoRequest {
2204 command: BlockFifoCommand {
2205 opcode: BlockOpcode::Read.into_primitive(),
2206 ..Default::default()
2207 },
2208 reqid: 1,
2209 dev_offset: 1, vmoid: vmo_id.id,
2211 length: 1,
2212 ..Default::default()
2213 })
2214 .await
2215 .unwrap();
2216
2217 writer
2218 .write_entries(&BlockFifoRequest {
2219 command: BlockFifoCommand {
2220 opcode: BlockOpcode::Read.into_primitive(),
2221 ..Default::default()
2222 },
2223 reqid: 2,
2224 dev_offset: 2,
2225 vmoid: vmo_id.id,
2226 length: 1,
2227 ..Default::default()
2228 })
2229 .await
2230 .unwrap();
2231
2232 poll_fn(|cx: &mut Context<'_>| {
2234 if waiting_readers.lock().len() == 2 {
2235 Poll::Ready(())
2236 } else {
2237 cx.waker().wake_by_ref();
2239 Poll::Pending
2240 }
2241 })
2242 .await;
2243
2244 let mut response = BlockFifoResponse::default();
2245 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2246
2247 let (id, tx) = waiting_readers.lock().pop().unwrap();
2248 tx.send(()).unwrap();
2249
2250 reader.read_entries(&mut response).await.unwrap();
2251 assert_eq!(response.status, zx::sys::ZX_OK);
2252 assert_eq!(response.reqid, id);
2253
2254 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2255
2256 let (id, tx) = waiting_readers.lock().pop().unwrap();
2257 tx.send(()).unwrap();
2258
2259 reader.read_entries(&mut response).await.unwrap();
2260 assert_eq!(response.status, zx::sys::ZX_OK);
2261 assert_eq!(response.reqid, id);
2262 }
2263 );
2264 }
2265
2266 #[fuchsia::test]
2267 async fn test_groups() {
2268 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2269
2270 futures::join!(
2271 async move {
2272 let block_server = BlockServer::new(
2273 BLOCK_SIZE,
2274 Arc::new(MockInterface {
2275 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2276 ..MockInterface::default()
2277 }),
2278 );
2279 block_server.handle_requests(stream).await.unwrap();
2280 },
2281 async move {
2282 let (session_proxy, server) = fidl::endpoints::create_proxy();
2283
2284 proxy.open_session(server).unwrap();
2285
2286 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2287 let vmo_id = session_proxy
2288 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2289 .await
2290 .unwrap()
2291 .unwrap();
2292
2293 let mut fifo =
2294 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2295 let (mut reader, mut writer) = fifo.async_io();
2296
2297 writer
2298 .write_entries(&BlockFifoRequest {
2299 command: BlockFifoCommand {
2300 opcode: BlockOpcode::Read.into_primitive(),
2301 flags: BlockIoFlag::GROUP_ITEM.bits(),
2302 ..Default::default()
2303 },
2304 group: 1,
2305 vmoid: vmo_id.id,
2306 length: 1,
2307 ..Default::default()
2308 })
2309 .await
2310 .unwrap();
2311
2312 writer
2313 .write_entries(&BlockFifoRequest {
2314 command: BlockFifoCommand {
2315 opcode: BlockOpcode::Read.into_primitive(),
2316 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2317 ..Default::default()
2318 },
2319 reqid: 2,
2320 group: 1,
2321 vmoid: vmo_id.id,
2322 length: 1,
2323 ..Default::default()
2324 })
2325 .await
2326 .unwrap();
2327
2328 let mut response = BlockFifoResponse::default();
2329 reader.read_entries(&mut response).await.unwrap();
2330 assert_eq!(response.status, zx::sys::ZX_OK);
2331 assert_eq!(response.reqid, 2);
2332 assert_eq!(response.group, 1);
2333 }
2334 );
2335 }
2336
2337 #[fuchsia::test]
2338 async fn test_group_error() {
2339 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2340
2341 let counter = Arc::new(AtomicU64::new(0));
2342 let counter_clone = counter.clone();
2343
2344 futures::join!(
2345 async move {
2346 let block_server = BlockServer::new(
2347 BLOCK_SIZE,
2348 Arc::new(MockInterface {
2349 read_hook: Some(Box::new(move |_, _, _, _| {
2350 counter_clone.fetch_add(1, Ordering::Relaxed);
2351 Box::pin(async { Err(zx::Status::BAD_STATE) })
2352 })),
2353 ..MockInterface::default()
2354 }),
2355 );
2356 block_server.handle_requests(stream).await.unwrap();
2357 },
2358 async move {
2359 let (session_proxy, server) = fidl::endpoints::create_proxy();
2360
2361 proxy.open_session(server).unwrap();
2362
2363 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2364 let vmo_id = session_proxy
2365 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2366 .await
2367 .unwrap()
2368 .unwrap();
2369
2370 let mut fifo =
2371 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2372 let (mut reader, mut writer) = fifo.async_io();
2373
2374 writer
2375 .write_entries(&BlockFifoRequest {
2376 command: BlockFifoCommand {
2377 opcode: BlockOpcode::Read.into_primitive(),
2378 flags: BlockIoFlag::GROUP_ITEM.bits(),
2379 ..Default::default()
2380 },
2381 group: 1,
2382 vmoid: vmo_id.id,
2383 length: 1,
2384 ..Default::default()
2385 })
2386 .await
2387 .unwrap();
2388
2389 poll_fn(|cx: &mut Context<'_>| {
2391 if counter.load(Ordering::Relaxed) == 1 {
2392 Poll::Ready(())
2393 } else {
2394 cx.waker().wake_by_ref();
2396 Poll::Pending
2397 }
2398 })
2399 .await;
2400
2401 let mut response = BlockFifoResponse::default();
2402 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2403
2404 writer
2405 .write_entries(&BlockFifoRequest {
2406 command: BlockFifoCommand {
2407 opcode: BlockOpcode::Read.into_primitive(),
2408 flags: BlockIoFlag::GROUP_ITEM.bits(),
2409 ..Default::default()
2410 },
2411 group: 1,
2412 vmoid: vmo_id.id,
2413 length: 1,
2414 ..Default::default()
2415 })
2416 .await
2417 .unwrap();
2418
2419 writer
2420 .write_entries(&BlockFifoRequest {
2421 command: BlockFifoCommand {
2422 opcode: BlockOpcode::Read.into_primitive(),
2423 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2424 ..Default::default()
2425 },
2426 reqid: 2,
2427 group: 1,
2428 vmoid: vmo_id.id,
2429 length: 1,
2430 ..Default::default()
2431 })
2432 .await
2433 .unwrap();
2434
2435 reader.read_entries(&mut response).await.unwrap();
2436 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2437 assert_eq!(response.reqid, 2);
2438 assert_eq!(response.group, 1);
2439
2440 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2441
2442 assert_eq!(counter.load(Ordering::Relaxed), 1);
2444 }
2445 );
2446 }
2447
2448 #[fuchsia::test]
2449 async fn test_group_with_two_lasts() {
2450 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2451
2452 let (tx, rx) = oneshot::channel();
2453
2454 futures::join!(
2455 async move {
2456 let rx = Mutex::new(Some(rx));
2457 let block_server = BlockServer::new(
2458 BLOCK_SIZE,
2459 Arc::new(MockInterface {
2460 read_hook: Some(Box::new(move |_, _, _, _| {
2461 let rx = rx.lock().take().unwrap();
2462 Box::pin(async {
2463 let _ = rx.await;
2464 Ok(())
2465 })
2466 })),
2467 ..MockInterface::default()
2468 }),
2469 );
2470 block_server.handle_requests(stream).await.unwrap();
2471 },
2472 async move {
2473 let (session_proxy, server) = fidl::endpoints::create_proxy();
2474
2475 proxy.open_session(server).unwrap();
2476
2477 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2478 let vmo_id = session_proxy
2479 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2480 .await
2481 .unwrap()
2482 .unwrap();
2483
2484 let mut fifo =
2485 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2486 let (mut reader, mut writer) = fifo.async_io();
2487
2488 writer
2489 .write_entries(&BlockFifoRequest {
2490 command: BlockFifoCommand {
2491 opcode: BlockOpcode::Read.into_primitive(),
2492 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2493 ..Default::default()
2494 },
2495 reqid: 1,
2496 group: 1,
2497 vmoid: vmo_id.id,
2498 length: 1,
2499 ..Default::default()
2500 })
2501 .await
2502 .unwrap();
2503
2504 writer
2505 .write_entries(&BlockFifoRequest {
2506 command: BlockFifoCommand {
2507 opcode: BlockOpcode::Read.into_primitive(),
2508 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2509 ..Default::default()
2510 },
2511 reqid: 2,
2512 group: 1,
2513 vmoid: vmo_id.id,
2514 length: 1,
2515 ..Default::default()
2516 })
2517 .await
2518 .unwrap();
2519
2520 writer
2522 .write_entries(&BlockFifoRequest {
2523 command: BlockFifoCommand {
2524 opcode: BlockOpcode::CloseVmo.into_primitive(),
2525 ..Default::default()
2526 },
2527 reqid: 3,
2528 vmoid: vmo_id.id,
2529 ..Default::default()
2530 })
2531 .await
2532 .unwrap();
2533
2534 let mut response = BlockFifoResponse::default();
2536 reader.read_entries(&mut response).await.unwrap();
2537 assert_eq!(response.status, zx::sys::ZX_OK);
2538 assert_eq!(response.reqid, 3);
2539
2540 tx.send(()).unwrap();
2542
2543 let mut response = BlockFifoResponse::default();
2546 reader.read_entries(&mut response).await.unwrap();
2547 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2548 assert_eq!(response.reqid, 1);
2549 assert_eq!(response.group, 1);
2550 }
2551 );
2552 }
2553
2554 #[fuchsia::test(allow_stalls = false)]
2555 async fn test_requests_dont_block_sessions() {
2556 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2557
2558 let (tx, rx) = oneshot::channel();
2559
2560 fasync::Task::local(async move {
2561 let rx = Mutex::new(Some(rx));
2562 let block_server = BlockServer::new(
2563 BLOCK_SIZE,
2564 Arc::new(MockInterface {
2565 read_hook: Some(Box::new(move |_, _, _, _| {
2566 let rx = rx.lock().take().unwrap();
2567 Box::pin(async {
2568 let _ = rx.await;
2569 Ok(())
2570 })
2571 })),
2572 ..MockInterface::default()
2573 }),
2574 );
2575 block_server.handle_requests(stream).await.unwrap();
2576 })
2577 .detach();
2578
2579 let mut fut = pin!(async {
2580 let (session_proxy, server) = fidl::endpoints::create_proxy();
2581
2582 proxy.open_session(server).unwrap();
2583
2584 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2585 let vmo_id = session_proxy
2586 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2587 .await
2588 .unwrap()
2589 .unwrap();
2590
2591 let mut fifo =
2592 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2593 let (mut reader, mut writer) = fifo.async_io();
2594
2595 writer
2596 .write_entries(&BlockFifoRequest {
2597 command: BlockFifoCommand {
2598 opcode: BlockOpcode::Read.into_primitive(),
2599 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2600 ..Default::default()
2601 },
2602 reqid: 1,
2603 group: 1,
2604 vmoid: vmo_id.id,
2605 length: 1,
2606 ..Default::default()
2607 })
2608 .await
2609 .unwrap();
2610
2611 let mut response = BlockFifoResponse::default();
2612 reader.read_entries(&mut response).await.unwrap();
2613 assert_eq!(response.status, zx::sys::ZX_OK);
2614 });
2615
2616 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2618
2619 let mut fut2 = pin!(proxy.get_volume_info());
2620
2621 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2623
2624 let _ = tx.send(());
2627
2628 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2629 }
2630
2631 #[fuchsia::test]
2632 async fn test_request_flow_control() {
2633 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2634
2635 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2638 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2639 let event_clone = event.clone();
2640 futures::join!(
2641 async move {
2642 let block_server = BlockServer::new(
2643 BLOCK_SIZE,
2644 Arc::new(MockInterface {
2645 read_hook: Some(Box::new(move |_, _, _, _| {
2646 let event_clone = event_clone.clone();
2647 Box::pin(async move {
2648 if !event_clone.1.load(Ordering::SeqCst) {
2649 event_clone.0.listen().await;
2650 }
2651 Ok(())
2652 })
2653 })),
2654 ..MockInterface::default()
2655 }),
2656 );
2657 block_server.handle_requests(stream).await.unwrap();
2658 },
2659 async move {
2660 let (session_proxy, server) = fidl::endpoints::create_proxy();
2661
2662 proxy.open_session(server).unwrap();
2663
2664 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2665 let vmo_id = session_proxy
2666 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2667 .await
2668 .unwrap()
2669 .unwrap();
2670
2671 let mut fifo =
2672 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2673 let (mut reader, mut writer) = fifo.async_io();
2674
2675 for i in 0..MAX_REQUESTS {
2676 writer
2677 .write_entries(&BlockFifoRequest {
2678 command: BlockFifoCommand {
2679 opcode: BlockOpcode::Read.into_primitive(),
2680 ..Default::default()
2681 },
2682 reqid: (i + 1) as u32,
2683 dev_offset: i,
2684 vmoid: vmo_id.id,
2685 length: 1,
2686 ..Default::default()
2687 })
2688 .await
2689 .unwrap();
2690 }
2691 assert!(
2692 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2693 command: BlockFifoCommand {
2694 opcode: BlockOpcode::Read.into_primitive(),
2695 ..Default::default()
2696 },
2697 reqid: u32::MAX,
2698 dev_offset: MAX_REQUESTS,
2699 vmoid: vmo_id.id,
2700 length: 1,
2701 ..Default::default()
2702 })))
2703 .is_pending()
2704 );
2705 event.1.store(true, Ordering::SeqCst);
2707 event.0.notify(usize::MAX);
2708 let mut finished_reqids = vec![];
2710 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2711 let mut response = BlockFifoResponse::default();
2712 reader.read_entries(&mut response).await.unwrap();
2713 assert_eq!(response.status, zx::sys::ZX_OK);
2714 finished_reqids.push(response.reqid);
2715 writer
2716 .write_entries(&BlockFifoRequest {
2717 command: BlockFifoCommand {
2718 opcode: BlockOpcode::Read.into_primitive(),
2719 ..Default::default()
2720 },
2721 reqid: (i + 1) as u32,
2722 dev_offset: i,
2723 vmoid: vmo_id.id,
2724 length: 1,
2725 ..Default::default()
2726 })
2727 .await
2728 .unwrap();
2729 }
2730 let mut response = BlockFifoResponse::default();
2731 for _ in 0..MAX_REQUESTS {
2732 reader.read_entries(&mut response).await.unwrap();
2733 assert_eq!(response.status, zx::sys::ZX_OK);
2734 finished_reqids.push(response.reqid);
2735 }
2736 finished_reqids.sort();
2739 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2740 let mut i = 1;
2741 for reqid in finished_reqids {
2742 assert_eq!(reqid, i);
2743 i += 1;
2744 }
2745 }
2746 );
2747 }
2748
2749 #[fuchsia::test]
2750 async fn test_passthrough_io_with_fixed_map() {
2751 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2752
2753 let expected_op = Arc::new(Mutex::new(None));
2754 let expected_op_clone = expected_op.clone();
2755 futures::join!(
2756 async {
2757 let block_server = BlockServer::new(
2758 BLOCK_SIZE,
2759 Arc::new(IoMockInterface {
2760 return_errors: false,
2761 do_checks: true,
2762 expected_op: expected_op_clone,
2763 }),
2764 );
2765 block_server.handle_requests(stream).await.unwrap();
2766 },
2767 async move {
2768 let (session_proxy, server) = fidl::endpoints::create_proxy();
2769
2770 let mapping = fblock::BlockOffsetMapping {
2771 source_block_offset: 0,
2772 target_block_offset: 10,
2773 length: 20,
2774 };
2775 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2776
2777 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2778 let vmo_id = session_proxy
2779 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2780 .await
2781 .unwrap()
2782 .unwrap();
2783
2784 let mut fifo =
2785 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2786 let (mut reader, mut writer) = fifo.async_io();
2787
2788 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2790 writer
2791 .write_entries(&BlockFifoRequest {
2792 command: BlockFifoCommand {
2793 opcode: BlockOpcode::Read.into_primitive(),
2794 ..Default::default()
2795 },
2796 vmoid: vmo_id.id,
2797 dev_offset: 1,
2798 length: 2,
2799 vmo_offset: 3,
2800 ..Default::default()
2801 })
2802 .await
2803 .unwrap();
2804
2805 let mut response = BlockFifoResponse::default();
2806 reader.read_entries(&mut response).await.unwrap();
2807 assert_eq!(response.status, zx::sys::ZX_OK);
2808
2809 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2811 writer
2812 .write_entries(&BlockFifoRequest {
2813 command: BlockFifoCommand {
2814 opcode: BlockOpcode::Write.into_primitive(),
2815 ..Default::default()
2816 },
2817 vmoid: vmo_id.id,
2818 dev_offset: 4,
2819 length: 5,
2820 vmo_offset: 6,
2821 ..Default::default()
2822 })
2823 .await
2824 .unwrap();
2825
2826 reader.read_entries(&mut response).await.unwrap();
2827 assert_eq!(response.status, zx::sys::ZX_OK);
2828
2829 *expected_op.lock() = Some(ExpectedOp::Flush);
2831 writer
2832 .write_entries(&BlockFifoRequest {
2833 command: BlockFifoCommand {
2834 opcode: BlockOpcode::Flush.into_primitive(),
2835 ..Default::default()
2836 },
2837 ..Default::default()
2838 })
2839 .await
2840 .unwrap();
2841
2842 reader.read_entries(&mut response).await.unwrap();
2843 assert_eq!(response.status, zx::sys::ZX_OK);
2844
2845 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2847 writer
2848 .write_entries(&BlockFifoRequest {
2849 command: BlockFifoCommand {
2850 opcode: BlockOpcode::Trim.into_primitive(),
2851 ..Default::default()
2852 },
2853 dev_offset: 7,
2854 length: 3,
2855 ..Default::default()
2856 })
2857 .await
2858 .unwrap();
2859
2860 reader.read_entries(&mut response).await.unwrap();
2861 assert_eq!(response.status, zx::sys::ZX_OK);
2862
2863 *expected_op.lock() = None;
2865 writer
2866 .write_entries(&BlockFifoRequest {
2867 command: BlockFifoCommand {
2868 opcode: BlockOpcode::Read.into_primitive(),
2869 ..Default::default()
2870 },
2871 vmoid: vmo_id.id,
2872 dev_offset: 19,
2873 length: 2,
2874 vmo_offset: 3,
2875 ..Default::default()
2876 })
2877 .await
2878 .unwrap();
2879
2880 reader.read_entries(&mut response).await.unwrap();
2881 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2882
2883 std::mem::drop(proxy);
2884 }
2885 );
2886 }
2887
2888 #[fuchsia::test]
2889 fn operation_map() {
2890 const BLOCK_SIZE: u32 = 512;
2891
2892 #[track_caller]
2893 fn expect_map_result(
2894 mut operation: Operation,
2895 mapping: Option<BlockOffsetMapping>,
2896 max_blocks: Option<NonZero<u32>>,
2897 expected_operations: Vec<Operation>,
2898 ) {
2899 let mut ops = vec![];
2900 while let Some(remainder) =
2901 operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2902 {
2903 ops.push(operation);
2904 operation = remainder;
2905 }
2906 ops.push(operation);
2907 assert_eq!(ops, expected_operations);
2908 }
2909
2910 expect_map_result(
2912 Operation::Read {
2913 device_block_offset: 10,
2914 block_count: 200,
2915 _unused: 0,
2916 vmo_offset: 0,
2917 options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2918 },
2919 None,
2920 None,
2921 vec![Operation::Read {
2922 device_block_offset: 10,
2923 block_count: 200,
2924 _unused: 0,
2925 vmo_offset: 0,
2926 options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2927 }],
2928 );
2929
2930 expect_map_result(
2932 Operation::Read {
2933 device_block_offset: 10,
2934 block_count: 200,
2935 _unused: 0,
2936 vmo_offset: 0,
2937 options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2938 },
2939 None,
2940 NonZero::new(120),
2941 vec![
2942 Operation::Read {
2943 device_block_offset: 10,
2944 block_count: 120,
2945 _unused: 0,
2946 vmo_offset: 0,
2947 options: ReadOptions {
2948 inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 },
2949 },
2950 },
2951 Operation::Read {
2952 device_block_offset: 130,
2953 block_count: 80,
2954 _unused: 0,
2955 vmo_offset: 120 * BLOCK_SIZE as u64,
2956 options: ReadOptions {
2957 inline_crypto: InlineCryptoOptions { dun: 1000 + 120, slot: 1 },
2959 },
2960 },
2961 ],
2962 );
2963 expect_map_result(
2964 Operation::Trim { device_block_offset: 10, block_count: 200 },
2965 None,
2966 NonZero::new(120),
2967 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2968 );
2969
2970 expect_map_result(
2972 Operation::Read {
2973 device_block_offset: 10,
2974 block_count: 200,
2975 _unused: 0,
2976 vmo_offset: 0,
2977 options: ReadOptions { inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 } },
2978 },
2979 Some(BlockOffsetMapping {
2980 source_block_offset: 10,
2981 target_block_offset: 100,
2982 length: 200,
2983 }),
2984 NonZero::new(120),
2985 vec![
2986 Operation::Read {
2987 device_block_offset: 100,
2988 block_count: 120,
2989 _unused: 0,
2990 vmo_offset: 0,
2991 options: ReadOptions {
2992 inline_crypto: InlineCryptoOptions { dun: 1000, slot: 1 },
2993 },
2994 },
2995 Operation::Read {
2996 device_block_offset: 220,
2997 block_count: 80,
2998 _unused: 0,
2999 vmo_offset: 120 * BLOCK_SIZE as u64,
3000 options: ReadOptions {
3001 inline_crypto: InlineCryptoOptions { dun: 1000 + 120, slot: 1 },
3002 },
3003 },
3004 ],
3005 );
3006 expect_map_result(
3007 Operation::Trim { device_block_offset: 10, block_count: 200 },
3008 Some(BlockOffsetMapping {
3009 source_block_offset: 10,
3010 target_block_offset: 100,
3011 length: 200,
3012 }),
3013 NonZero::new(120),
3014 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3015 );
3016 }
3017
3018 #[fuchsia::test]
3020 async fn test_pre_barrier_flush_failure() {
3021 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3022
3023 struct NoBarrierInterface;
3024 impl super::async_interface::Interface for NoBarrierInterface {
3025 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3026 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3027 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3029 block_range: Some(0..100),
3030 type_guid: [0; 16],
3031 instance_guid: [0; 16],
3032 name: "test".to_string(),
3033 flags: 0,
3034 }))
3035 }
3036 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3037 Ok(())
3038 }
3039 async fn read(
3040 &self,
3041 _: u64,
3042 _: u32,
3043 _: &Arc<zx::Vmo>,
3044 _: u64,
3045 _: ReadOptions,
3046 _: TraceFlowId,
3047 ) -> Result<(), zx::Status> {
3048 unreachable!()
3049 }
3050 async fn write(
3051 &self,
3052 _: u64,
3053 _: u32,
3054 _: &Arc<zx::Vmo>,
3055 _: u64,
3056 _: WriteOptions,
3057 _: TraceFlowId,
3058 ) -> Result<(), zx::Status> {
3059 panic!("Write should not be called");
3060 }
3061 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3062 Err(zx::Status::IO)
3063 }
3064 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3065 unreachable!()
3066 }
3067 }
3068
3069 futures::join!(
3070 async move {
3071 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3072 block_server.handle_requests(stream).await.unwrap();
3073 },
3074 async move {
3075 let (session_proxy, server) = fidl::endpoints::create_proxy();
3076 proxy.open_session(server).unwrap();
3077 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3078 let vmo_id = session_proxy
3079 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3080 .await
3081 .unwrap()
3082 .unwrap();
3083
3084 let mut fifo =
3085 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3086 let (mut reader, mut writer) = fifo.async_io();
3087
3088 writer
3089 .write_entries(&BlockFifoRequest {
3090 command: BlockFifoCommand {
3091 opcode: BlockOpcode::Write.into_primitive(),
3092 flags: BlockIoFlag::PRE_BARRIER.bits(),
3093 ..Default::default()
3094 },
3095 vmoid: vmo_id.id,
3096 length: 1,
3097 ..Default::default()
3098 })
3099 .await
3100 .unwrap();
3101
3102 let mut response = BlockFifoResponse::default();
3103 reader.read_entries(&mut response).await.unwrap();
3104 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3105 }
3106 );
3107 }
3108
3109 #[fuchsia::test]
3112 async fn test_post_barrier_write_failure() {
3113 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3114
3115 struct NoBarrierInterface;
3116 impl super::async_interface::Interface for NoBarrierInterface {
3117 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3118 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3119 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3121 block_range: Some(0..100),
3122 type_guid: [0; 16],
3123 instance_guid: [0; 16],
3124 name: "test".to_string(),
3125 flags: 0,
3126 }))
3127 }
3128 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3129 Ok(())
3130 }
3131 async fn read(
3132 &self,
3133 _: u64,
3134 _: u32,
3135 _: &Arc<zx::Vmo>,
3136 _: u64,
3137 _: ReadOptions,
3138 _: TraceFlowId,
3139 ) -> Result<(), zx::Status> {
3140 unreachable!()
3141 }
3142 async fn write(
3143 &self,
3144 _: u64,
3145 _: u32,
3146 _: &Arc<zx::Vmo>,
3147 _: u64,
3148 _: WriteOptions,
3149 _: TraceFlowId,
3150 ) -> Result<(), zx::Status> {
3151 Err(zx::Status::IO)
3152 }
3153 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3154 panic!("Flush should not be called")
3155 }
3156 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3157 unreachable!()
3158 }
3159 }
3160
3161 futures::join!(
3162 async move {
3163 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3164 block_server.handle_requests(stream).await.unwrap();
3165 },
3166 async move {
3167 let (session_proxy, server) = fidl::endpoints::create_proxy();
3168 proxy.open_session(server).unwrap();
3169 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3170 let vmo_id = session_proxy
3171 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3172 .await
3173 .unwrap()
3174 .unwrap();
3175
3176 let mut fifo =
3177 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3178 let (mut reader, mut writer) = fifo.async_io();
3179
3180 writer
3181 .write_entries(&BlockFifoRequest {
3182 command: BlockFifoCommand {
3183 opcode: BlockOpcode::Write.into_primitive(),
3184 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3185 ..Default::default()
3186 },
3187 vmoid: vmo_id.id,
3188 length: 1,
3189 ..Default::default()
3190 })
3191 .await
3192 .unwrap();
3193
3194 let mut response = BlockFifoResponse::default();
3195 reader.read_entries(&mut response).await.unwrap();
3196 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3197 }
3198 );
3199 }
3200}