1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
35#[derive(Clone)]
36pub enum DeviceInfo {
37 Block(BlockInfo),
38 Partition(PartitionInfo),
39}
40
41impl DeviceInfo {
42 pub fn block_count(&self) -> Option<u64> {
43 match self {
44 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
45 Self::Partition(PartitionInfo { block_range, .. }) => {
46 block_range.as_ref().map(|range| range.end - range.start)
47 }
48 }
49 }
50
51 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
52 match self {
53 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
54 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
55 max_transfer_blocks.clone()
56 }
57 }
58 }
59
60 fn max_transfer_size(&self, block_size: u32) -> u32 {
61 if let Some(max_blocks) = self.max_transfer_blocks() {
62 max_blocks.get() * block_size
63 } else {
64 MAX_TRANSFER_UNBOUNDED
65 }
66 }
67}
68
69#[derive(Clone, Default)]
71pub struct BlockInfo {
72 pub device_flags: fblock::Flag,
73 pub block_count: u64,
74 pub max_transfer_blocks: Option<NonZero<u32>>,
75}
76
77#[derive(Clone, Default)]
79pub struct PartitionInfo {
80 pub device_flags: fblock::Flag,
82 pub max_transfer_blocks: Option<NonZero<u32>>,
83 pub block_range: Option<Range<u64>>,
87 pub type_guid: [u8; 16],
88 pub instance_guid: [u8; 16],
89 pub name: String,
90 pub flags: u64,
91}
92
93struct ActiveRequest<S> {
96 session: S,
97 group_or_request: GroupOrRequest,
98 trace_flow_id: TraceFlowId,
99 _epoch_guard: EpochGuard<'static>,
100 status: zx::Status,
101 count: u32,
102 req_id: Option<u32>,
103 decompression_info: Option<DecompressionInfo>,
104}
105
106struct DecompressionInfo {
107 compressed_range: Range<usize>,
109
110 uncompressed_range: Range<u64>,
112
113 bytes_so_far: u64,
114 mapping: Arc<VmoMapping>,
115 buffer: Option<Buffer<'static>>,
116}
117
118impl DecompressionInfo {
119 fn uncompressed_slice(&self) -> *mut [u8] {
121 std::ptr::slice_from_raw_parts_mut(
122 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
123 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
124 )
125 }
126}
127
128pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
129
130impl<S> Default for ActiveRequests<S> {
131 fn default() -> Self {
132 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
133 }
134}
135
136impl<S> ActiveRequests<S> {
137 fn complete_and_take_response(
138 &self,
139 request_id: RequestId,
140 status: zx::Status,
141 ) -> Option<(S, BlockFifoResponse)> {
142 self.0.lock().complete_and_take_response(request_id, status)
143 }
144
145 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
146 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
147 }
148}
149
150struct ActiveRequestsInner<S> {
151 requests: Slab<ActiveRequest<S>>,
152}
153
154impl<S> ActiveRequestsInner<S> {
156 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
158 let group = &mut self.requests[request_id.0];
159
160 group.count = group.count.checked_sub(1).unwrap();
161 if status != zx::Status::OK && group.status == zx::Status::OK {
162 group.status = status
163 }
164
165 fuchsia_trace::duration!(
166 c"storage",
167 c"block_server::finish_transaction",
168 "request_id" => request_id.0,
169 "group_completed" => group.count == 0,
170 "status" => status.into_raw());
171 if let Some(trace_flow_id) = group.trace_flow_id {
172 fuchsia_trace::flow_step!(
173 c"storage",
174 c"block_server::finish_request",
175 trace_flow_id.get().into()
176 );
177 }
178
179 if group.count == 0
180 && group.status == zx::Status::OK
181 && let Some(info) = &mut group.decompression_info
182 {
183 thread_local! {
184 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
185 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
186 }
187 DECOMPRESSOR.with(|decompressor| {
188 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
190 let mut decompressor = decompressor.borrow_mut();
191 if let Err(error) = decompressor.decompress_to_buffer(
192 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
193 target_slice,
194 ) {
195 log::warn!(error:?; "Decompression error");
196 group.status = zx::Status::IO_DATA_INTEGRITY;
197 };
198 });
199 }
200 }
201
202 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
204 let group = &self.requests[request_id.0];
205 match group.req_id {
206 Some(reqid) if group.count == 0 => {
207 let group = self.requests.remove(request_id.0);
208 Some((
209 group.session,
210 BlockFifoResponse {
211 status: group.status.into_raw(),
212 reqid,
213 group: group.group_or_request.group_id().unwrap_or(0),
214 ..Default::default()
215 },
216 ))
217 }
218 _ => None,
219 }
220 }
221
222 fn complete_and_take_response(
224 &mut self,
225 request_id: RequestId,
226 status: zx::Status,
227 ) -> Option<(S, BlockFifoResponse)> {
228 self.complete(request_id, status);
229 self.take_response(request_id)
230 }
231}
232
233pub struct BlockServer<SM> {
236 block_size: u32,
237 session_manager: Arc<SM>,
238}
239
240#[derive(Debug)]
242pub struct BlockOffsetMapping {
243 source_block_offset: u64,
244 target_block_offset: u64,
245 length: u64,
246}
247
248impl BlockOffsetMapping {
249 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
250 blocks.0 >= self.source_block_offset
251 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
252 }
253}
254
255impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
256 type Error = zx::Status;
257
258 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
259 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
260 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
261 Ok(Self {
262 source_block_offset: wire.source_block_offset,
263 target_block_offset: wire.target_block_offset,
264 length: wire.length,
265 })
266 }
267}
268
269pub struct OffsetMap {
271 mapping: Option<BlockOffsetMapping>,
272 max_transfer_blocks: Option<NonZero<u32>>,
273}
274
275impl OffsetMap {
276 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
278 Self { mapping: Some(mapping), max_transfer_blocks }
279 }
280
281 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
283 Self { mapping: None, max_transfer_blocks }
284 }
285
286 pub fn is_empty(&self) -> bool {
287 self.mapping.is_none()
288 }
289
290 fn mapping(&self) -> Option<&BlockOffsetMapping> {
291 self.mapping.as_ref()
292 }
293
294 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
295 self.max_transfer_blocks
296 }
297}
298
299pub trait SessionManager: 'static {
302 const SUPPORTS_DECOMPRESSION: bool;
303
304 type Session;
305
306 fn on_attach_vmo(
307 self: Arc<Self>,
308 vmo: &Arc<zx::Vmo>,
309 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
310
311 fn open_session(
316 self: Arc<Self>,
317 stream: fblock::SessionRequestStream,
318 offset_map: OffsetMap,
319 block_size: u32,
320 ) -> impl Future<Output = Result<(), Error>> + Send;
321
322 fn get_info(&self) -> Cow<'_, DeviceInfo>;
324
325 fn get_volume_info(
327 &self,
328 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
329 {
330 async { Err(zx::Status::NOT_SUPPORTED) }
331 }
332
333 fn query_slices(
335 &self,
336 _start_slices: &[u64],
337 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
338 async { Err(zx::Status::NOT_SUPPORTED) }
339 }
340
341 fn extend(
343 &self,
344 _start_slice: u64,
345 _slice_count: u64,
346 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
347 async { Err(zx::Status::NOT_SUPPORTED) }
348 }
349
350 fn shrink(
352 &self,
353 _start_slice: u64,
354 _slice_count: u64,
355 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
356 async { Err(zx::Status::NOT_SUPPORTED) }
357 }
358
359 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
361}
362
363pub trait IntoSessionManager {
364 type SM: SessionManager;
365
366 fn into_session_manager(self) -> Arc<Self::SM>;
367}
368
369impl<SM: SessionManager> BlockServer<SM> {
370 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
371 Self { block_size, session_manager: session_manager.into_session_manager() }
372 }
373
374 pub fn session_manager(&self) -> &SM {
375 self.session_manager.as_ref()
376 }
377
378 pub async fn handle_requests(
380 &self,
381 mut requests: fvolume::VolumeRequestStream,
382 ) -> Result<(), Error> {
383 let scope = fasync::Scope::new();
384 loop {
385 match requests.try_next().await {
386 Ok(Some(request)) => {
387 if let Some(session) = self.handle_request(request).await? {
388 scope.spawn(session.map(|_| ()));
389 }
390 }
391 Ok(None) => break,
392 Err(err) => log::warn!(err:?; "Invalid request"),
393 }
394 }
395 scope.await;
396 Ok(())
397 }
398
399 async fn handle_request(
402 &self,
403 request: fvolume::VolumeRequest,
404 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
405 match request {
406 fvolume::VolumeRequest::GetInfo { responder } => {
407 let info = self.device_info();
408 let max_transfer_size = info.max_transfer_size(self.block_size);
409 let (block_count, mut flags) = match info.as_ref() {
410 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
411 (*block_count, *device_flags)
412 }
413 DeviceInfo::Partition(partition_info) => {
414 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
415 range.end - range.start
416 } else {
417 let volume_info = self.session_manager.get_volume_info().await?;
418 volume_info.0.slice_size * volume_info.1.partition_slice_count
419 / self.block_size as u64
420 };
421 (block_count, partition_info.device_flags)
422 }
423 };
424 if SM::SUPPORTS_DECOMPRESSION {
425 flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
426 }
427 responder.send(Ok(&fblock::BlockInfo {
428 block_count,
429 block_size: self.block_size,
430 max_transfer_size,
431 flags,
432 }))?;
433 }
434 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
435 let info = self.device_info();
436 return Ok(Some(self.session_manager.clone().open_session(
437 session.into_stream(),
438 OffsetMap::empty(info.max_transfer_blocks()),
439 self.block_size,
440 )));
441 }
442 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
443 session,
444 mapping,
445 control_handle: _,
446 } => {
447 let info = self.device_info();
448 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
449 Ok(m) => m,
450 Err(status) => {
451 session.close_with_epitaph(status)?;
452 return Ok(None);
453 }
454 };
455 if let Some(max) = info.block_count() {
456 if initial_mapping.target_block_offset + initial_mapping.length > max {
457 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
458 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
459 return Ok(None);
460 }
461 }
462 return Ok(Some(self.session_manager.clone().open_session(
463 session.into_stream(),
464 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
465 self.block_size,
466 )));
467 }
468 fvolume::VolumeRequest::GetTypeGuid { responder } => {
469 let info = self.device_info();
470 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
471 let mut guid =
472 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
473 guid.value.copy_from_slice(&partition_info.type_guid);
474 responder.send(zx::sys::ZX_OK, Some(&guid))?;
475 } else {
476 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
477 }
478 }
479 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
480 let info = self.device_info();
481 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
482 let mut guid =
483 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
484 guid.value.copy_from_slice(&partition_info.instance_guid);
485 responder.send(zx::sys::ZX_OK, Some(&guid))?;
486 } else {
487 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
488 }
489 }
490 fvolume::VolumeRequest::GetName { responder } => {
491 let info = self.device_info();
492 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
493 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
494 } else {
495 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
496 }
497 }
498 fvolume::VolumeRequest::GetMetadata { responder } => {
499 let info = self.device_info();
500 if let DeviceInfo::Partition(info) = info.as_ref() {
501 let mut type_guid =
502 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
503 type_guid.value.copy_from_slice(&info.type_guid);
504 let mut instance_guid =
505 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
506 instance_guid.value.copy_from_slice(&info.instance_guid);
507 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
508 name: Some(info.name.clone()),
509 type_guid: Some(type_guid),
510 instance_guid: Some(instance_guid),
511 start_block_offset: info.block_range.as_ref().map(|range| range.start),
512 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
513 flags: Some(info.flags),
514 ..Default::default()
515 }))?;
516 } else {
517 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
518 }
519 }
520 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
521 match self.session_manager.query_slices(&start_slices).await {
522 Ok(mut results) => {
523 let results_len = results.len();
524 assert!(results_len <= 16);
525 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
526 responder.send(
527 zx::sys::ZX_OK,
528 &results.try_into().unwrap(),
529 results_len as u64,
530 )?;
531 }
532 Err(s) => {
533 responder.send(
534 s.into_raw(),
535 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
536 0,
537 )?;
538 }
539 }
540 }
541 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
542 match self.session_manager.get_volume_info().await {
543 Ok((manager_info, volume_info)) => {
544 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
545 }
546 Err(s) => responder.send(s.into_raw(), None, None)?,
547 }
548 }
549 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
550 responder.send(
551 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
552 .into_raw(),
553 )?;
554 }
555 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
556 responder.send(
557 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
558 .into_raw(),
559 )?;
560 }
561 fvolume::VolumeRequest::Destroy { responder, .. } => {
562 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
563 }
564 }
565 Ok(None)
566 }
567
568 fn device_info(&self) -> Cow<'_, DeviceInfo> {
569 self.session_manager.get_info()
570 }
571}
572
573struct SessionHelper<SM: SessionManager> {
574 session_manager: Arc<SM>,
575 offset_map: OffsetMap,
576 block_size: u32,
577 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
578 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
579}
580
581struct VmoMapping {
582 base: usize,
583 size: usize,
584}
585
586impl VmoMapping {
587 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
588 let size = vmo.get_size().unwrap() as usize;
589 Ok(Arc::new(Self {
590 base: fuchsia_runtime::vmar_root_self()
591 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
592 .inspect_err(|error| {
593 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
594 })?,
595 size,
596 }))
597 }
598}
599
600impl Drop for VmoMapping {
601 fn drop(&mut self) {
602 unsafe {
604 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
605 }
606 }
607}
608
609impl<SM: SessionManager> SessionHelper<SM> {
610 fn new(
611 session_manager: Arc<SM>,
612 offset_map: OffsetMap,
613 block_size: u32,
614 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
615 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
616 Ok((
617 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
618 fifo,
619 ))
620 }
621
622 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
623 match request {
624 fblock::SessionRequest::GetFifo { responder } => {
625 let rights = zx::Rights::TRANSFER
626 | zx::Rights::READ
627 | zx::Rights::WRITE
628 | zx::Rights::SIGNAL
629 | zx::Rights::WAIT;
630 match self.peer_fifo.duplicate_handle(rights) {
631 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
632 Err(s) => responder.send(Err(s.into_raw()))?,
633 }
634 Ok(())
635 }
636 fblock::SessionRequest::AttachVmo { vmo, responder } => {
637 let vmo = Arc::new(vmo);
638 let vmo_id = {
639 let mut vmos = self.vmos.lock();
640 if vmos.len() == u16::MAX as usize {
641 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
642 return Ok(());
643 } else {
644 let vmo_id = match vmos.last_entry() {
645 None => 1,
646 Some(o) => {
647 o.key().checked_add(1).unwrap_or_else(|| {
648 let mut vmo_id = 1;
649 for (&id, _) in &*vmos {
651 if id > vmo_id {
652 break;
653 }
654 vmo_id = id + 1;
655 }
656 vmo_id
657 })
658 }
659 };
660 vmos.insert(vmo_id, (vmo.clone(), None));
661 vmo_id
662 }
663 };
664 self.session_manager.clone().on_attach_vmo(&vmo).await?;
665 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
666 Ok(())
667 }
668 fblock::SessionRequest::Close { responder } => {
669 responder.send(Ok(()))?;
670 Err(anyhow!("Closed"))
671 }
672 }
673 }
674
675 fn decode_fifo_request(
677 &self,
678 session: SM::Session,
679 request: &BlockFifoRequest,
680 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
681 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
682
683 let request_bytes = request.length as u64 * self.block_size as u64;
684
685 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
686 .ok_or(zx::Status::INVALID_ARGS)
687 .and_then(|code| {
688 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
689 if code != BlockOpcode::Read {
690 return Err(zx::Status::INVALID_ARGS);
691 }
692 if !SM::SUPPORTS_DECOMPRESSION {
693 return Err(zx::Status::NOT_SUPPORTED);
694 }
695 }
696 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
697 if request.length == 0 {
698 return Err(zx::Status::INVALID_ARGS);
699 }
700 if request.dev_offset.checked_add(request.length as u64).is_none()
702 || (code != BlockOpcode::Trim
703 && request_bytes.checked_add(request.vmo_offset).is_none())
704 {
705 return Err(zx::Status::OUT_OF_RANGE);
706 }
707 }
708 Ok(match code {
709 BlockOpcode::Read => Operation::Read {
710 device_block_offset: request.dev_offset,
711 block_count: request.length,
712 _unused: 0,
713 vmo_offset: request
714 .vmo_offset
715 .checked_mul(self.block_size as u64)
716 .ok_or(zx::Status::OUT_OF_RANGE)?,
717 options: ReadOptions {
718 inline_crypto_options: InlineCryptoOptions {
719 dun: request.dun,
720 slot: request.slot,
721 },
722 },
723 },
724 BlockOpcode::Write => {
725 let mut options = WriteOptions {
726 inline_crypto_options: InlineCryptoOptions {
727 dun: request.dun,
728 slot: request.slot,
729 },
730 ..WriteOptions::default()
731 };
732 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
733 options.flags |= WriteFlags::FORCE_ACCESS;
734 }
735 if flags.contains(BlockIoFlag::PRE_BARRIER) {
736 options.flags |= WriteFlags::PRE_BARRIER;
737 }
738 Operation::Write {
739 device_block_offset: request.dev_offset,
740 block_count: request.length,
741 _unused: 0,
742 options,
743 vmo_offset: request
744 .vmo_offset
745 .checked_mul(self.block_size as u64)
746 .ok_or(zx::Status::OUT_OF_RANGE)?,
747 }
748 }
749 BlockOpcode::Flush => Operation::Flush,
750 BlockOpcode::Trim => Operation::Trim {
751 device_block_offset: request.dev_offset,
752 block_count: request.length,
753 },
754 BlockOpcode::CloseVmo => Operation::CloseVmo,
755 })
756 });
757
758 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
759 GroupOrRequest::Group(request.group)
760 } else {
761 GroupOrRequest::Request(request.reqid)
762 };
763
764 let mut active_requests = self.session_manager.active_requests().0.lock();
765 let mut request_id = None;
766
767 if group_or_request.is_group() {
778 for (key, group) in &mut active_requests.requests {
782 if group.group_or_request == group_or_request {
783 if group.req_id.is_some() {
784 if group.status == zx::Status::OK {
786 group.status = zx::Status::INVALID_ARGS;
787 }
788 return Err(None);
790 }
791 if group.status == zx::Status::OK
793 && let Some(info) = &mut group.decompression_info
794 {
795 if let Ok(Operation::Read {
796 device_block_offset,
797 mut block_count,
798 options,
799 vmo_offset: 0,
800 ..
801 }) = operation
802 {
803 let remaining_bytes = info
804 .compressed_range
805 .end
806 .next_multiple_of(self.block_size as usize)
807 as u64
808 - info.bytes_so_far;
809 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
810 || request.total_compressed_bytes != 0
811 || request.uncompressed_bytes != 0
812 || request.compressed_prefix_bytes != 0
813 || (flags.contains(BlockIoFlag::GROUP_LAST)
814 && info.bytes_so_far + request_bytes
815 < info.compressed_range.end as u64)
816 || (!flags.contains(BlockIoFlag::GROUP_LAST)
817 && request_bytes >= remaining_bytes)
818 {
819 group.status = zx::Status::INVALID_ARGS;
820 } else {
821 if request_bytes > remaining_bytes {
830 block_count = (remaining_bytes / self.block_size as u64) as u32;
831 }
832
833 operation = Ok(Operation::ContinueDecompressedRead {
834 offset: info.bytes_so_far,
835 device_block_offset,
836 block_count,
837 options,
838 });
839
840 info.bytes_so_far += block_count as u64 * self.block_size as u64;
841 }
842 } else {
843 group.status = zx::Status::INVALID_ARGS;
844 }
845 }
846 if flags.contains(BlockIoFlag::GROUP_LAST) {
847 group.req_id = Some(request.reqid);
848 if group.status != zx::Status::OK {
851 operation = Err(group.status);
852 }
853 } else if group.status != zx::Status::OK {
854 return Err(None);
857 }
858 request_id = Some(RequestId(key));
859 group.count += 1;
860 break;
861 }
862 }
863 }
864
865 let is_single_request =
866 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
867
868 let mut decompression_info = None;
869 let vmo = match operation {
870 Ok(Operation::Read {
871 device_block_offset,
872 mut block_count,
873 options,
874 vmo_offset,
875 ..
876 }) => match self.vmos.lock().get_mut(&request.vmoid) {
877 Some((vmo, mapping)) => {
878 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
879 let compressed_range = request.compressed_prefix_bytes as usize
880 ..request.compressed_prefix_bytes as usize
881 + request.total_compressed_bytes as usize;
882 let required_buffer_size =
883 compressed_range.end.next_multiple_of(self.block_size as usize);
884
885 if compressed_range.start >= compressed_range.end
887 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
888 || (is_single_request && request_bytes < compressed_range.end as u64)
889 || (!is_single_request && request_bytes >= required_buffer_size as u64)
890 {
891 Err(zx::Status::INVALID_ARGS)
892 } else {
893 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
900 block_count =
901 (required_buffer_size / self.block_size as usize) as u32;
902 required_buffer_size as u64
903 } else {
904 request_bytes
905 };
906
907 match mapping {
909 Some(mapping) => Ok(mapping.clone()),
910 None => {
911 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
912 }
913 }
914 .and_then(|mapping| {
915 if vmo_offset
918 .checked_add(request.uncompressed_bytes as u64)
919 .is_some_and(|end| end <= mapping.size as u64)
920 {
921 Ok(mapping)
922 } else {
923 Err(zx::Status::OUT_OF_RANGE)
924 }
925 })
926 .map(|mapping| {
927 operation = Ok(Operation::StartDecompressedRead {
932 required_buffer_size,
933 device_block_offset,
934 block_count,
935 options,
936 });
937 decompression_info = Some(DecompressionInfo {
940 compressed_range,
941 bytes_so_far,
942 mapping,
943 uncompressed_range: vmo_offset
944 ..vmo_offset + request.uncompressed_bytes as u64,
945 buffer: None,
946 });
947 None
948 })
949 }
950 } else {
951 Ok(Some(vmo.clone()))
952 }
953 }
954 None => Err(zx::Status::IO),
955 },
956 Ok(Operation::Write { .. }) => self
957 .vmos
958 .lock()
959 .get(&request.vmoid)
960 .cloned()
961 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
962 Ok(Operation::CloseVmo) => {
963 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
964 let vmo_clone = vmo.clone();
965 Epoch::global().defer(move || drop(vmo_clone));
968 Ok(Some(vmo))
969 })
970 }
971 _ => Ok(None),
972 }
973 .unwrap_or_else(|e| {
974 operation = Err(e);
975 None
976 });
977
978 let trace_flow_id = NonZero::new(request.trace_flow_id);
979 let request_id = request_id.unwrap_or_else(|| {
980 RequestId(active_requests.requests.insert(ActiveRequest {
981 session,
982 group_or_request,
983 trace_flow_id,
984 _epoch_guard: Epoch::global().guard(),
985 status: zx::Status::OK,
986 count: 1,
987 req_id: is_single_request.then_some(request.reqid),
988 decompression_info,
989 }))
990 });
991
992 Ok(DecodedRequest {
993 request_id,
994 trace_flow_id,
995 operation: operation.map_err(|status| {
996 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
997 })?,
998 vmo,
999 })
1000 }
1001
1002 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1003 std::mem::take(&mut *self.vmos.lock())
1004 }
1005
1006 fn map_request(
1008 &self,
1009 mut request: DecodedRequest,
1010 active_request: &mut ActiveRequest<SM::Session>,
1011 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1012 if active_request.status != zx::Status::OK {
1013 return Err(zx::Status::BAD_STATE);
1014 }
1015 let mapping = self.offset_map.mapping();
1016 match (mapping, request.operation.blocks()) {
1017 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1018 return Err(zx::Status::OUT_OF_RANGE);
1019 }
1020 _ => {}
1021 }
1022 let remainder = request.operation.map(
1023 self.offset_map.mapping(),
1024 self.offset_map.max_transfer_blocks(),
1025 self.block_size,
1026 );
1027 if remainder.is_some() {
1028 active_request.count += 1;
1029 }
1030 static CACHE: AtomicU64 = AtomicU64::new(0);
1031 if let Some(context) =
1032 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
1033 {
1034 use fuchsia_trace::ArgValue;
1035 let trace_args = [
1036 ArgValue::of("request_id", request.request_id.0),
1037 ArgValue::of("opcode", request.operation.trace_label()),
1038 ];
1039 let _scope = fuchsia_trace::duration(
1040 c"storage",
1041 c"block_server::start_transaction",
1042 &trace_args,
1043 );
1044 if let Some(trace_flow_id) = active_request.trace_flow_id {
1045 fuchsia_trace::flow_step(
1046 &context,
1047 c"block_server::start_transaction",
1048 trace_flow_id.get().into(),
1049 &[],
1050 );
1051 }
1052 }
1053 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1054 Ok((request, remainder))
1055 }
1056
1057 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1058 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1059 }
1060}
1061
1062#[repr(transparent)]
1063#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1064pub struct RequestId(usize);
1065
1066#[derive(Clone, Debug)]
1067struct DecodedRequest {
1068 request_id: RequestId,
1069 trace_flow_id: TraceFlowId,
1070 operation: Operation,
1071 vmo: Option<Arc<zx::Vmo>>,
1072}
1073
1074pub type WriteFlags = block_protocol::WriteFlags;
1076pub type WriteOptions = block_protocol::WriteOptions;
1077pub type ReadOptions = block_protocol::ReadOptions;
1078
1079#[repr(C)]
1080#[derive(Clone, Debug, PartialEq, Eq)]
1081pub enum Operation {
1082 Read {
1087 device_block_offset: u64,
1088 block_count: u32,
1089 _unused: u32,
1090 vmo_offset: u64,
1091 options: ReadOptions,
1092 },
1093 Write {
1094 device_block_offset: u64,
1095 block_count: u32,
1096 _unused: u32,
1097 vmo_offset: u64,
1098 options: WriteOptions,
1099 },
1100 Flush,
1101 Trim {
1102 device_block_offset: u64,
1103 block_count: u32,
1104 },
1105 CloseVmo,
1107 StartDecompressedRead {
1108 required_buffer_size: usize,
1109 device_block_offset: u64,
1110 block_count: u32,
1111 options: ReadOptions,
1112 },
1113 ContinueDecompressedRead {
1114 offset: u64,
1115 device_block_offset: u64,
1116 block_count: u32,
1117 options: ReadOptions,
1118 },
1119}
1120
1121impl Operation {
1122 fn trace_label(&self) -> &'static str {
1123 match self {
1124 Operation::Read { .. } => "read",
1125 Operation::Write { .. } => "write",
1126 Operation::Flush { .. } => "flush",
1127 Operation::Trim { .. } => "trim",
1128 Operation::CloseVmo { .. } => "close_vmo",
1129 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1130 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1131 }
1132 }
1133
1134 fn blocks(&self) -> Option<(u64, u32)> {
1136 match self {
1137 Operation::Read { device_block_offset, block_count, .. }
1138 | Operation::Write { device_block_offset, block_count, .. }
1139 | Operation::Trim { device_block_offset, block_count, .. } => {
1140 Some((*device_block_offset, *block_count))
1141 }
1142 _ => None,
1143 }
1144 }
1145
1146 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1148 match self {
1149 Operation::Read { device_block_offset, block_count, .. }
1150 | Operation::Write { device_block_offset, block_count, .. }
1151 | Operation::Trim { device_block_offset, block_count, .. } => {
1152 Some((device_block_offset, block_count))
1153 }
1154 _ => None,
1155 }
1156 }
1157
1158 fn map(
1161 &mut self,
1162 mapping: Option<&BlockOffsetMapping>,
1163 max_blocks: Option<NonZero<u32>>,
1164 block_size: u32,
1165 ) -> Option<Self> {
1166 let mut max = match self {
1167 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1168 _ => None,
1169 };
1170 let (offset, length) = self.blocks_mut()?;
1171 let orig_offset = *offset;
1172 if let Some(mapping) = mapping {
1173 let delta = *offset - mapping.source_block_offset;
1174 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1175 *offset = mapping.target_block_offset + delta;
1176 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1177 max = match max {
1178 None => Some(mapping_max),
1179 Some(m) => Some(std::cmp::min(m, mapping_max)),
1180 };
1181 };
1182 if let Some(max) = max {
1183 if *length as u64 > max {
1184 let rem = (*length as u64 - max) as u32;
1185 *length = max as u32;
1186 return Some(match self {
1187 Operation::Read {
1188 device_block_offset: _,
1189 block_count: _,
1190 vmo_offset,
1191 _unused,
1192 options,
1193 } => Operation::Read {
1194 device_block_offset: orig_offset + max,
1195 block_count: rem,
1196 vmo_offset: *vmo_offset + max * block_size as u64,
1197 _unused: *_unused,
1198 options: *options,
1199 },
1200 Operation::Write {
1201 device_block_offset: _,
1202 block_count: _,
1203 _unused,
1204 vmo_offset,
1205 options,
1206 } => {
1207 let new_options = WriteOptions {
1209 flags: options.flags & !WriteFlags::PRE_BARRIER,
1210 ..*options
1211 };
1212
1213 Operation::Write {
1214 device_block_offset: orig_offset + max,
1215 block_count: rem,
1216 _unused: *_unused,
1217 vmo_offset: *vmo_offset + max * block_size as u64,
1218 options: new_options,
1219 }
1220 }
1221 Operation::Trim { device_block_offset: _, block_count: _ } => {
1222 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1223 }
1224 _ => unreachable!(),
1225 });
1226 }
1227 }
1228 None
1229 }
1230}
1231
1232#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1233pub enum GroupOrRequest {
1234 Group(u16),
1235 Request(u32),
1236}
1237
1238impl GroupOrRequest {
1239 fn is_group(&self) -> bool {
1240 matches!(self, Self::Group(_))
1241 }
1242
1243 fn group_id(&self) -> Option<u16> {
1244 match self {
1245 Self::Group(id) => Some(*id),
1246 Self::Request(_) => None,
1247 }
1248 }
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253 use super::{
1254 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1255 TraceFlowId,
1256 };
1257 use assert_matches::assert_matches;
1258 use block_protocol::{
1259 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1260 WriteOptions,
1261 };
1262 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1263 use fuchsia_sync::Mutex;
1264 use futures::FutureExt as _;
1265 use futures::channel::oneshot;
1266 use futures::future::BoxFuture;
1267 use std::borrow::Cow;
1268 use std::future::poll_fn;
1269 use std::num::NonZero;
1270 use std::pin::pin;
1271 use std::sync::Arc;
1272 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1273 use std::task::{Context, Poll};
1274 use zx::{AsHandleRef as _, HandleBased as _};
1275 use {
1276 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1277 fuchsia_async as fasync,
1278 };
1279
1280 #[derive(Default)]
1281 struct MockInterface {
1282 read_hook: Option<
1283 Box<
1284 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1285 + Send
1286 + Sync,
1287 >,
1288 >,
1289 write_hook:
1290 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1291 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1292 }
1293
1294 impl super::async_interface::Interface for MockInterface {
1295 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1296 Ok(())
1297 }
1298
1299 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1300 Cow::Owned(test_device_info())
1301 }
1302
1303 async fn read(
1304 &self,
1305 device_block_offset: u64,
1306 block_count: u32,
1307 vmo: &Arc<zx::Vmo>,
1308 vmo_offset: u64,
1309 _opts: ReadOptions,
1310 _trace_flow_id: TraceFlowId,
1311 ) -> Result<(), zx::Status> {
1312 if let Some(read_hook) = &self.read_hook {
1313 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1314 } else {
1315 unimplemented!();
1316 }
1317 }
1318
1319 async fn write(
1320 &self,
1321 device_block_offset: u64,
1322 _block_count: u32,
1323 _vmo: &Arc<zx::Vmo>,
1324 _vmo_offset: u64,
1325 _opts: WriteOptions,
1326 _trace_flow_id: TraceFlowId,
1327 ) -> Result<(), zx::Status> {
1328 if let Some(write_hook) = &self.write_hook {
1329 write_hook(device_block_offset).await
1330 } else {
1331 unimplemented!();
1332 }
1333 }
1334
1335 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1336 unreachable!();
1337 }
1338
1339 fn barrier(&self) -> Result<(), zx::Status> {
1340 if let Some(barrier_hook) = &self.barrier_hook {
1341 barrier_hook()
1342 } else {
1343 unimplemented!();
1344 }
1345 }
1346
1347 async fn trim(
1348 &self,
1349 _device_block_offset: u64,
1350 _block_count: u32,
1351 _trace_flow_id: TraceFlowId,
1352 ) -> Result<(), zx::Status> {
1353 unreachable!();
1354 }
1355
1356 async fn get_volume_info(
1357 &self,
1358 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1359 let () = std::future::pending().await;
1361 unreachable!();
1362 }
1363 }
1364
1365 const BLOCK_SIZE: u32 = 512;
1366 const MAX_TRANSFER_BLOCKS: u32 = 10;
1367
1368 fn test_device_info() -> DeviceInfo {
1369 DeviceInfo::Partition(PartitionInfo {
1370 device_flags: fblock::Flag::READONLY,
1371 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1372 block_range: Some(0..100),
1373 type_guid: [1; 16],
1374 instance_guid: [2; 16],
1375 name: "foo".to_string(),
1376 flags: 0xabcd,
1377 })
1378 }
1379
1380 #[fuchsia::test]
1381 async fn test_split_request_only_issues_single_barrier() {
1382 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1383 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1384 let barrier_called = Arc::new(AtomicU32::new(0));
1385 let barrier_called_clone = barrier_called.clone();
1386
1387 futures::join!(
1388 async move {
1389 let block_server = BlockServer::new(
1390 BLOCK_SIZE,
1391 Arc::new(MockInterface {
1392 barrier_hook: Some(Box::new(move || {
1393 barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1394 Ok(())
1395 })),
1396 write_hook: Some(Box::new(move |_device_block_offset| {
1397 Box::pin(async move { Ok(()) })
1398 })),
1399 ..MockInterface::default()
1400 }),
1401 );
1402 block_server.handle_requests(stream).await.unwrap();
1403 },
1404 async move {
1405 let (session_proxy, server) = fidl::endpoints::create_proxy();
1406
1407 proxy.open_session(server).unwrap();
1408
1409 let vmo_id = session_proxy
1410 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1411 .await
1412 .unwrap()
1413 .unwrap();
1414 assert_ne!(vmo_id.id, 0);
1415
1416 let mut fifo =
1417 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1418 let (mut reader, mut writer) = fifo.async_io();
1419 writer
1422 .write_entries(&BlockFifoRequest {
1423 command: BlockFifoCommand {
1424 opcode: BlockOpcode::Write.into_primitive(),
1425 flags: BlockIoFlag::PRE_BARRIER.bits(),
1426 ..Default::default()
1427 },
1428 vmoid: vmo_id.id,
1429 dev_offset: 0,
1430 length: MAX_TRANSFER_BLOCKS + 1,
1431 vmo_offset: 6,
1432 ..Default::default()
1433 })
1434 .await
1435 .unwrap();
1436
1437 let mut response = BlockFifoResponse::default();
1438 reader.read_entries(&mut response).await.unwrap();
1439 assert_eq!(response.status, zx::sys::ZX_OK);
1440
1441 assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1442
1443 std::mem::drop(proxy);
1444 }
1445 );
1446 }
1447
1448 #[fuchsia::test]
1449 async fn test_barriers_not_supported() {
1450 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1451 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1452
1453 futures::join!(
1454 async move {
1455 let block_server = BlockServer::new(
1456 BLOCK_SIZE,
1457 Arc::new(MockInterface {
1458 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1459 write_hook: Some(Box::new(move |_device_block_offset| {
1460 Box::pin(async move { Ok(()) })
1461 })),
1462 ..MockInterface::default()
1463 }),
1464 );
1465 block_server.handle_requests(stream).await.unwrap();
1466 },
1467 async move {
1468 let (session_proxy, server) = fidl::endpoints::create_proxy();
1469
1470 proxy.open_session(server).unwrap();
1471
1472 let vmo_id = session_proxy
1473 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1474 .await
1475 .unwrap()
1476 .unwrap();
1477 assert_ne!(vmo_id.id, 0);
1478
1479 let mut fifo =
1480 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1481 let (mut reader, mut writer) = fifo.async_io();
1482
1483 writer
1486 .write_entries(&BlockFifoRequest {
1487 command: BlockFifoCommand {
1488 opcode: BlockOpcode::Write.into_primitive(),
1489 flags: BlockIoFlag::PRE_BARRIER.bits(),
1490 ..Default::default()
1491 },
1492 vmoid: vmo_id.id,
1493 dev_offset: 0,
1494 length: MAX_TRANSFER_BLOCKS + 1,
1495 vmo_offset: 6,
1496 ..Default::default()
1497 })
1498 .await
1499 .unwrap();
1500
1501 let mut response = BlockFifoResponse::default();
1502 reader.read_entries(&mut response).await.unwrap();
1503 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1504
1505 std::mem::drop(proxy);
1506 }
1507 );
1508 }
1509
1510 #[fuchsia::test]
1511 async fn test_barriers_ordering() {
1512 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1513 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1514 let barrier_called = Arc::new(AtomicBool::new(false));
1515
1516 futures::join!(
1517 async move {
1518 let barrier_called_clone = barrier_called.clone();
1519 let block_server = BlockServer::new(
1520 BLOCK_SIZE,
1521 Arc::new(MockInterface {
1522 barrier_hook: Some(Box::new(move || {
1523 barrier_called.store(true, Ordering::Relaxed);
1524 Ok(())
1525 })),
1526 write_hook: Some(Box::new(move |device_block_offset| {
1527 let barrier_called = barrier_called_clone.clone();
1528 Box::pin(async move {
1529 if device_block_offset % 2 == 0 {
1531 fasync::Timer::new(fasync::MonotonicInstant::after(
1532 zx::MonotonicDuration::from_millis(200),
1533 ))
1534 .await;
1535 }
1536 assert!(barrier_called.load(Ordering::Relaxed));
1537 Ok(())
1538 })
1539 })),
1540 ..MockInterface::default()
1541 }),
1542 );
1543 block_server.handle_requests(stream).await.unwrap();
1544 },
1545 async move {
1546 let (session_proxy, server) = fidl::endpoints::create_proxy();
1547
1548 proxy.open_session(server).unwrap();
1549
1550 let vmo_id = session_proxy
1551 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1552 .await
1553 .unwrap()
1554 .unwrap();
1555 assert_ne!(vmo_id.id, 0);
1556
1557 let mut fifo =
1558 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1559 let (mut reader, mut writer) = fifo.async_io();
1560
1561 writer
1562 .write_entries(&BlockFifoRequest {
1563 command: BlockFifoCommand {
1564 opcode: BlockOpcode::Write.into_primitive(),
1565 flags: BlockIoFlag::PRE_BARRIER.bits(),
1566 ..Default::default()
1567 },
1568 vmoid: vmo_id.id,
1569 dev_offset: 0,
1570 length: 5,
1571 vmo_offset: 6,
1572 ..Default::default()
1573 })
1574 .await
1575 .unwrap();
1576
1577 for i in 0..10 {
1578 writer
1579 .write_entries(&BlockFifoRequest {
1580 command: BlockFifoCommand {
1581 opcode: BlockOpcode::Write.into_primitive(),
1582 ..Default::default()
1583 },
1584 vmoid: vmo_id.id,
1585 dev_offset: i + 1,
1586 length: 5,
1587 vmo_offset: 6,
1588 ..Default::default()
1589 })
1590 .await
1591 .unwrap();
1592 }
1593 for _ in 0..11 {
1594 let mut response = BlockFifoResponse::default();
1595 reader.read_entries(&mut response).await.unwrap();
1596 assert_eq!(response.status, zx::sys::ZX_OK);
1597 }
1598
1599 std::mem::drop(proxy);
1600 }
1601 );
1602 }
1603
1604 #[fuchsia::test]
1605 async fn test_info() {
1606 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1607
1608 futures::join!(
1609 async {
1610 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1611 block_server.handle_requests(stream).await.unwrap();
1612 },
1613 async {
1614 let expected_info = test_device_info();
1615 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1616 info
1617 } else {
1618 unreachable!()
1619 };
1620
1621 let block_info = proxy.get_info().await.unwrap().unwrap();
1622 assert_eq!(
1623 block_info.block_count,
1624 partition_info.block_range.as_ref().unwrap().end
1625 - partition_info.block_range.as_ref().unwrap().start
1626 );
1627 assert_eq!(
1628 block_info.flags,
1629 fblock::Flag::READONLY | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1630 );
1631
1632 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1633
1634 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1635 assert_eq!(status, zx::sys::ZX_OK);
1636 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1637
1638 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1639 assert_eq!(status, zx::sys::ZX_OK);
1640 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1641
1642 let (status, name) = proxy.get_name().await.unwrap();
1643 assert_eq!(status, zx::sys::ZX_OK);
1644 assert_eq!(name.as_ref(), Some(&partition_info.name));
1645
1646 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1647 assert_eq!(metadata.name, name);
1648 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1649 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1650 assert_eq!(
1651 metadata.start_block_offset,
1652 Some(partition_info.block_range.as_ref().unwrap().start)
1653 );
1654 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1655 assert_eq!(metadata.flags, Some(partition_info.flags));
1656
1657 std::mem::drop(proxy);
1658 }
1659 );
1660 }
1661
1662 #[fuchsia::test]
1663 async fn test_attach_vmo() {
1664 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1665
1666 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1667 let koid = vmo.get_koid().unwrap();
1668
1669 futures::join!(
1670 async {
1671 let block_server = BlockServer::new(
1672 BLOCK_SIZE,
1673 Arc::new(MockInterface {
1674 read_hook: Some(Box::new(move |_, _, vmo, _| {
1675 assert_eq!(vmo.get_koid().unwrap(), koid);
1676 Box::pin(async { Ok(()) })
1677 })),
1678 ..MockInterface::default()
1679 }),
1680 );
1681 block_server.handle_requests(stream).await.unwrap();
1682 },
1683 async move {
1684 let (session_proxy, server) = fidl::endpoints::create_proxy();
1685
1686 proxy.open_session(server).unwrap();
1687
1688 let vmo_id = session_proxy
1689 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1690 .await
1691 .unwrap()
1692 .unwrap();
1693 assert_ne!(vmo_id.id, 0);
1694
1695 let mut fifo =
1696 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1697 let (mut reader, mut writer) = fifo.async_io();
1698
1699 let mut count = 1;
1701 loop {
1702 match session_proxy
1703 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1704 .await
1705 .unwrap()
1706 {
1707 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1708 Err(e) => {
1709 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1710 break;
1711 }
1712 }
1713
1714 if count % 10 == 0 {
1716 writer
1717 .write_entries(&BlockFifoRequest {
1718 command: BlockFifoCommand {
1719 opcode: BlockOpcode::Read.into_primitive(),
1720 ..Default::default()
1721 },
1722 vmoid: vmo_id.id,
1723 length: 1,
1724 ..Default::default()
1725 })
1726 .await
1727 .unwrap();
1728
1729 let mut response = BlockFifoResponse::default();
1730 reader.read_entries(&mut response).await.unwrap();
1731 assert_eq!(response.status, zx::sys::ZX_OK);
1732 }
1733
1734 count += 1;
1735 }
1736
1737 assert_eq!(count, u16::MAX as u64);
1738
1739 writer
1741 .write_entries(&BlockFifoRequest {
1742 command: BlockFifoCommand {
1743 opcode: BlockOpcode::CloseVmo.into_primitive(),
1744 ..Default::default()
1745 },
1746 vmoid: vmo_id.id,
1747 ..Default::default()
1748 })
1749 .await
1750 .unwrap();
1751
1752 let mut response = BlockFifoResponse::default();
1753 reader.read_entries(&mut response).await.unwrap();
1754 assert_eq!(response.status, zx::sys::ZX_OK);
1755
1756 let new_vmo_id = session_proxy
1757 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1758 .await
1759 .unwrap()
1760 .unwrap();
1761 assert_eq!(new_vmo_id.id, vmo_id.id);
1763
1764 std::mem::drop(proxy);
1765 }
1766 );
1767 }
1768
1769 #[fuchsia::test]
1770 async fn test_close() {
1771 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1772
1773 let mut server = std::pin::pin!(
1774 async {
1775 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1776 block_server.handle_requests(stream).await.unwrap();
1777 }
1778 .fuse()
1779 );
1780
1781 let mut client = std::pin::pin!(
1782 async {
1783 let (session_proxy, server) = fidl::endpoints::create_proxy();
1784
1785 proxy.open_session(server).unwrap();
1786
1787 std::mem::drop(proxy);
1790
1791 session_proxy.close().await.unwrap().unwrap();
1792
1793 let _: () = std::future::pending().await;
1795 }
1796 .fuse()
1797 );
1798
1799 futures::select!(
1800 _ = server => {}
1801 _ = client => unreachable!(),
1802 );
1803 }
1804
1805 #[derive(Default)]
1806 struct IoMockInterface {
1807 do_checks: bool,
1808 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1809 return_errors: bool,
1810 }
1811
1812 #[derive(Debug)]
1813 enum ExpectedOp {
1814 Read(u64, u32, u64),
1815 Write(u64, u32, u64),
1816 Trim(u64, u32),
1817 Flush,
1818 }
1819
1820 impl super::async_interface::Interface for IoMockInterface {
1821 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1822 Ok(())
1823 }
1824
1825 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1826 Cow::Owned(test_device_info())
1827 }
1828
1829 async fn read(
1830 &self,
1831 device_block_offset: u64,
1832 block_count: u32,
1833 _vmo: &Arc<zx::Vmo>,
1834 vmo_offset: u64,
1835 _opts: ReadOptions,
1836 _trace_flow_id: TraceFlowId,
1837 ) -> Result<(), zx::Status> {
1838 if self.return_errors {
1839 Err(zx::Status::INTERNAL)
1840 } else {
1841 if self.do_checks {
1842 assert_matches!(
1843 self.expected_op.lock().take(),
1844 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1845 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1846 "Read {device_block_offset} {block_count} {vmo_offset}"
1847 );
1848 }
1849 Ok(())
1850 }
1851 }
1852
1853 async fn write(
1854 &self,
1855 device_block_offset: u64,
1856 block_count: u32,
1857 _vmo: &Arc<zx::Vmo>,
1858 vmo_offset: u64,
1859 _write_opts: WriteOptions,
1860 _trace_flow_id: TraceFlowId,
1861 ) -> Result<(), zx::Status> {
1862 if self.return_errors {
1863 Err(zx::Status::NOT_SUPPORTED)
1864 } else {
1865 if self.do_checks {
1866 assert_matches!(
1867 self.expected_op.lock().take(),
1868 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1869 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1870 "Write {device_block_offset} {block_count} {vmo_offset}"
1871 );
1872 }
1873 Ok(())
1874 }
1875 }
1876
1877 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1878 if self.return_errors {
1879 Err(zx::Status::NO_RESOURCES)
1880 } else {
1881 if self.do_checks {
1882 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1883 }
1884 Ok(())
1885 }
1886 }
1887
1888 fn barrier(&self) -> Result<(), zx::Status> {
1889 unreachable!()
1890 }
1891
1892 async fn trim(
1893 &self,
1894 device_block_offset: u64,
1895 block_count: u32,
1896 _trace_flow_id: TraceFlowId,
1897 ) -> Result<(), zx::Status> {
1898 if self.return_errors {
1899 Err(zx::Status::NO_MEMORY)
1900 } else {
1901 if self.do_checks {
1902 assert_matches!(
1903 self.expected_op.lock().take(),
1904 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1905 block_count == b,
1906 "Trim {device_block_offset} {block_count}"
1907 );
1908 }
1909 Ok(())
1910 }
1911 }
1912 }
1913
1914 #[fuchsia::test]
1915 async fn test_io() {
1916 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1917
1918 let expected_op = Arc::new(Mutex::new(None));
1919 let expected_op_clone = expected_op.clone();
1920
1921 let server = async {
1922 let block_server = BlockServer::new(
1923 BLOCK_SIZE,
1924 Arc::new(IoMockInterface {
1925 return_errors: false,
1926 do_checks: true,
1927 expected_op: expected_op_clone,
1928 }),
1929 );
1930 block_server.handle_requests(stream).await.unwrap();
1931 };
1932
1933 let client = async move {
1934 let (session_proxy, server) = fidl::endpoints::create_proxy();
1935
1936 proxy.open_session(server).unwrap();
1937
1938 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1939 let vmo_id = session_proxy
1940 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1941 .await
1942 .unwrap()
1943 .unwrap();
1944
1945 let mut fifo =
1946 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1947 let (mut reader, mut writer) = fifo.async_io();
1948
1949 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1951 writer
1952 .write_entries(&BlockFifoRequest {
1953 command: BlockFifoCommand {
1954 opcode: BlockOpcode::Read.into_primitive(),
1955 ..Default::default()
1956 },
1957 vmoid: vmo_id.id,
1958 dev_offset: 1,
1959 length: 2,
1960 vmo_offset: 3,
1961 ..Default::default()
1962 })
1963 .await
1964 .unwrap();
1965
1966 let mut response = BlockFifoResponse::default();
1967 reader.read_entries(&mut response).await.unwrap();
1968 assert_eq!(response.status, zx::sys::ZX_OK);
1969
1970 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1972 writer
1973 .write_entries(&BlockFifoRequest {
1974 command: BlockFifoCommand {
1975 opcode: BlockOpcode::Write.into_primitive(),
1976 ..Default::default()
1977 },
1978 vmoid: vmo_id.id,
1979 dev_offset: 4,
1980 length: 5,
1981 vmo_offset: 6,
1982 ..Default::default()
1983 })
1984 .await
1985 .unwrap();
1986
1987 let mut response = BlockFifoResponse::default();
1988 reader.read_entries(&mut response).await.unwrap();
1989 assert_eq!(response.status, zx::sys::ZX_OK);
1990
1991 *expected_op.lock() = Some(ExpectedOp::Flush);
1993 writer
1994 .write_entries(&BlockFifoRequest {
1995 command: BlockFifoCommand {
1996 opcode: BlockOpcode::Flush.into_primitive(),
1997 ..Default::default()
1998 },
1999 ..Default::default()
2000 })
2001 .await
2002 .unwrap();
2003
2004 reader.read_entries(&mut response).await.unwrap();
2005 assert_eq!(response.status, zx::sys::ZX_OK);
2006
2007 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
2009 writer
2010 .write_entries(&BlockFifoRequest {
2011 command: BlockFifoCommand {
2012 opcode: BlockOpcode::Trim.into_primitive(),
2013 ..Default::default()
2014 },
2015 dev_offset: 7,
2016 length: 8,
2017 ..Default::default()
2018 })
2019 .await
2020 .unwrap();
2021
2022 reader.read_entries(&mut response).await.unwrap();
2023 assert_eq!(response.status, zx::sys::ZX_OK);
2024
2025 std::mem::drop(proxy);
2026 };
2027
2028 futures::join!(server, client);
2029 }
2030
2031 #[fuchsia::test]
2032 async fn test_io_errors() {
2033 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2034
2035 futures::join!(
2036 async {
2037 let block_server = BlockServer::new(
2038 BLOCK_SIZE,
2039 Arc::new(IoMockInterface {
2040 return_errors: true,
2041 do_checks: false,
2042 expected_op: Arc::new(Mutex::new(None)),
2043 }),
2044 );
2045 block_server.handle_requests(stream).await.unwrap();
2046 },
2047 async move {
2048 let (session_proxy, server) = fidl::endpoints::create_proxy();
2049
2050 proxy.open_session(server).unwrap();
2051
2052 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2053 let vmo_id = session_proxy
2054 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2055 .await
2056 .unwrap()
2057 .unwrap();
2058
2059 let mut fifo =
2060 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2061 let (mut reader, mut writer) = fifo.async_io();
2062
2063 writer
2065 .write_entries(&BlockFifoRequest {
2066 command: BlockFifoCommand {
2067 opcode: BlockOpcode::Read.into_primitive(),
2068 ..Default::default()
2069 },
2070 vmoid: vmo_id.id,
2071 length: 1,
2072 reqid: 1,
2073 ..Default::default()
2074 })
2075 .await
2076 .unwrap();
2077
2078 let mut response = BlockFifoResponse::default();
2079 reader.read_entries(&mut response).await.unwrap();
2080 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2081
2082 writer
2084 .write_entries(&BlockFifoRequest {
2085 command: BlockFifoCommand {
2086 opcode: BlockOpcode::Write.into_primitive(),
2087 ..Default::default()
2088 },
2089 vmoid: vmo_id.id,
2090 length: 1,
2091 reqid: 2,
2092 ..Default::default()
2093 })
2094 .await
2095 .unwrap();
2096
2097 reader.read_entries(&mut response).await.unwrap();
2098 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2099
2100 writer
2102 .write_entries(&BlockFifoRequest {
2103 command: BlockFifoCommand {
2104 opcode: BlockOpcode::Flush.into_primitive(),
2105 ..Default::default()
2106 },
2107 reqid: 3,
2108 ..Default::default()
2109 })
2110 .await
2111 .unwrap();
2112
2113 reader.read_entries(&mut response).await.unwrap();
2114 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2115
2116 writer
2118 .write_entries(&BlockFifoRequest {
2119 command: BlockFifoCommand {
2120 opcode: BlockOpcode::Trim.into_primitive(),
2121 ..Default::default()
2122 },
2123 reqid: 4,
2124 length: 1,
2125 ..Default::default()
2126 })
2127 .await
2128 .unwrap();
2129
2130 reader.read_entries(&mut response).await.unwrap();
2131 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2132
2133 std::mem::drop(proxy);
2134 }
2135 );
2136 }
2137
2138 #[fuchsia::test]
2139 async fn test_invalid_args() {
2140 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2141
2142 futures::join!(
2143 async {
2144 let block_server = BlockServer::new(
2145 BLOCK_SIZE,
2146 Arc::new(IoMockInterface {
2147 return_errors: false,
2148 do_checks: false,
2149 expected_op: Arc::new(Mutex::new(None)),
2150 }),
2151 );
2152 block_server.handle_requests(stream).await.unwrap();
2153 },
2154 async move {
2155 let (session_proxy, server) = fidl::endpoints::create_proxy();
2156
2157 proxy.open_session(server).unwrap();
2158
2159 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2160 let vmo_id = session_proxy
2161 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2162 .await
2163 .unwrap()
2164 .unwrap();
2165
2166 let mut fifo =
2167 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2168
2169 async fn test(
2170 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2171 request: BlockFifoRequest,
2172 ) -> Result<(), zx::Status> {
2173 let (mut reader, mut writer) = fifo.async_io();
2174 writer.write_entries(&request).await.unwrap();
2175 let mut response = BlockFifoResponse::default();
2176 reader.read_entries(&mut response).await.unwrap();
2177 zx::Status::ok(response.status)
2178 }
2179
2180 let good_read_request = || BlockFifoRequest {
2183 command: BlockFifoCommand {
2184 opcode: BlockOpcode::Read.into_primitive(),
2185 ..Default::default()
2186 },
2187 length: 1,
2188 vmoid: vmo_id.id,
2189 ..Default::default()
2190 };
2191
2192 assert_eq!(
2193 test(
2194 &mut fifo,
2195 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2196 )
2197 .await,
2198 Err(zx::Status::IO)
2199 );
2200
2201 assert_eq!(
2202 test(
2203 &mut fifo,
2204 BlockFifoRequest {
2205 vmo_offset: 0xffff_ffff_ffff_ffff,
2206 ..good_read_request()
2207 }
2208 )
2209 .await,
2210 Err(zx::Status::OUT_OF_RANGE)
2211 );
2212
2213 assert_eq!(
2214 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2215 Err(zx::Status::INVALID_ARGS)
2216 );
2217
2218 let good_write_request = || BlockFifoRequest {
2221 command: BlockFifoCommand {
2222 opcode: BlockOpcode::Write.into_primitive(),
2223 ..Default::default()
2224 },
2225 length: 1,
2226 vmoid: vmo_id.id,
2227 ..Default::default()
2228 };
2229
2230 assert_eq!(
2231 test(
2232 &mut fifo,
2233 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2234 )
2235 .await,
2236 Err(zx::Status::IO)
2237 );
2238
2239 assert_eq!(
2240 test(
2241 &mut fifo,
2242 BlockFifoRequest {
2243 vmo_offset: 0xffff_ffff_ffff_ffff,
2244 ..good_write_request()
2245 }
2246 )
2247 .await,
2248 Err(zx::Status::OUT_OF_RANGE)
2249 );
2250
2251 assert_eq!(
2252 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2253 Err(zx::Status::INVALID_ARGS)
2254 );
2255
2256 assert_eq!(
2259 test(
2260 &mut fifo,
2261 BlockFifoRequest {
2262 command: BlockFifoCommand {
2263 opcode: BlockOpcode::CloseVmo.into_primitive(),
2264 ..Default::default()
2265 },
2266 vmoid: vmo_id.id + 1,
2267 ..Default::default()
2268 }
2269 )
2270 .await,
2271 Err(zx::Status::IO)
2272 );
2273
2274 std::mem::drop(proxy);
2275 }
2276 );
2277 }
2278
2279 #[fuchsia::test]
2280 async fn test_concurrent_requests() {
2281 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2282
2283 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2284 let waiting_readers_clone = waiting_readers.clone();
2285
2286 futures::join!(
2287 async move {
2288 let block_server = BlockServer::new(
2289 BLOCK_SIZE,
2290 Arc::new(MockInterface {
2291 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2292 let (tx, rx) = oneshot::channel();
2293 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2294 Box::pin(async move {
2295 let _ = rx.await;
2296 Ok(())
2297 })
2298 })),
2299 ..MockInterface::default()
2300 }),
2301 );
2302 block_server.handle_requests(stream).await.unwrap();
2303 },
2304 async move {
2305 let (session_proxy, server) = fidl::endpoints::create_proxy();
2306
2307 proxy.open_session(server).unwrap();
2308
2309 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2310 let vmo_id = session_proxy
2311 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2312 .await
2313 .unwrap()
2314 .unwrap();
2315
2316 let mut fifo =
2317 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2318 let (mut reader, mut writer) = fifo.async_io();
2319
2320 writer
2321 .write_entries(&BlockFifoRequest {
2322 command: BlockFifoCommand {
2323 opcode: BlockOpcode::Read.into_primitive(),
2324 ..Default::default()
2325 },
2326 reqid: 1,
2327 dev_offset: 1, vmoid: vmo_id.id,
2329 length: 1,
2330 ..Default::default()
2331 })
2332 .await
2333 .unwrap();
2334
2335 writer
2336 .write_entries(&BlockFifoRequest {
2337 command: BlockFifoCommand {
2338 opcode: BlockOpcode::Read.into_primitive(),
2339 ..Default::default()
2340 },
2341 reqid: 2,
2342 dev_offset: 2,
2343 vmoid: vmo_id.id,
2344 length: 1,
2345 ..Default::default()
2346 })
2347 .await
2348 .unwrap();
2349
2350 poll_fn(|cx: &mut Context<'_>| {
2352 if waiting_readers.lock().len() == 2 {
2353 Poll::Ready(())
2354 } else {
2355 cx.waker().wake_by_ref();
2357 Poll::Pending
2358 }
2359 })
2360 .await;
2361
2362 let mut response = BlockFifoResponse::default();
2363 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2364
2365 let (id, tx) = waiting_readers.lock().pop().unwrap();
2366 tx.send(()).unwrap();
2367
2368 reader.read_entries(&mut response).await.unwrap();
2369 assert_eq!(response.status, zx::sys::ZX_OK);
2370 assert_eq!(response.reqid, id);
2371
2372 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2373
2374 let (id, tx) = waiting_readers.lock().pop().unwrap();
2375 tx.send(()).unwrap();
2376
2377 reader.read_entries(&mut response).await.unwrap();
2378 assert_eq!(response.status, zx::sys::ZX_OK);
2379 assert_eq!(response.reqid, id);
2380 }
2381 );
2382 }
2383
2384 #[fuchsia::test]
2385 async fn test_groups() {
2386 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2387
2388 futures::join!(
2389 async move {
2390 let block_server = BlockServer::new(
2391 BLOCK_SIZE,
2392 Arc::new(MockInterface {
2393 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2394 ..MockInterface::default()
2395 }),
2396 );
2397 block_server.handle_requests(stream).await.unwrap();
2398 },
2399 async move {
2400 let (session_proxy, server) = fidl::endpoints::create_proxy();
2401
2402 proxy.open_session(server).unwrap();
2403
2404 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2405 let vmo_id = session_proxy
2406 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2407 .await
2408 .unwrap()
2409 .unwrap();
2410
2411 let mut fifo =
2412 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2413 let (mut reader, mut writer) = fifo.async_io();
2414
2415 writer
2416 .write_entries(&BlockFifoRequest {
2417 command: BlockFifoCommand {
2418 opcode: BlockOpcode::Read.into_primitive(),
2419 flags: BlockIoFlag::GROUP_ITEM.bits(),
2420 ..Default::default()
2421 },
2422 group: 1,
2423 vmoid: vmo_id.id,
2424 length: 1,
2425 ..Default::default()
2426 })
2427 .await
2428 .unwrap();
2429
2430 writer
2431 .write_entries(&BlockFifoRequest {
2432 command: BlockFifoCommand {
2433 opcode: BlockOpcode::Read.into_primitive(),
2434 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2435 ..Default::default()
2436 },
2437 reqid: 2,
2438 group: 1,
2439 vmoid: vmo_id.id,
2440 length: 1,
2441 ..Default::default()
2442 })
2443 .await
2444 .unwrap();
2445
2446 let mut response = BlockFifoResponse::default();
2447 reader.read_entries(&mut response).await.unwrap();
2448 assert_eq!(response.status, zx::sys::ZX_OK);
2449 assert_eq!(response.reqid, 2);
2450 assert_eq!(response.group, 1);
2451 }
2452 );
2453 }
2454
2455 #[fuchsia::test]
2456 async fn test_group_error() {
2457 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2458
2459 let counter = Arc::new(AtomicU64::new(0));
2460 let counter_clone = counter.clone();
2461
2462 futures::join!(
2463 async move {
2464 let block_server = BlockServer::new(
2465 BLOCK_SIZE,
2466 Arc::new(MockInterface {
2467 read_hook: Some(Box::new(move |_, _, _, _| {
2468 counter_clone.fetch_add(1, Ordering::Relaxed);
2469 Box::pin(async { Err(zx::Status::BAD_STATE) })
2470 })),
2471 ..MockInterface::default()
2472 }),
2473 );
2474 block_server.handle_requests(stream).await.unwrap();
2475 },
2476 async move {
2477 let (session_proxy, server) = fidl::endpoints::create_proxy();
2478
2479 proxy.open_session(server).unwrap();
2480
2481 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2482 let vmo_id = session_proxy
2483 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2484 .await
2485 .unwrap()
2486 .unwrap();
2487
2488 let mut fifo =
2489 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2490 let (mut reader, mut writer) = fifo.async_io();
2491
2492 writer
2493 .write_entries(&BlockFifoRequest {
2494 command: BlockFifoCommand {
2495 opcode: BlockOpcode::Read.into_primitive(),
2496 flags: BlockIoFlag::GROUP_ITEM.bits(),
2497 ..Default::default()
2498 },
2499 group: 1,
2500 vmoid: vmo_id.id,
2501 length: 1,
2502 ..Default::default()
2503 })
2504 .await
2505 .unwrap();
2506
2507 poll_fn(|cx: &mut Context<'_>| {
2509 if counter.load(Ordering::Relaxed) == 1 {
2510 Poll::Ready(())
2511 } else {
2512 cx.waker().wake_by_ref();
2514 Poll::Pending
2515 }
2516 })
2517 .await;
2518
2519 let mut response = BlockFifoResponse::default();
2520 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2521
2522 writer
2523 .write_entries(&BlockFifoRequest {
2524 command: BlockFifoCommand {
2525 opcode: BlockOpcode::Read.into_primitive(),
2526 flags: BlockIoFlag::GROUP_ITEM.bits(),
2527 ..Default::default()
2528 },
2529 group: 1,
2530 vmoid: vmo_id.id,
2531 length: 1,
2532 ..Default::default()
2533 })
2534 .await
2535 .unwrap();
2536
2537 writer
2538 .write_entries(&BlockFifoRequest {
2539 command: BlockFifoCommand {
2540 opcode: BlockOpcode::Read.into_primitive(),
2541 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2542 ..Default::default()
2543 },
2544 reqid: 2,
2545 group: 1,
2546 vmoid: vmo_id.id,
2547 length: 1,
2548 ..Default::default()
2549 })
2550 .await
2551 .unwrap();
2552
2553 reader.read_entries(&mut response).await.unwrap();
2554 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2555 assert_eq!(response.reqid, 2);
2556 assert_eq!(response.group, 1);
2557
2558 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2559
2560 assert_eq!(counter.load(Ordering::Relaxed), 1);
2562 }
2563 );
2564 }
2565
2566 #[fuchsia::test]
2567 async fn test_group_with_two_lasts() {
2568 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2569
2570 let (tx, rx) = oneshot::channel();
2571
2572 futures::join!(
2573 async move {
2574 let rx = Mutex::new(Some(rx));
2575 let block_server = BlockServer::new(
2576 BLOCK_SIZE,
2577 Arc::new(MockInterface {
2578 read_hook: Some(Box::new(move |_, _, _, _| {
2579 let rx = rx.lock().take().unwrap();
2580 Box::pin(async {
2581 let _ = rx.await;
2582 Ok(())
2583 })
2584 })),
2585 ..MockInterface::default()
2586 }),
2587 );
2588 block_server.handle_requests(stream).await.unwrap();
2589 },
2590 async move {
2591 let (session_proxy, server) = fidl::endpoints::create_proxy();
2592
2593 proxy.open_session(server).unwrap();
2594
2595 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2596 let vmo_id = session_proxy
2597 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2598 .await
2599 .unwrap()
2600 .unwrap();
2601
2602 let mut fifo =
2603 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2604 let (mut reader, mut writer) = fifo.async_io();
2605
2606 writer
2607 .write_entries(&BlockFifoRequest {
2608 command: BlockFifoCommand {
2609 opcode: BlockOpcode::Read.into_primitive(),
2610 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2611 ..Default::default()
2612 },
2613 reqid: 1,
2614 group: 1,
2615 vmoid: vmo_id.id,
2616 length: 1,
2617 ..Default::default()
2618 })
2619 .await
2620 .unwrap();
2621
2622 writer
2623 .write_entries(&BlockFifoRequest {
2624 command: BlockFifoCommand {
2625 opcode: BlockOpcode::Read.into_primitive(),
2626 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2627 ..Default::default()
2628 },
2629 reqid: 2,
2630 group: 1,
2631 vmoid: vmo_id.id,
2632 length: 1,
2633 ..Default::default()
2634 })
2635 .await
2636 .unwrap();
2637
2638 writer
2640 .write_entries(&BlockFifoRequest {
2641 command: BlockFifoCommand {
2642 opcode: BlockOpcode::CloseVmo.into_primitive(),
2643 ..Default::default()
2644 },
2645 reqid: 3,
2646 vmoid: vmo_id.id,
2647 ..Default::default()
2648 })
2649 .await
2650 .unwrap();
2651
2652 let mut response = BlockFifoResponse::default();
2654 reader.read_entries(&mut response).await.unwrap();
2655 assert_eq!(response.status, zx::sys::ZX_OK);
2656 assert_eq!(response.reqid, 3);
2657
2658 tx.send(()).unwrap();
2660
2661 let mut response = BlockFifoResponse::default();
2664 reader.read_entries(&mut response).await.unwrap();
2665 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2666 assert_eq!(response.reqid, 1);
2667 assert_eq!(response.group, 1);
2668 }
2669 );
2670 }
2671
2672 #[fuchsia::test(allow_stalls = false)]
2673 async fn test_requests_dont_block_sessions() {
2674 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2675
2676 let (tx, rx) = oneshot::channel();
2677
2678 fasync::Task::local(async move {
2679 let rx = Mutex::new(Some(rx));
2680 let block_server = BlockServer::new(
2681 BLOCK_SIZE,
2682 Arc::new(MockInterface {
2683 read_hook: Some(Box::new(move |_, _, _, _| {
2684 let rx = rx.lock().take().unwrap();
2685 Box::pin(async {
2686 let _ = rx.await;
2687 Ok(())
2688 })
2689 })),
2690 ..MockInterface::default()
2691 }),
2692 );
2693 block_server.handle_requests(stream).await.unwrap();
2694 })
2695 .detach();
2696
2697 let mut fut = pin!(async {
2698 let (session_proxy, server) = fidl::endpoints::create_proxy();
2699
2700 proxy.open_session(server).unwrap();
2701
2702 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2703 let vmo_id = session_proxy
2704 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2705 .await
2706 .unwrap()
2707 .unwrap();
2708
2709 let mut fifo =
2710 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2711 let (mut reader, mut writer) = fifo.async_io();
2712
2713 writer
2714 .write_entries(&BlockFifoRequest {
2715 command: BlockFifoCommand {
2716 opcode: BlockOpcode::Read.into_primitive(),
2717 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2718 ..Default::default()
2719 },
2720 reqid: 1,
2721 group: 1,
2722 vmoid: vmo_id.id,
2723 length: 1,
2724 ..Default::default()
2725 })
2726 .await
2727 .unwrap();
2728
2729 let mut response = BlockFifoResponse::default();
2730 reader.read_entries(&mut response).await.unwrap();
2731 assert_eq!(response.status, zx::sys::ZX_OK);
2732 });
2733
2734 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2736
2737 let mut fut2 = pin!(proxy.get_volume_info());
2738
2739 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2741
2742 let _ = tx.send(());
2745
2746 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2747 }
2748
2749 #[fuchsia::test]
2750 async fn test_request_flow_control() {
2751 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2752
2753 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2756 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2757 let event_clone = event.clone();
2758 futures::join!(
2759 async move {
2760 let block_server = BlockServer::new(
2761 BLOCK_SIZE,
2762 Arc::new(MockInterface {
2763 read_hook: Some(Box::new(move |_, _, _, _| {
2764 let event_clone = event_clone.clone();
2765 Box::pin(async move {
2766 if !event_clone.1.load(Ordering::SeqCst) {
2767 event_clone.0.listen().await;
2768 }
2769 Ok(())
2770 })
2771 })),
2772 ..MockInterface::default()
2773 }),
2774 );
2775 block_server.handle_requests(stream).await.unwrap();
2776 },
2777 async move {
2778 let (session_proxy, server) = fidl::endpoints::create_proxy();
2779
2780 proxy.open_session(server).unwrap();
2781
2782 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2783 let vmo_id = session_proxy
2784 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2785 .await
2786 .unwrap()
2787 .unwrap();
2788
2789 let mut fifo =
2790 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2791 let (mut reader, mut writer) = fifo.async_io();
2792
2793 for i in 0..MAX_REQUESTS {
2794 writer
2795 .write_entries(&BlockFifoRequest {
2796 command: BlockFifoCommand {
2797 opcode: BlockOpcode::Read.into_primitive(),
2798 ..Default::default()
2799 },
2800 reqid: (i + 1) as u32,
2801 dev_offset: i,
2802 vmoid: vmo_id.id,
2803 length: 1,
2804 ..Default::default()
2805 })
2806 .await
2807 .unwrap();
2808 }
2809 assert!(
2810 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2811 command: BlockFifoCommand {
2812 opcode: BlockOpcode::Read.into_primitive(),
2813 ..Default::default()
2814 },
2815 reqid: u32::MAX,
2816 dev_offset: MAX_REQUESTS,
2817 vmoid: vmo_id.id,
2818 length: 1,
2819 ..Default::default()
2820 })))
2821 .is_pending()
2822 );
2823 event.1.store(true, Ordering::SeqCst);
2825 event.0.notify(usize::MAX);
2826 let mut finished_reqids = vec![];
2828 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2829 let mut response = BlockFifoResponse::default();
2830 reader.read_entries(&mut response).await.unwrap();
2831 assert_eq!(response.status, zx::sys::ZX_OK);
2832 finished_reqids.push(response.reqid);
2833 writer
2834 .write_entries(&BlockFifoRequest {
2835 command: BlockFifoCommand {
2836 opcode: BlockOpcode::Read.into_primitive(),
2837 ..Default::default()
2838 },
2839 reqid: (i + 1) as u32,
2840 dev_offset: i,
2841 vmoid: vmo_id.id,
2842 length: 1,
2843 ..Default::default()
2844 })
2845 .await
2846 .unwrap();
2847 }
2848 let mut response = BlockFifoResponse::default();
2849 for _ in 0..MAX_REQUESTS {
2850 reader.read_entries(&mut response).await.unwrap();
2851 assert_eq!(response.status, zx::sys::ZX_OK);
2852 finished_reqids.push(response.reqid);
2853 }
2854 finished_reqids.sort();
2857 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2858 let mut i = 1;
2859 for reqid in finished_reqids {
2860 assert_eq!(reqid, i);
2861 i += 1;
2862 }
2863 }
2864 );
2865 }
2866
2867 #[fuchsia::test]
2868 async fn test_passthrough_io_with_fixed_map() {
2869 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2870
2871 let expected_op = Arc::new(Mutex::new(None));
2872 let expected_op_clone = expected_op.clone();
2873 futures::join!(
2874 async {
2875 let block_server = BlockServer::new(
2876 BLOCK_SIZE,
2877 Arc::new(IoMockInterface {
2878 return_errors: false,
2879 do_checks: true,
2880 expected_op: expected_op_clone,
2881 }),
2882 );
2883 block_server.handle_requests(stream).await.unwrap();
2884 },
2885 async move {
2886 let (session_proxy, server) = fidl::endpoints::create_proxy();
2887
2888 let mapping = fblock::BlockOffsetMapping {
2889 source_block_offset: 0,
2890 target_block_offset: 10,
2891 length: 20,
2892 };
2893 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2894
2895 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2896 let vmo_id = session_proxy
2897 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2898 .await
2899 .unwrap()
2900 .unwrap();
2901
2902 let mut fifo =
2903 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2904 let (mut reader, mut writer) = fifo.async_io();
2905
2906 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2908 writer
2909 .write_entries(&BlockFifoRequest {
2910 command: BlockFifoCommand {
2911 opcode: BlockOpcode::Read.into_primitive(),
2912 ..Default::default()
2913 },
2914 vmoid: vmo_id.id,
2915 dev_offset: 1,
2916 length: 2,
2917 vmo_offset: 3,
2918 ..Default::default()
2919 })
2920 .await
2921 .unwrap();
2922
2923 let mut response = BlockFifoResponse::default();
2924 reader.read_entries(&mut response).await.unwrap();
2925 assert_eq!(response.status, zx::sys::ZX_OK);
2926
2927 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2929 writer
2930 .write_entries(&BlockFifoRequest {
2931 command: BlockFifoCommand {
2932 opcode: BlockOpcode::Write.into_primitive(),
2933 ..Default::default()
2934 },
2935 vmoid: vmo_id.id,
2936 dev_offset: 4,
2937 length: 5,
2938 vmo_offset: 6,
2939 ..Default::default()
2940 })
2941 .await
2942 .unwrap();
2943
2944 reader.read_entries(&mut response).await.unwrap();
2945 assert_eq!(response.status, zx::sys::ZX_OK);
2946
2947 *expected_op.lock() = Some(ExpectedOp::Flush);
2949 writer
2950 .write_entries(&BlockFifoRequest {
2951 command: BlockFifoCommand {
2952 opcode: BlockOpcode::Flush.into_primitive(),
2953 ..Default::default()
2954 },
2955 ..Default::default()
2956 })
2957 .await
2958 .unwrap();
2959
2960 reader.read_entries(&mut response).await.unwrap();
2961 assert_eq!(response.status, zx::sys::ZX_OK);
2962
2963 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2965 writer
2966 .write_entries(&BlockFifoRequest {
2967 command: BlockFifoCommand {
2968 opcode: BlockOpcode::Trim.into_primitive(),
2969 ..Default::default()
2970 },
2971 dev_offset: 7,
2972 length: 3,
2973 ..Default::default()
2974 })
2975 .await
2976 .unwrap();
2977
2978 reader.read_entries(&mut response).await.unwrap();
2979 assert_eq!(response.status, zx::sys::ZX_OK);
2980
2981 *expected_op.lock() = None;
2983 writer
2984 .write_entries(&BlockFifoRequest {
2985 command: BlockFifoCommand {
2986 opcode: BlockOpcode::Read.into_primitive(),
2987 ..Default::default()
2988 },
2989 vmoid: vmo_id.id,
2990 dev_offset: 19,
2991 length: 2,
2992 vmo_offset: 3,
2993 ..Default::default()
2994 })
2995 .await
2996 .unwrap();
2997
2998 reader.read_entries(&mut response).await.unwrap();
2999 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3000
3001 std::mem::drop(proxy);
3002 }
3003 );
3004 }
3005
3006 #[fuchsia::test]
3007 fn operation_map() {
3008 fn expect_map_result(
3009 mut operation: Operation,
3010 mapping: Option<BlockOffsetMapping>,
3011 max_blocks: Option<NonZero<u32>>,
3012 expected_operations: Vec<Operation>,
3013 ) {
3014 let mut ops = vec![];
3015 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
3016 ops.push(operation);
3017 operation = remainder;
3018 }
3019 ops.push(operation);
3020 assert_eq!(ops, expected_operations);
3021 }
3022
3023 expect_map_result(
3025 Operation::Read {
3026 device_block_offset: 10,
3027 block_count: 200,
3028 _unused: 0,
3029 vmo_offset: 0,
3030 options: ReadOptions::default(),
3031 },
3032 None,
3033 None,
3034 vec![Operation::Read {
3035 device_block_offset: 10,
3036 block_count: 200,
3037 _unused: 0,
3038 vmo_offset: 0,
3039 options: ReadOptions::default(),
3040 }],
3041 );
3042
3043 expect_map_result(
3045 Operation::Read {
3046 device_block_offset: 10,
3047 block_count: 200,
3048 _unused: 0,
3049 vmo_offset: 0,
3050 options: ReadOptions::default(),
3051 },
3052 None,
3053 NonZero::new(120),
3054 vec![
3055 Operation::Read {
3056 device_block_offset: 10,
3057 block_count: 120,
3058 _unused: 0,
3059 vmo_offset: 0,
3060 options: ReadOptions::default(),
3061 },
3062 Operation::Read {
3063 device_block_offset: 130,
3064 block_count: 80,
3065 _unused: 0,
3066 vmo_offset: 120 * 512,
3067 options: ReadOptions::default(),
3068 },
3069 ],
3070 );
3071 expect_map_result(
3072 Operation::Write {
3073 device_block_offset: 10,
3074 block_count: 200,
3075 _unused: 0,
3076 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3077 vmo_offset: 0,
3078 },
3079 None,
3080 NonZero::new(120),
3081 vec![
3082 Operation::Write {
3083 device_block_offset: 10,
3084 block_count: 120,
3085 _unused: 0,
3086 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3087 vmo_offset: 0,
3088 },
3089 Operation::Write {
3090 device_block_offset: 130,
3091 block_count: 80,
3092 _unused: 0,
3093 options: WriteOptions::default(),
3094 vmo_offset: 120 * 512,
3095 },
3096 ],
3097 );
3098 expect_map_result(
3099 Operation::Trim { device_block_offset: 10, block_count: 200 },
3100 None,
3101 NonZero::new(120),
3102 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3103 );
3104
3105 expect_map_result(
3107 Operation::Read {
3108 device_block_offset: 10,
3109 block_count: 200,
3110 _unused: 0,
3111 vmo_offset: 0,
3112 options: ReadOptions::default(),
3113 },
3114 Some(BlockOffsetMapping {
3115 source_block_offset: 10,
3116 target_block_offset: 100,
3117 length: 200,
3118 }),
3119 NonZero::new(120),
3120 vec![
3121 Operation::Read {
3122 device_block_offset: 100,
3123 block_count: 120,
3124 _unused: 0,
3125 vmo_offset: 0,
3126 options: ReadOptions::default(),
3127 },
3128 Operation::Read {
3129 device_block_offset: 220,
3130 block_count: 80,
3131 _unused: 0,
3132 vmo_offset: 120 * 512,
3133 options: ReadOptions::default(),
3134 },
3135 ],
3136 );
3137 expect_map_result(
3138 Operation::Write {
3139 device_block_offset: 10,
3140 block_count: 200,
3141 _unused: 0,
3142 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3143 vmo_offset: 0,
3144 },
3145 Some(BlockOffsetMapping {
3146 source_block_offset: 10,
3147 target_block_offset: 100,
3148 length: 200,
3149 }),
3150 NonZero::new(120),
3151 vec![
3152 Operation::Write {
3153 device_block_offset: 100,
3154 block_count: 120,
3155 _unused: 0,
3156 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3157 vmo_offset: 0,
3158 },
3159 Operation::Write {
3160 device_block_offset: 220,
3161 block_count: 80,
3162 _unused: 0,
3163 options: WriteOptions::default(),
3164 vmo_offset: 120 * 512,
3165 },
3166 ],
3167 );
3168 expect_map_result(
3169 Operation::Trim { device_block_offset: 10, block_count: 200 },
3170 Some(BlockOffsetMapping {
3171 source_block_offset: 10,
3172 target_block_offset: 100,
3173 length: 200,
3174 }),
3175 NonZero::new(120),
3176 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3177 );
3178 }
3179}