1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
35#[derive(Clone)]
36pub enum DeviceInfo {
37 Block(BlockInfo),
38 Partition(PartitionInfo),
39}
40
41impl DeviceInfo {
42 pub fn block_count(&self) -> Option<u64> {
43 match self {
44 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
45 Self::Partition(PartitionInfo { block_range, .. }) => {
46 block_range.as_ref().map(|range| range.end - range.start)
47 }
48 }
49 }
50
51 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
52 match self {
53 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
54 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
55 max_transfer_blocks.clone()
56 }
57 }
58 }
59
60 fn max_transfer_size(&self, block_size: u32) -> u32 {
61 if let Some(max_blocks) = self.max_transfer_blocks() {
62 max_blocks.get() * block_size
63 } else {
64 MAX_TRANSFER_UNBOUNDED
65 }
66 }
67}
68
69#[derive(Clone, Default)]
71pub struct BlockInfo {
72 pub device_flags: fblock::Flag,
73 pub block_count: u64,
74 pub max_transfer_blocks: Option<NonZero<u32>>,
75}
76
77#[derive(Clone, Default)]
79pub struct PartitionInfo {
80 pub device_flags: fblock::Flag,
82 pub max_transfer_blocks: Option<NonZero<u32>>,
83 pub block_range: Option<Range<u64>>,
87 pub type_guid: [u8; 16],
88 pub instance_guid: [u8; 16],
89 pub name: String,
90 pub flags: u64,
91}
92
93struct ActiveRequest<S> {
96 session: S,
97 group_or_request: GroupOrRequest,
98 trace_flow_id: TraceFlowId,
99 _epoch_guard: EpochGuard<'static>,
100 status: zx::Status,
101 count: u32,
102 req_id: Option<u32>,
103 decompression_info: Option<DecompressionInfo>,
104}
105
106struct DecompressionInfo {
107 compressed_range: Range<usize>,
109
110 uncompressed_range: Range<u64>,
112
113 bytes_so_far: u64,
114 mapping: Arc<VmoMapping>,
115 buffer: Option<Buffer<'static>>,
116}
117
118impl DecompressionInfo {
119 fn uncompressed_slice(&self) -> *mut [u8] {
121 std::ptr::slice_from_raw_parts_mut(
122 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
123 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
124 )
125 }
126}
127
128pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
129
130impl<S> Default for ActiveRequests<S> {
131 fn default() -> Self {
132 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
133 }
134}
135
136impl<S> ActiveRequests<S> {
137 fn complete_and_take_response(
138 &self,
139 request_id: RequestId,
140 status: zx::Status,
141 ) -> Option<(S, BlockFifoResponse)> {
142 self.0.lock().complete_and_take_response(request_id, status)
143 }
144
145 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
146 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
147 }
148}
149
150struct ActiveRequestsInner<S> {
151 requests: Slab<ActiveRequest<S>>,
152}
153
154impl<S> ActiveRequestsInner<S> {
156 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
158 let group = &mut self.requests[request_id.0];
159
160 group.count = group.count.checked_sub(1).unwrap();
161 if status != zx::Status::OK && group.status == zx::Status::OK {
162 group.status = status
163 }
164
165 fuchsia_trace::duration!(
166 c"storage",
167 c"block_server::finish_transaction",
168 "request_id" => request_id.0,
169 "group_completed" => group.count == 0,
170 "status" => status.into_raw());
171 if let Some(trace_flow_id) = group.trace_flow_id {
172 fuchsia_trace::flow_step!(
173 c"storage",
174 c"block_server::finish_request",
175 trace_flow_id.get().into()
176 );
177 }
178
179 if group.count == 0
180 && group.status == zx::Status::OK
181 && let Some(info) = &mut group.decompression_info
182 {
183 thread_local! {
184 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
185 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
186 }
187 DECOMPRESSOR.with(|decompressor| {
188 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
190 let mut decompressor = decompressor.borrow_mut();
191 if let Err(error) = decompressor.decompress_to_buffer(
192 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
193 target_slice,
194 ) {
195 log::warn!(error:?; "Decompression error");
196 group.status = zx::Status::IO_DATA_INTEGRITY;
197 };
198 });
199 }
200 }
201
202 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
204 let group = &self.requests[request_id.0];
205 match group.req_id {
206 Some(reqid) if group.count == 0 => {
207 let group = self.requests.remove(request_id.0);
208 Some((
209 group.session,
210 BlockFifoResponse {
211 status: group.status.into_raw(),
212 reqid,
213 group: group.group_or_request.group_id().unwrap_or(0),
214 ..Default::default()
215 },
216 ))
217 }
218 _ => None,
219 }
220 }
221
222 fn complete_and_take_response(
224 &mut self,
225 request_id: RequestId,
226 status: zx::Status,
227 ) -> Option<(S, BlockFifoResponse)> {
228 self.complete(request_id, status);
229 self.take_response(request_id)
230 }
231}
232
233pub struct BlockServer<SM> {
236 block_size: u32,
237 session_manager: Arc<SM>,
238}
239
240#[derive(Debug)]
242pub struct BlockOffsetMapping {
243 source_block_offset: u64,
244 target_block_offset: u64,
245 length: u64,
246}
247
248impl BlockOffsetMapping {
249 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
250 blocks.0 >= self.source_block_offset
251 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
252 }
253}
254
255impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
256 type Error = zx::Status;
257
258 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
259 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
260 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
261 Ok(Self {
262 source_block_offset: wire.source_block_offset,
263 target_block_offset: wire.target_block_offset,
264 length: wire.length,
265 })
266 }
267}
268
269pub struct OffsetMap {
271 mapping: Option<BlockOffsetMapping>,
272 max_transfer_blocks: Option<NonZero<u32>>,
273}
274
275impl OffsetMap {
276 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
278 Self { mapping: Some(mapping), max_transfer_blocks }
279 }
280
281 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
283 Self { mapping: None, max_transfer_blocks }
284 }
285
286 pub fn is_empty(&self) -> bool {
287 self.mapping.is_none()
288 }
289
290 fn mapping(&self) -> Option<&BlockOffsetMapping> {
291 self.mapping.as_ref()
292 }
293
294 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
295 self.max_transfer_blocks
296 }
297}
298
299pub trait SessionManager: 'static {
302 const SUPPORTS_DECOMPRESSION: bool;
303
304 type Session;
305
306 fn on_attach_vmo(
307 self: Arc<Self>,
308 vmo: &Arc<zx::Vmo>,
309 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
310
311 fn open_session(
316 self: Arc<Self>,
317 stream: fblock::SessionRequestStream,
318 offset_map: OffsetMap,
319 block_size: u32,
320 ) -> impl Future<Output = Result<(), Error>> + Send;
321
322 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
324
325 fn get_volume_info(
327 &self,
328 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
329 {
330 async { Err(zx::Status::NOT_SUPPORTED) }
331 }
332
333 fn query_slices(
335 &self,
336 _start_slices: &[u64],
337 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
338 async { Err(zx::Status::NOT_SUPPORTED) }
339 }
340
341 fn extend(
343 &self,
344 _start_slice: u64,
345 _slice_count: u64,
346 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
347 async { Err(zx::Status::NOT_SUPPORTED) }
348 }
349
350 fn shrink(
352 &self,
353 _start_slice: u64,
354 _slice_count: u64,
355 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
356 async { Err(zx::Status::NOT_SUPPORTED) }
357 }
358
359 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
361}
362
363pub trait IntoSessionManager {
364 type SM: SessionManager;
365
366 fn into_session_manager(self) -> Arc<Self::SM>;
367}
368
369impl<SM: SessionManager> BlockServer<SM> {
370 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
371 Self { block_size, session_manager: session_manager.into_session_manager() }
372 }
373
374 pub fn session_manager(&self) -> &SM {
375 self.session_manager.as_ref()
376 }
377
378 pub async fn handle_requests(
380 &self,
381 mut requests: fvolume::VolumeRequestStream,
382 ) -> Result<(), Error> {
383 let scope = fasync::Scope::new();
384 loop {
385 match requests.try_next().await {
386 Ok(Some(request)) => {
387 if let Some(session) = self.handle_request(request).await? {
388 scope.spawn(session.map(|_| ()));
389 }
390 }
391 Ok(None) => break,
392 Err(err) => log::warn!(err:?; "Invalid request"),
393 }
394 }
395 scope.await;
396 Ok(())
397 }
398
399 async fn handle_request(
402 &self,
403 request: fvolume::VolumeRequest,
404 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
405 match request {
406 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
407 Ok(info) => {
408 let max_transfer_size = info.max_transfer_size(self.block_size);
409 let (block_count, mut flags) = match info.as_ref() {
410 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
411 (*block_count, *device_flags)
412 }
413 DeviceInfo::Partition(partition_info) => {
414 let block_count = if let Some(range) =
415 partition_info.block_range.as_ref()
416 {
417 range.end - range.start
418 } else {
419 let volume_info = self.session_manager.get_volume_info().await?;
420 volume_info.0.slice_size * volume_info.1.partition_slice_count
421 / self.block_size as u64
422 };
423 (block_count, partition_info.device_flags)
424 }
425 };
426 if SM::SUPPORTS_DECOMPRESSION {
427 flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
428 }
429 responder.send(Ok(&fblock::BlockInfo {
430 block_count,
431 block_size: self.block_size,
432 max_transfer_size,
433 flags,
434 }))?;
435 }
436 Err(status) => responder.send(Err(status.into_raw()))?,
437 },
438 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
439 match self.device_info().await {
440 Ok(info) => {
441 return Ok(Some(self.session_manager.clone().open_session(
442 session.into_stream(),
443 OffsetMap::empty(info.max_transfer_blocks()),
444 self.block_size,
445 )));
446 }
447 Err(status) => session.close_with_epitaph(status)?,
448 }
449 }
450 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
451 session,
452 mapping,
453 control_handle: _,
454 } => match self.device_info().await {
455 Ok(info) => {
456 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
457 Ok(m) => m,
458 Err(status) => {
459 session.close_with_epitaph(status)?;
460 return Ok(None);
461 }
462 };
463 if let Some(max) = info.block_count() {
464 if initial_mapping.target_block_offset + initial_mapping.length > max {
465 log::warn!(
466 "Invalid mapping for session: {initial_mapping:?} (max {max})"
467 );
468 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
469 return Ok(None);
470 }
471 }
472 return Ok(Some(self.session_manager.clone().open_session(
473 session.into_stream(),
474 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
475 self.block_size,
476 )));
477 }
478 Err(status) => session.close_with_epitaph(status)?,
479 },
480 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
481 Ok(info) => {
482 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
483 let mut guid =
484 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
485 guid.value.copy_from_slice(&partition_info.type_guid);
486 responder.send(zx::sys::ZX_OK, Some(&guid))?;
487 } else {
488 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
489 }
490 }
491 Err(status) => {
492 responder.send(status.into_raw(), None)?;
493 }
494 },
495 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
496 match self.device_info().await {
497 Ok(info) => {
498 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
499 let mut guid =
500 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
501 guid.value.copy_from_slice(&partition_info.instance_guid);
502 responder.send(zx::sys::ZX_OK, Some(&guid))?;
503 } else {
504 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
505 }
506 }
507 Err(status) => {
508 responder.send(status.into_raw(), None)?;
509 }
510 }
511 }
512 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
513 Ok(info) => {
514 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
515 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
516 } else {
517 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
518 }
519 }
520 Err(status) => {
521 responder.send(status.into_raw(), None)?;
522 }
523 },
524 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
525 Ok(info) => {
526 if let DeviceInfo::Partition(info) = info.as_ref() {
527 let mut type_guid =
528 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
529 type_guid.value.copy_from_slice(&info.type_guid);
530 let mut instance_guid =
531 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
532 instance_guid.value.copy_from_slice(&info.instance_guid);
533 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
534 name: Some(info.name.clone()),
535 type_guid: Some(type_guid),
536 instance_guid: Some(instance_guid),
537 start_block_offset: info.block_range.as_ref().map(|range| range.start),
538 num_blocks: info
539 .block_range
540 .as_ref()
541 .map(|range| range.end - range.start),
542 flags: Some(info.flags),
543 ..Default::default()
544 }))?;
545 } else {
546 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
547 }
548 }
549 Err(status) => responder.send(Err(status.into_raw()))?,
550 },
551 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
552 match self.session_manager.query_slices(&start_slices).await {
553 Ok(mut results) => {
554 let results_len = results.len();
555 assert!(results_len <= 16);
556 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
557 responder.send(
558 zx::sys::ZX_OK,
559 &results.try_into().unwrap(),
560 results_len as u64,
561 )?;
562 }
563 Err(s) => {
564 responder.send(
565 s.into_raw(),
566 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
567 0,
568 )?;
569 }
570 }
571 }
572 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
573 match self.session_manager.get_volume_info().await {
574 Ok((manager_info, volume_info)) => {
575 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
576 }
577 Err(s) => responder.send(s.into_raw(), None, None)?,
578 }
579 }
580 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
581 responder.send(
582 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
583 .into_raw(),
584 )?;
585 }
586 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
587 responder.send(
588 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
589 .into_raw(),
590 )?;
591 }
592 fvolume::VolumeRequest::Destroy { responder, .. } => {
593 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
594 }
595 }
596 Ok(None)
597 }
598
599 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
600 self.session_manager.get_info().await
601 }
602}
603
604struct SessionHelper<SM: SessionManager> {
605 session_manager: Arc<SM>,
606 offset_map: OffsetMap,
607 block_size: u32,
608 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
609 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
610}
611
612struct VmoMapping {
613 base: usize,
614 size: usize,
615}
616
617impl VmoMapping {
618 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
619 let size = vmo.get_size().unwrap() as usize;
620 Ok(Arc::new(Self {
621 base: fuchsia_runtime::vmar_root_self()
622 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
623 .inspect_err(|error| {
624 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
625 })?,
626 size,
627 }))
628 }
629}
630
631impl Drop for VmoMapping {
632 fn drop(&mut self) {
633 unsafe {
635 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
636 }
637 }
638}
639
640impl<SM: SessionManager> SessionHelper<SM> {
641 fn new(
642 session_manager: Arc<SM>,
643 offset_map: OffsetMap,
644 block_size: u32,
645 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
646 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
647 Ok((
648 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
649 fifo,
650 ))
651 }
652
653 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
654 match request {
655 fblock::SessionRequest::GetFifo { responder } => {
656 let rights = zx::Rights::TRANSFER
657 | zx::Rights::READ
658 | zx::Rights::WRITE
659 | zx::Rights::SIGNAL
660 | zx::Rights::WAIT;
661 match self.peer_fifo.duplicate_handle(rights) {
662 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
663 Err(s) => responder.send(Err(s.into_raw()))?,
664 }
665 Ok(())
666 }
667 fblock::SessionRequest::AttachVmo { vmo, responder } => {
668 let vmo = Arc::new(vmo);
669 let vmo_id = {
670 let mut vmos = self.vmos.lock();
671 if vmos.len() == u16::MAX as usize {
672 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
673 return Ok(());
674 } else {
675 let vmo_id = match vmos.last_entry() {
676 None => 1,
677 Some(o) => {
678 o.key().checked_add(1).unwrap_or_else(|| {
679 let mut vmo_id = 1;
680 for (&id, _) in &*vmos {
682 if id > vmo_id {
683 break;
684 }
685 vmo_id = id + 1;
686 }
687 vmo_id
688 })
689 }
690 };
691 vmos.insert(vmo_id, (vmo.clone(), None));
692 vmo_id
693 }
694 };
695 self.session_manager.clone().on_attach_vmo(&vmo).await?;
696 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
697 Ok(())
698 }
699 fblock::SessionRequest::Close { responder } => {
700 responder.send(Ok(()))?;
701 Err(anyhow!("Closed"))
702 }
703 }
704 }
705
706 fn decode_fifo_request(
708 &self,
709 session: SM::Session,
710 request: &BlockFifoRequest,
711 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
712 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
713
714 let request_bytes = request.length as u64 * self.block_size as u64;
715
716 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
717 .ok_or(zx::Status::INVALID_ARGS)
718 .and_then(|code| {
719 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
720 if code != BlockOpcode::Read {
721 return Err(zx::Status::INVALID_ARGS);
722 }
723 if !SM::SUPPORTS_DECOMPRESSION {
724 return Err(zx::Status::NOT_SUPPORTED);
725 }
726 }
727 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
728 if request.length == 0 {
729 return Err(zx::Status::INVALID_ARGS);
730 }
731 if request.dev_offset.checked_add(request.length as u64).is_none()
733 || (code != BlockOpcode::Trim
734 && request_bytes.checked_add(request.vmo_offset).is_none())
735 {
736 return Err(zx::Status::OUT_OF_RANGE);
737 }
738 }
739 Ok(match code {
740 BlockOpcode::Read => Operation::Read {
741 device_block_offset: request.dev_offset,
742 block_count: request.length,
743 _unused: 0,
744 vmo_offset: request
745 .vmo_offset
746 .checked_mul(self.block_size as u64)
747 .ok_or(zx::Status::OUT_OF_RANGE)?,
748 options: ReadOptions {
749 inline_crypto_options: InlineCryptoOptions {
750 dun: request.dun,
751 slot: request.slot,
752 },
753 },
754 },
755 BlockOpcode::Write => {
756 let mut options = WriteOptions {
757 inline_crypto_options: InlineCryptoOptions {
758 dun: request.dun,
759 slot: request.slot,
760 },
761 ..WriteOptions::default()
762 };
763 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
764 options.flags |= WriteFlags::FORCE_ACCESS;
765 }
766 if flags.contains(BlockIoFlag::PRE_BARRIER) {
767 options.flags |= WriteFlags::PRE_BARRIER;
768 }
769 Operation::Write {
770 device_block_offset: request.dev_offset,
771 block_count: request.length,
772 _unused: 0,
773 options,
774 vmo_offset: request
775 .vmo_offset
776 .checked_mul(self.block_size as u64)
777 .ok_or(zx::Status::OUT_OF_RANGE)?,
778 }
779 }
780 BlockOpcode::Flush => Operation::Flush,
781 BlockOpcode::Trim => Operation::Trim {
782 device_block_offset: request.dev_offset,
783 block_count: request.length,
784 },
785 BlockOpcode::CloseVmo => Operation::CloseVmo,
786 })
787 });
788
789 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
790 GroupOrRequest::Group(request.group)
791 } else {
792 GroupOrRequest::Request(request.reqid)
793 };
794
795 let mut active_requests = self.session_manager.active_requests().0.lock();
796 let mut request_id = None;
797
798 if group_or_request.is_group() {
809 for (key, group) in &mut active_requests.requests {
813 if group.group_or_request == group_or_request {
814 if group.req_id.is_some() {
815 if group.status == zx::Status::OK {
817 group.status = zx::Status::INVALID_ARGS;
818 }
819 return Err(None);
821 }
822 if group.status == zx::Status::OK
824 && let Some(info) = &mut group.decompression_info
825 {
826 if let Ok(Operation::Read {
827 device_block_offset,
828 mut block_count,
829 options,
830 vmo_offset: 0,
831 ..
832 }) = operation
833 {
834 let remaining_bytes = info
835 .compressed_range
836 .end
837 .next_multiple_of(self.block_size as usize)
838 as u64
839 - info.bytes_so_far;
840 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
841 || request.total_compressed_bytes != 0
842 || request.uncompressed_bytes != 0
843 || request.compressed_prefix_bytes != 0
844 || (flags.contains(BlockIoFlag::GROUP_LAST)
845 && info.bytes_so_far + request_bytes
846 < info.compressed_range.end as u64)
847 || (!flags.contains(BlockIoFlag::GROUP_LAST)
848 && request_bytes >= remaining_bytes)
849 {
850 group.status = zx::Status::INVALID_ARGS;
851 } else {
852 if request_bytes > remaining_bytes {
861 block_count = (remaining_bytes / self.block_size as u64) as u32;
862 }
863
864 operation = Ok(Operation::ContinueDecompressedRead {
865 offset: info.bytes_so_far,
866 device_block_offset,
867 block_count,
868 options,
869 });
870
871 info.bytes_so_far += block_count as u64 * self.block_size as u64;
872 }
873 } else {
874 group.status = zx::Status::INVALID_ARGS;
875 }
876 }
877 if flags.contains(BlockIoFlag::GROUP_LAST) {
878 group.req_id = Some(request.reqid);
879 if group.status != zx::Status::OK {
882 operation = Err(group.status);
883 }
884 } else if group.status != zx::Status::OK {
885 return Err(None);
888 }
889 request_id = Some(RequestId(key));
890 group.count += 1;
891 break;
892 }
893 }
894 }
895
896 let is_single_request =
897 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
898
899 let mut decompression_info = None;
900 let vmo = match operation {
901 Ok(Operation::Read {
902 device_block_offset,
903 mut block_count,
904 options,
905 vmo_offset,
906 ..
907 }) => match self.vmos.lock().get_mut(&request.vmoid) {
908 Some((vmo, mapping)) => {
909 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
910 let compressed_range = request.compressed_prefix_bytes as usize
911 ..request.compressed_prefix_bytes as usize
912 + request.total_compressed_bytes as usize;
913 let required_buffer_size =
914 compressed_range.end.next_multiple_of(self.block_size as usize);
915
916 if compressed_range.start >= compressed_range.end
918 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
919 || (is_single_request && request_bytes < compressed_range.end as u64)
920 || (!is_single_request && request_bytes >= required_buffer_size as u64)
921 {
922 Err(zx::Status::INVALID_ARGS)
923 } else {
924 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
931 block_count =
932 (required_buffer_size / self.block_size as usize) as u32;
933 required_buffer_size as u64
934 } else {
935 request_bytes
936 };
937
938 match mapping {
940 Some(mapping) => Ok(mapping.clone()),
941 None => {
942 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
943 }
944 }
945 .and_then(|mapping| {
946 if vmo_offset
949 .checked_add(request.uncompressed_bytes as u64)
950 .is_some_and(|end| end <= mapping.size as u64)
951 {
952 Ok(mapping)
953 } else {
954 Err(zx::Status::OUT_OF_RANGE)
955 }
956 })
957 .map(|mapping| {
958 operation = Ok(Operation::StartDecompressedRead {
963 required_buffer_size,
964 device_block_offset,
965 block_count,
966 options,
967 });
968 decompression_info = Some(DecompressionInfo {
971 compressed_range,
972 bytes_so_far,
973 mapping,
974 uncompressed_range: vmo_offset
975 ..vmo_offset + request.uncompressed_bytes as u64,
976 buffer: None,
977 });
978 None
979 })
980 }
981 } else {
982 Ok(Some(vmo.clone()))
983 }
984 }
985 None => Err(zx::Status::IO),
986 },
987 Ok(Operation::Write { .. }) => self
988 .vmos
989 .lock()
990 .get(&request.vmoid)
991 .cloned()
992 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
993 Ok(Operation::CloseVmo) => {
994 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
995 let vmo_clone = vmo.clone();
996 Epoch::global().defer(move || drop(vmo_clone));
999 Ok(Some(vmo))
1000 })
1001 }
1002 _ => Ok(None),
1003 }
1004 .unwrap_or_else(|e| {
1005 operation = Err(e);
1006 None
1007 });
1008
1009 let trace_flow_id = NonZero::new(request.trace_flow_id);
1010 let request_id = request_id.unwrap_or_else(|| {
1011 RequestId(active_requests.requests.insert(ActiveRequest {
1012 session,
1013 group_or_request,
1014 trace_flow_id,
1015 _epoch_guard: Epoch::global().guard(),
1016 status: zx::Status::OK,
1017 count: 1,
1018 req_id: is_single_request.then_some(request.reqid),
1019 decompression_info,
1020 }))
1021 });
1022
1023 Ok(DecodedRequest {
1024 request_id,
1025 trace_flow_id,
1026 operation: operation.map_err(|status| {
1027 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1028 })?,
1029 vmo,
1030 })
1031 }
1032
1033 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1034 std::mem::take(&mut *self.vmos.lock())
1035 }
1036
1037 fn map_request(
1039 &self,
1040 mut request: DecodedRequest,
1041 active_request: &mut ActiveRequest<SM::Session>,
1042 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1043 if active_request.status != zx::Status::OK {
1044 return Err(zx::Status::BAD_STATE);
1045 }
1046 let mapping = self.offset_map.mapping();
1047 match (mapping, request.operation.blocks()) {
1048 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1049 return Err(zx::Status::OUT_OF_RANGE);
1050 }
1051 _ => {}
1052 }
1053 let remainder = request.operation.map(
1054 self.offset_map.mapping(),
1055 self.offset_map.max_transfer_blocks(),
1056 self.block_size,
1057 );
1058 if remainder.is_some() {
1059 active_request.count += 1;
1060 }
1061 static CACHE: AtomicU64 = AtomicU64::new(0);
1062 if let Some(context) =
1063 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
1064 {
1065 use fuchsia_trace::ArgValue;
1066 let trace_args = [
1067 ArgValue::of("request_id", request.request_id.0),
1068 ArgValue::of("opcode", request.operation.trace_label()),
1069 ];
1070 let _scope = fuchsia_trace::duration(
1071 c"storage",
1072 c"block_server::start_transaction",
1073 &trace_args,
1074 );
1075 if let Some(trace_flow_id) = active_request.trace_flow_id {
1076 fuchsia_trace::flow_step(
1077 &context,
1078 c"block_server::start_transaction",
1079 trace_flow_id.get().into(),
1080 &[],
1081 );
1082 }
1083 }
1084 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1085 Ok((request, remainder))
1086 }
1087
1088 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1089 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1090 }
1091}
1092
1093#[repr(transparent)]
1094#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1095pub struct RequestId(usize);
1096
1097#[derive(Clone, Debug)]
1098struct DecodedRequest {
1099 request_id: RequestId,
1100 trace_flow_id: TraceFlowId,
1101 operation: Operation,
1102 vmo: Option<Arc<zx::Vmo>>,
1103}
1104
1105pub type WriteFlags = block_protocol::WriteFlags;
1107pub type WriteOptions = block_protocol::WriteOptions;
1108pub type ReadOptions = block_protocol::ReadOptions;
1109
1110#[repr(C)]
1111#[derive(Clone, Debug, PartialEq, Eq)]
1112pub enum Operation {
1113 Read {
1118 device_block_offset: u64,
1119 block_count: u32,
1120 _unused: u32,
1121 vmo_offset: u64,
1122 options: ReadOptions,
1123 },
1124 Write {
1125 device_block_offset: u64,
1126 block_count: u32,
1127 _unused: u32,
1128 vmo_offset: u64,
1129 options: WriteOptions,
1130 },
1131 Flush,
1132 Trim {
1133 device_block_offset: u64,
1134 block_count: u32,
1135 },
1136 CloseVmo,
1138 StartDecompressedRead {
1139 required_buffer_size: usize,
1140 device_block_offset: u64,
1141 block_count: u32,
1142 options: ReadOptions,
1143 },
1144 ContinueDecompressedRead {
1145 offset: u64,
1146 device_block_offset: u64,
1147 block_count: u32,
1148 options: ReadOptions,
1149 },
1150}
1151
1152impl Operation {
1153 fn trace_label(&self) -> &'static str {
1154 match self {
1155 Operation::Read { .. } => "read",
1156 Operation::Write { .. } => "write",
1157 Operation::Flush { .. } => "flush",
1158 Operation::Trim { .. } => "trim",
1159 Operation::CloseVmo { .. } => "close_vmo",
1160 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1161 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1162 }
1163 }
1164
1165 fn blocks(&self) -> Option<(u64, u32)> {
1167 match self {
1168 Operation::Read { device_block_offset, block_count, .. }
1169 | Operation::Write { device_block_offset, block_count, .. }
1170 | Operation::Trim { device_block_offset, block_count, .. } => {
1171 Some((*device_block_offset, *block_count))
1172 }
1173 _ => None,
1174 }
1175 }
1176
1177 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1179 match self {
1180 Operation::Read { device_block_offset, block_count, .. }
1181 | Operation::Write { device_block_offset, block_count, .. }
1182 | Operation::Trim { device_block_offset, block_count, .. } => {
1183 Some((device_block_offset, block_count))
1184 }
1185 _ => None,
1186 }
1187 }
1188
1189 fn map(
1192 &mut self,
1193 mapping: Option<&BlockOffsetMapping>,
1194 max_blocks: Option<NonZero<u32>>,
1195 block_size: u32,
1196 ) -> Option<Self> {
1197 let mut max = match self {
1198 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1199 _ => None,
1200 };
1201 let (offset, length) = self.blocks_mut()?;
1202 let orig_offset = *offset;
1203 if let Some(mapping) = mapping {
1204 let delta = *offset - mapping.source_block_offset;
1205 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1206 *offset = mapping.target_block_offset + delta;
1207 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1208 max = match max {
1209 None => Some(mapping_max),
1210 Some(m) => Some(std::cmp::min(m, mapping_max)),
1211 };
1212 };
1213 if let Some(max) = max {
1214 if *length as u64 > max {
1215 let rem = (*length as u64 - max) as u32;
1216 *length = max as u32;
1217 return Some(match self {
1218 Operation::Read {
1219 device_block_offset: _,
1220 block_count: _,
1221 vmo_offset,
1222 _unused,
1223 options,
1224 } => Operation::Read {
1225 device_block_offset: orig_offset + max,
1226 block_count: rem,
1227 vmo_offset: *vmo_offset + max * block_size as u64,
1228 _unused: *_unused,
1229 options: *options,
1230 },
1231 Operation::Write {
1232 device_block_offset: _,
1233 block_count: _,
1234 _unused,
1235 vmo_offset,
1236 options,
1237 } => {
1238 let new_options = WriteOptions {
1240 flags: options.flags & !WriteFlags::PRE_BARRIER,
1241 ..*options
1242 };
1243
1244 Operation::Write {
1245 device_block_offset: orig_offset + max,
1246 block_count: rem,
1247 _unused: *_unused,
1248 vmo_offset: *vmo_offset + max * block_size as u64,
1249 options: new_options,
1250 }
1251 }
1252 Operation::Trim { device_block_offset: _, block_count: _ } => {
1253 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1254 }
1255 _ => unreachable!(),
1256 });
1257 }
1258 }
1259 None
1260 }
1261}
1262
1263#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1264pub enum GroupOrRequest {
1265 Group(u16),
1266 Request(u32),
1267}
1268
1269impl GroupOrRequest {
1270 fn is_group(&self) -> bool {
1271 matches!(self, Self::Group(_))
1272 }
1273
1274 fn group_id(&self) -> Option<u16> {
1275 match self {
1276 Self::Group(id) => Some(*id),
1277 Self::Request(_) => None,
1278 }
1279 }
1280}
1281
1282#[cfg(test)]
1283mod tests {
1284 use super::{
1285 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1286 TraceFlowId,
1287 };
1288 use assert_matches::assert_matches;
1289 use block_protocol::{
1290 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1291 WriteOptions,
1292 };
1293 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1294 use fuchsia_sync::Mutex;
1295 use futures::FutureExt as _;
1296 use futures::channel::oneshot;
1297 use futures::future::BoxFuture;
1298 use std::borrow::Cow;
1299 use std::future::poll_fn;
1300 use std::num::NonZero;
1301 use std::pin::pin;
1302 use std::sync::Arc;
1303 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1304 use std::task::{Context, Poll};
1305 use zx::{AsHandleRef as _, HandleBased as _};
1306 use {
1307 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1308 fuchsia_async as fasync,
1309 };
1310
1311 #[derive(Default)]
1312 struct MockInterface {
1313 read_hook: Option<
1314 Box<
1315 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1316 + Send
1317 + Sync,
1318 >,
1319 >,
1320 write_hook:
1321 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1322 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1323 }
1324
1325 impl super::async_interface::Interface for MockInterface {
1326 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1327 Ok(())
1328 }
1329
1330 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1331 Ok(Cow::Owned(test_device_info()))
1332 }
1333
1334 async fn read(
1335 &self,
1336 device_block_offset: u64,
1337 block_count: u32,
1338 vmo: &Arc<zx::Vmo>,
1339 vmo_offset: u64,
1340 _opts: ReadOptions,
1341 _trace_flow_id: TraceFlowId,
1342 ) -> Result<(), zx::Status> {
1343 if let Some(read_hook) = &self.read_hook {
1344 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1345 } else {
1346 unimplemented!();
1347 }
1348 }
1349
1350 async fn write(
1351 &self,
1352 device_block_offset: u64,
1353 _block_count: u32,
1354 _vmo: &Arc<zx::Vmo>,
1355 _vmo_offset: u64,
1356 _opts: WriteOptions,
1357 _trace_flow_id: TraceFlowId,
1358 ) -> Result<(), zx::Status> {
1359 if let Some(write_hook) = &self.write_hook {
1360 write_hook(device_block_offset).await
1361 } else {
1362 unimplemented!();
1363 }
1364 }
1365
1366 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1367 unreachable!();
1368 }
1369
1370 fn barrier(&self) -> Result<(), zx::Status> {
1371 if let Some(barrier_hook) = &self.barrier_hook {
1372 barrier_hook()
1373 } else {
1374 unimplemented!();
1375 }
1376 }
1377
1378 async fn trim(
1379 &self,
1380 _device_block_offset: u64,
1381 _block_count: u32,
1382 _trace_flow_id: TraceFlowId,
1383 ) -> Result<(), zx::Status> {
1384 unreachable!();
1385 }
1386
1387 async fn get_volume_info(
1388 &self,
1389 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1390 let () = std::future::pending().await;
1392 unreachable!();
1393 }
1394 }
1395
1396 const BLOCK_SIZE: u32 = 512;
1397 const MAX_TRANSFER_BLOCKS: u32 = 10;
1398
1399 fn test_device_info() -> DeviceInfo {
1400 DeviceInfo::Partition(PartitionInfo {
1401 device_flags: fblock::Flag::READONLY,
1402 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1403 block_range: Some(0..100),
1404 type_guid: [1; 16],
1405 instance_guid: [2; 16],
1406 name: "foo".to_string(),
1407 flags: 0xabcd,
1408 })
1409 }
1410
1411 #[fuchsia::test]
1412 async fn test_split_request_only_issues_single_barrier() {
1413 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1414 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1415 let barrier_called = Arc::new(AtomicU32::new(0));
1416 let barrier_called_clone = barrier_called.clone();
1417
1418 futures::join!(
1419 async move {
1420 let block_server = BlockServer::new(
1421 BLOCK_SIZE,
1422 Arc::new(MockInterface {
1423 barrier_hook: Some(Box::new(move || {
1424 barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1425 Ok(())
1426 })),
1427 write_hook: Some(Box::new(move |_device_block_offset| {
1428 Box::pin(async move { Ok(()) })
1429 })),
1430 ..MockInterface::default()
1431 }),
1432 );
1433 block_server.handle_requests(stream).await.unwrap();
1434 },
1435 async move {
1436 let (session_proxy, server) = fidl::endpoints::create_proxy();
1437
1438 proxy.open_session(server).unwrap();
1439
1440 let vmo_id = session_proxy
1441 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1442 .await
1443 .unwrap()
1444 .unwrap();
1445 assert_ne!(vmo_id.id, 0);
1446
1447 let mut fifo =
1448 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1449 let (mut reader, mut writer) = fifo.async_io();
1450 writer
1453 .write_entries(&BlockFifoRequest {
1454 command: BlockFifoCommand {
1455 opcode: BlockOpcode::Write.into_primitive(),
1456 flags: BlockIoFlag::PRE_BARRIER.bits(),
1457 ..Default::default()
1458 },
1459 vmoid: vmo_id.id,
1460 dev_offset: 0,
1461 length: MAX_TRANSFER_BLOCKS + 1,
1462 vmo_offset: 6,
1463 ..Default::default()
1464 })
1465 .await
1466 .unwrap();
1467
1468 let mut response = BlockFifoResponse::default();
1469 reader.read_entries(&mut response).await.unwrap();
1470 assert_eq!(response.status, zx::sys::ZX_OK);
1471
1472 assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1473
1474 std::mem::drop(proxy);
1475 }
1476 );
1477 }
1478
1479 #[fuchsia::test]
1480 async fn test_barriers_not_supported() {
1481 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1482 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1483
1484 futures::join!(
1485 async move {
1486 let block_server = BlockServer::new(
1487 BLOCK_SIZE,
1488 Arc::new(MockInterface {
1489 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1490 write_hook: Some(Box::new(move |_device_block_offset| {
1491 Box::pin(async move { Ok(()) })
1492 })),
1493 ..MockInterface::default()
1494 }),
1495 );
1496 block_server.handle_requests(stream).await.unwrap();
1497 },
1498 async move {
1499 let (session_proxy, server) = fidl::endpoints::create_proxy();
1500
1501 proxy.open_session(server).unwrap();
1502
1503 let vmo_id = session_proxy
1504 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1505 .await
1506 .unwrap()
1507 .unwrap();
1508 assert_ne!(vmo_id.id, 0);
1509
1510 let mut fifo =
1511 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1512 let (mut reader, mut writer) = fifo.async_io();
1513
1514 writer
1517 .write_entries(&BlockFifoRequest {
1518 command: BlockFifoCommand {
1519 opcode: BlockOpcode::Write.into_primitive(),
1520 flags: BlockIoFlag::PRE_BARRIER.bits(),
1521 ..Default::default()
1522 },
1523 vmoid: vmo_id.id,
1524 dev_offset: 0,
1525 length: MAX_TRANSFER_BLOCKS + 1,
1526 vmo_offset: 6,
1527 ..Default::default()
1528 })
1529 .await
1530 .unwrap();
1531
1532 let mut response = BlockFifoResponse::default();
1533 reader.read_entries(&mut response).await.unwrap();
1534 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1535
1536 std::mem::drop(proxy);
1537 }
1538 );
1539 }
1540
1541 #[fuchsia::test]
1542 async fn test_barriers_ordering() {
1543 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1544 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1545 let barrier_called = Arc::new(AtomicBool::new(false));
1546
1547 futures::join!(
1548 async move {
1549 let barrier_called_clone = barrier_called.clone();
1550 let block_server = BlockServer::new(
1551 BLOCK_SIZE,
1552 Arc::new(MockInterface {
1553 barrier_hook: Some(Box::new(move || {
1554 barrier_called.store(true, Ordering::Relaxed);
1555 Ok(())
1556 })),
1557 write_hook: Some(Box::new(move |device_block_offset| {
1558 let barrier_called = barrier_called_clone.clone();
1559 Box::pin(async move {
1560 if device_block_offset % 2 == 0 {
1562 fasync::Timer::new(fasync::MonotonicInstant::after(
1563 zx::MonotonicDuration::from_millis(200),
1564 ))
1565 .await;
1566 }
1567 assert!(barrier_called.load(Ordering::Relaxed));
1568 Ok(())
1569 })
1570 })),
1571 ..MockInterface::default()
1572 }),
1573 );
1574 block_server.handle_requests(stream).await.unwrap();
1575 },
1576 async move {
1577 let (session_proxy, server) = fidl::endpoints::create_proxy();
1578
1579 proxy.open_session(server).unwrap();
1580
1581 let vmo_id = session_proxy
1582 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1583 .await
1584 .unwrap()
1585 .unwrap();
1586 assert_ne!(vmo_id.id, 0);
1587
1588 let mut fifo =
1589 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1590 let (mut reader, mut writer) = fifo.async_io();
1591
1592 writer
1593 .write_entries(&BlockFifoRequest {
1594 command: BlockFifoCommand {
1595 opcode: BlockOpcode::Write.into_primitive(),
1596 flags: BlockIoFlag::PRE_BARRIER.bits(),
1597 ..Default::default()
1598 },
1599 vmoid: vmo_id.id,
1600 dev_offset: 0,
1601 length: 5,
1602 vmo_offset: 6,
1603 ..Default::default()
1604 })
1605 .await
1606 .unwrap();
1607
1608 for i in 0..10 {
1609 writer
1610 .write_entries(&BlockFifoRequest {
1611 command: BlockFifoCommand {
1612 opcode: BlockOpcode::Write.into_primitive(),
1613 ..Default::default()
1614 },
1615 vmoid: vmo_id.id,
1616 dev_offset: i + 1,
1617 length: 5,
1618 vmo_offset: 6,
1619 ..Default::default()
1620 })
1621 .await
1622 .unwrap();
1623 }
1624 for _ in 0..11 {
1625 let mut response = BlockFifoResponse::default();
1626 reader.read_entries(&mut response).await.unwrap();
1627 assert_eq!(response.status, zx::sys::ZX_OK);
1628 }
1629
1630 std::mem::drop(proxy);
1631 }
1632 );
1633 }
1634
1635 #[fuchsia::test]
1636 async fn test_info() {
1637 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1638
1639 futures::join!(
1640 async {
1641 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1642 block_server.handle_requests(stream).await.unwrap();
1643 },
1644 async {
1645 let expected_info = test_device_info();
1646 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1647 info
1648 } else {
1649 unreachable!()
1650 };
1651
1652 let block_info = proxy.get_info().await.unwrap().unwrap();
1653 assert_eq!(
1654 block_info.block_count,
1655 partition_info.block_range.as_ref().unwrap().end
1656 - partition_info.block_range.as_ref().unwrap().start
1657 );
1658 assert_eq!(
1659 block_info.flags,
1660 fblock::Flag::READONLY | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1661 );
1662
1663 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1664
1665 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1666 assert_eq!(status, zx::sys::ZX_OK);
1667 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1668
1669 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1670 assert_eq!(status, zx::sys::ZX_OK);
1671 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1672
1673 let (status, name) = proxy.get_name().await.unwrap();
1674 assert_eq!(status, zx::sys::ZX_OK);
1675 assert_eq!(name.as_ref(), Some(&partition_info.name));
1676
1677 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1678 assert_eq!(metadata.name, name);
1679 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1680 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1681 assert_eq!(
1682 metadata.start_block_offset,
1683 Some(partition_info.block_range.as_ref().unwrap().start)
1684 );
1685 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1686 assert_eq!(metadata.flags, Some(partition_info.flags));
1687
1688 std::mem::drop(proxy);
1689 }
1690 );
1691 }
1692
1693 #[fuchsia::test]
1694 async fn test_attach_vmo() {
1695 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1696
1697 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1698 let koid = vmo.get_koid().unwrap();
1699
1700 futures::join!(
1701 async {
1702 let block_server = BlockServer::new(
1703 BLOCK_SIZE,
1704 Arc::new(MockInterface {
1705 read_hook: Some(Box::new(move |_, _, vmo, _| {
1706 assert_eq!(vmo.get_koid().unwrap(), koid);
1707 Box::pin(async { Ok(()) })
1708 })),
1709 ..MockInterface::default()
1710 }),
1711 );
1712 block_server.handle_requests(stream).await.unwrap();
1713 },
1714 async move {
1715 let (session_proxy, server) = fidl::endpoints::create_proxy();
1716
1717 proxy.open_session(server).unwrap();
1718
1719 let vmo_id = session_proxy
1720 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1721 .await
1722 .unwrap()
1723 .unwrap();
1724 assert_ne!(vmo_id.id, 0);
1725
1726 let mut fifo =
1727 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1728 let (mut reader, mut writer) = fifo.async_io();
1729
1730 let mut count = 1;
1732 loop {
1733 match session_proxy
1734 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1735 .await
1736 .unwrap()
1737 {
1738 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1739 Err(e) => {
1740 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1741 break;
1742 }
1743 }
1744
1745 if count % 10 == 0 {
1747 writer
1748 .write_entries(&BlockFifoRequest {
1749 command: BlockFifoCommand {
1750 opcode: BlockOpcode::Read.into_primitive(),
1751 ..Default::default()
1752 },
1753 vmoid: vmo_id.id,
1754 length: 1,
1755 ..Default::default()
1756 })
1757 .await
1758 .unwrap();
1759
1760 let mut response = BlockFifoResponse::default();
1761 reader.read_entries(&mut response).await.unwrap();
1762 assert_eq!(response.status, zx::sys::ZX_OK);
1763 }
1764
1765 count += 1;
1766 }
1767
1768 assert_eq!(count, u16::MAX as u64);
1769
1770 writer
1772 .write_entries(&BlockFifoRequest {
1773 command: BlockFifoCommand {
1774 opcode: BlockOpcode::CloseVmo.into_primitive(),
1775 ..Default::default()
1776 },
1777 vmoid: vmo_id.id,
1778 ..Default::default()
1779 })
1780 .await
1781 .unwrap();
1782
1783 let mut response = BlockFifoResponse::default();
1784 reader.read_entries(&mut response).await.unwrap();
1785 assert_eq!(response.status, zx::sys::ZX_OK);
1786
1787 let new_vmo_id = session_proxy
1788 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1789 .await
1790 .unwrap()
1791 .unwrap();
1792 assert_eq!(new_vmo_id.id, vmo_id.id);
1794
1795 std::mem::drop(proxy);
1796 }
1797 );
1798 }
1799
1800 #[fuchsia::test]
1801 async fn test_close() {
1802 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1803
1804 let mut server = std::pin::pin!(
1805 async {
1806 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1807 block_server.handle_requests(stream).await.unwrap();
1808 }
1809 .fuse()
1810 );
1811
1812 let mut client = std::pin::pin!(
1813 async {
1814 let (session_proxy, server) = fidl::endpoints::create_proxy();
1815
1816 proxy.open_session(server).unwrap();
1817
1818 std::mem::drop(proxy);
1821
1822 session_proxy.close().await.unwrap().unwrap();
1823
1824 let _: () = std::future::pending().await;
1826 }
1827 .fuse()
1828 );
1829
1830 futures::select!(
1831 _ = server => {}
1832 _ = client => unreachable!(),
1833 );
1834 }
1835
1836 #[derive(Default)]
1837 struct IoMockInterface {
1838 do_checks: bool,
1839 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1840 return_errors: bool,
1841 }
1842
1843 #[derive(Debug)]
1844 enum ExpectedOp {
1845 Read(u64, u32, u64),
1846 Write(u64, u32, u64),
1847 Trim(u64, u32),
1848 Flush,
1849 }
1850
1851 impl super::async_interface::Interface for IoMockInterface {
1852 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1853 Ok(())
1854 }
1855
1856 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1857 Ok(Cow::Owned(test_device_info()))
1858 }
1859
1860 async fn read(
1861 &self,
1862 device_block_offset: u64,
1863 block_count: u32,
1864 _vmo: &Arc<zx::Vmo>,
1865 vmo_offset: u64,
1866 _opts: ReadOptions,
1867 _trace_flow_id: TraceFlowId,
1868 ) -> Result<(), zx::Status> {
1869 if self.return_errors {
1870 Err(zx::Status::INTERNAL)
1871 } else {
1872 if self.do_checks {
1873 assert_matches!(
1874 self.expected_op.lock().take(),
1875 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1876 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1877 "Read {device_block_offset} {block_count} {vmo_offset}"
1878 );
1879 }
1880 Ok(())
1881 }
1882 }
1883
1884 async fn write(
1885 &self,
1886 device_block_offset: u64,
1887 block_count: u32,
1888 _vmo: &Arc<zx::Vmo>,
1889 vmo_offset: u64,
1890 _write_opts: WriteOptions,
1891 _trace_flow_id: TraceFlowId,
1892 ) -> Result<(), zx::Status> {
1893 if self.return_errors {
1894 Err(zx::Status::NOT_SUPPORTED)
1895 } else {
1896 if self.do_checks {
1897 assert_matches!(
1898 self.expected_op.lock().take(),
1899 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1900 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1901 "Write {device_block_offset} {block_count} {vmo_offset}"
1902 );
1903 }
1904 Ok(())
1905 }
1906 }
1907
1908 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1909 if self.return_errors {
1910 Err(zx::Status::NO_RESOURCES)
1911 } else {
1912 if self.do_checks {
1913 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1914 }
1915 Ok(())
1916 }
1917 }
1918
1919 fn barrier(&self) -> Result<(), zx::Status> {
1920 unreachable!()
1921 }
1922
1923 async fn trim(
1924 &self,
1925 device_block_offset: u64,
1926 block_count: u32,
1927 _trace_flow_id: TraceFlowId,
1928 ) -> Result<(), zx::Status> {
1929 if self.return_errors {
1930 Err(zx::Status::NO_MEMORY)
1931 } else {
1932 if self.do_checks {
1933 assert_matches!(
1934 self.expected_op.lock().take(),
1935 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1936 block_count == b,
1937 "Trim {device_block_offset} {block_count}"
1938 );
1939 }
1940 Ok(())
1941 }
1942 }
1943 }
1944
1945 #[fuchsia::test]
1946 async fn test_io() {
1947 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1948
1949 let expected_op = Arc::new(Mutex::new(None));
1950 let expected_op_clone = expected_op.clone();
1951
1952 let server = async {
1953 let block_server = BlockServer::new(
1954 BLOCK_SIZE,
1955 Arc::new(IoMockInterface {
1956 return_errors: false,
1957 do_checks: true,
1958 expected_op: expected_op_clone,
1959 }),
1960 );
1961 block_server.handle_requests(stream).await.unwrap();
1962 };
1963
1964 let client = async move {
1965 let (session_proxy, server) = fidl::endpoints::create_proxy();
1966
1967 proxy.open_session(server).unwrap();
1968
1969 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1970 let vmo_id = session_proxy
1971 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1972 .await
1973 .unwrap()
1974 .unwrap();
1975
1976 let mut fifo =
1977 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1978 let (mut reader, mut writer) = fifo.async_io();
1979
1980 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1982 writer
1983 .write_entries(&BlockFifoRequest {
1984 command: BlockFifoCommand {
1985 opcode: BlockOpcode::Read.into_primitive(),
1986 ..Default::default()
1987 },
1988 vmoid: vmo_id.id,
1989 dev_offset: 1,
1990 length: 2,
1991 vmo_offset: 3,
1992 ..Default::default()
1993 })
1994 .await
1995 .unwrap();
1996
1997 let mut response = BlockFifoResponse::default();
1998 reader.read_entries(&mut response).await.unwrap();
1999 assert_eq!(response.status, zx::sys::ZX_OK);
2000
2001 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
2003 writer
2004 .write_entries(&BlockFifoRequest {
2005 command: BlockFifoCommand {
2006 opcode: BlockOpcode::Write.into_primitive(),
2007 ..Default::default()
2008 },
2009 vmoid: vmo_id.id,
2010 dev_offset: 4,
2011 length: 5,
2012 vmo_offset: 6,
2013 ..Default::default()
2014 })
2015 .await
2016 .unwrap();
2017
2018 let mut response = BlockFifoResponse::default();
2019 reader.read_entries(&mut response).await.unwrap();
2020 assert_eq!(response.status, zx::sys::ZX_OK);
2021
2022 *expected_op.lock() = Some(ExpectedOp::Flush);
2024 writer
2025 .write_entries(&BlockFifoRequest {
2026 command: BlockFifoCommand {
2027 opcode: BlockOpcode::Flush.into_primitive(),
2028 ..Default::default()
2029 },
2030 ..Default::default()
2031 })
2032 .await
2033 .unwrap();
2034
2035 reader.read_entries(&mut response).await.unwrap();
2036 assert_eq!(response.status, zx::sys::ZX_OK);
2037
2038 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
2040 writer
2041 .write_entries(&BlockFifoRequest {
2042 command: BlockFifoCommand {
2043 opcode: BlockOpcode::Trim.into_primitive(),
2044 ..Default::default()
2045 },
2046 dev_offset: 7,
2047 length: 8,
2048 ..Default::default()
2049 })
2050 .await
2051 .unwrap();
2052
2053 reader.read_entries(&mut response).await.unwrap();
2054 assert_eq!(response.status, zx::sys::ZX_OK);
2055
2056 std::mem::drop(proxy);
2057 };
2058
2059 futures::join!(server, client);
2060 }
2061
2062 #[fuchsia::test]
2063 async fn test_io_errors() {
2064 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2065
2066 futures::join!(
2067 async {
2068 let block_server = BlockServer::new(
2069 BLOCK_SIZE,
2070 Arc::new(IoMockInterface {
2071 return_errors: true,
2072 do_checks: false,
2073 expected_op: Arc::new(Mutex::new(None)),
2074 }),
2075 );
2076 block_server.handle_requests(stream).await.unwrap();
2077 },
2078 async move {
2079 let (session_proxy, server) = fidl::endpoints::create_proxy();
2080
2081 proxy.open_session(server).unwrap();
2082
2083 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2084 let vmo_id = session_proxy
2085 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2086 .await
2087 .unwrap()
2088 .unwrap();
2089
2090 let mut fifo =
2091 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2092 let (mut reader, mut writer) = fifo.async_io();
2093
2094 writer
2096 .write_entries(&BlockFifoRequest {
2097 command: BlockFifoCommand {
2098 opcode: BlockOpcode::Read.into_primitive(),
2099 ..Default::default()
2100 },
2101 vmoid: vmo_id.id,
2102 length: 1,
2103 reqid: 1,
2104 ..Default::default()
2105 })
2106 .await
2107 .unwrap();
2108
2109 let mut response = BlockFifoResponse::default();
2110 reader.read_entries(&mut response).await.unwrap();
2111 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2112
2113 writer
2115 .write_entries(&BlockFifoRequest {
2116 command: BlockFifoCommand {
2117 opcode: BlockOpcode::Write.into_primitive(),
2118 ..Default::default()
2119 },
2120 vmoid: vmo_id.id,
2121 length: 1,
2122 reqid: 2,
2123 ..Default::default()
2124 })
2125 .await
2126 .unwrap();
2127
2128 reader.read_entries(&mut response).await.unwrap();
2129 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2130
2131 writer
2133 .write_entries(&BlockFifoRequest {
2134 command: BlockFifoCommand {
2135 opcode: BlockOpcode::Flush.into_primitive(),
2136 ..Default::default()
2137 },
2138 reqid: 3,
2139 ..Default::default()
2140 })
2141 .await
2142 .unwrap();
2143
2144 reader.read_entries(&mut response).await.unwrap();
2145 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2146
2147 writer
2149 .write_entries(&BlockFifoRequest {
2150 command: BlockFifoCommand {
2151 opcode: BlockOpcode::Trim.into_primitive(),
2152 ..Default::default()
2153 },
2154 reqid: 4,
2155 length: 1,
2156 ..Default::default()
2157 })
2158 .await
2159 .unwrap();
2160
2161 reader.read_entries(&mut response).await.unwrap();
2162 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2163
2164 std::mem::drop(proxy);
2165 }
2166 );
2167 }
2168
2169 #[fuchsia::test]
2170 async fn test_invalid_args() {
2171 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2172
2173 futures::join!(
2174 async {
2175 let block_server = BlockServer::new(
2176 BLOCK_SIZE,
2177 Arc::new(IoMockInterface {
2178 return_errors: false,
2179 do_checks: false,
2180 expected_op: Arc::new(Mutex::new(None)),
2181 }),
2182 );
2183 block_server.handle_requests(stream).await.unwrap();
2184 },
2185 async move {
2186 let (session_proxy, server) = fidl::endpoints::create_proxy();
2187
2188 proxy.open_session(server).unwrap();
2189
2190 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2191 let vmo_id = session_proxy
2192 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2193 .await
2194 .unwrap()
2195 .unwrap();
2196
2197 let mut fifo =
2198 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2199
2200 async fn test(
2201 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2202 request: BlockFifoRequest,
2203 ) -> Result<(), zx::Status> {
2204 let (mut reader, mut writer) = fifo.async_io();
2205 writer.write_entries(&request).await.unwrap();
2206 let mut response = BlockFifoResponse::default();
2207 reader.read_entries(&mut response).await.unwrap();
2208 zx::Status::ok(response.status)
2209 }
2210
2211 let good_read_request = || BlockFifoRequest {
2214 command: BlockFifoCommand {
2215 opcode: BlockOpcode::Read.into_primitive(),
2216 ..Default::default()
2217 },
2218 length: 1,
2219 vmoid: vmo_id.id,
2220 ..Default::default()
2221 };
2222
2223 assert_eq!(
2224 test(
2225 &mut fifo,
2226 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2227 )
2228 .await,
2229 Err(zx::Status::IO)
2230 );
2231
2232 assert_eq!(
2233 test(
2234 &mut fifo,
2235 BlockFifoRequest {
2236 vmo_offset: 0xffff_ffff_ffff_ffff,
2237 ..good_read_request()
2238 }
2239 )
2240 .await,
2241 Err(zx::Status::OUT_OF_RANGE)
2242 );
2243
2244 assert_eq!(
2245 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2246 Err(zx::Status::INVALID_ARGS)
2247 );
2248
2249 let good_write_request = || BlockFifoRequest {
2252 command: BlockFifoCommand {
2253 opcode: BlockOpcode::Write.into_primitive(),
2254 ..Default::default()
2255 },
2256 length: 1,
2257 vmoid: vmo_id.id,
2258 ..Default::default()
2259 };
2260
2261 assert_eq!(
2262 test(
2263 &mut fifo,
2264 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2265 )
2266 .await,
2267 Err(zx::Status::IO)
2268 );
2269
2270 assert_eq!(
2271 test(
2272 &mut fifo,
2273 BlockFifoRequest {
2274 vmo_offset: 0xffff_ffff_ffff_ffff,
2275 ..good_write_request()
2276 }
2277 )
2278 .await,
2279 Err(zx::Status::OUT_OF_RANGE)
2280 );
2281
2282 assert_eq!(
2283 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2284 Err(zx::Status::INVALID_ARGS)
2285 );
2286
2287 assert_eq!(
2290 test(
2291 &mut fifo,
2292 BlockFifoRequest {
2293 command: BlockFifoCommand {
2294 opcode: BlockOpcode::CloseVmo.into_primitive(),
2295 ..Default::default()
2296 },
2297 vmoid: vmo_id.id + 1,
2298 ..Default::default()
2299 }
2300 )
2301 .await,
2302 Err(zx::Status::IO)
2303 );
2304
2305 std::mem::drop(proxy);
2306 }
2307 );
2308 }
2309
2310 #[fuchsia::test]
2311 async fn test_concurrent_requests() {
2312 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2313
2314 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2315 let waiting_readers_clone = waiting_readers.clone();
2316
2317 futures::join!(
2318 async move {
2319 let block_server = BlockServer::new(
2320 BLOCK_SIZE,
2321 Arc::new(MockInterface {
2322 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2323 let (tx, rx) = oneshot::channel();
2324 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2325 Box::pin(async move {
2326 let _ = rx.await;
2327 Ok(())
2328 })
2329 })),
2330 ..MockInterface::default()
2331 }),
2332 );
2333 block_server.handle_requests(stream).await.unwrap();
2334 },
2335 async move {
2336 let (session_proxy, server) = fidl::endpoints::create_proxy();
2337
2338 proxy.open_session(server).unwrap();
2339
2340 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2341 let vmo_id = session_proxy
2342 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2343 .await
2344 .unwrap()
2345 .unwrap();
2346
2347 let mut fifo =
2348 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2349 let (mut reader, mut writer) = fifo.async_io();
2350
2351 writer
2352 .write_entries(&BlockFifoRequest {
2353 command: BlockFifoCommand {
2354 opcode: BlockOpcode::Read.into_primitive(),
2355 ..Default::default()
2356 },
2357 reqid: 1,
2358 dev_offset: 1, vmoid: vmo_id.id,
2360 length: 1,
2361 ..Default::default()
2362 })
2363 .await
2364 .unwrap();
2365
2366 writer
2367 .write_entries(&BlockFifoRequest {
2368 command: BlockFifoCommand {
2369 opcode: BlockOpcode::Read.into_primitive(),
2370 ..Default::default()
2371 },
2372 reqid: 2,
2373 dev_offset: 2,
2374 vmoid: vmo_id.id,
2375 length: 1,
2376 ..Default::default()
2377 })
2378 .await
2379 .unwrap();
2380
2381 poll_fn(|cx: &mut Context<'_>| {
2383 if waiting_readers.lock().len() == 2 {
2384 Poll::Ready(())
2385 } else {
2386 cx.waker().wake_by_ref();
2388 Poll::Pending
2389 }
2390 })
2391 .await;
2392
2393 let mut response = BlockFifoResponse::default();
2394 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2395
2396 let (id, tx) = waiting_readers.lock().pop().unwrap();
2397 tx.send(()).unwrap();
2398
2399 reader.read_entries(&mut response).await.unwrap();
2400 assert_eq!(response.status, zx::sys::ZX_OK);
2401 assert_eq!(response.reqid, id);
2402
2403 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2404
2405 let (id, tx) = waiting_readers.lock().pop().unwrap();
2406 tx.send(()).unwrap();
2407
2408 reader.read_entries(&mut response).await.unwrap();
2409 assert_eq!(response.status, zx::sys::ZX_OK);
2410 assert_eq!(response.reqid, id);
2411 }
2412 );
2413 }
2414
2415 #[fuchsia::test]
2416 async fn test_groups() {
2417 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2418
2419 futures::join!(
2420 async move {
2421 let block_server = BlockServer::new(
2422 BLOCK_SIZE,
2423 Arc::new(MockInterface {
2424 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2425 ..MockInterface::default()
2426 }),
2427 );
2428 block_server.handle_requests(stream).await.unwrap();
2429 },
2430 async move {
2431 let (session_proxy, server) = fidl::endpoints::create_proxy();
2432
2433 proxy.open_session(server).unwrap();
2434
2435 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2436 let vmo_id = session_proxy
2437 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2438 .await
2439 .unwrap()
2440 .unwrap();
2441
2442 let mut fifo =
2443 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2444 let (mut reader, mut writer) = fifo.async_io();
2445
2446 writer
2447 .write_entries(&BlockFifoRequest {
2448 command: BlockFifoCommand {
2449 opcode: BlockOpcode::Read.into_primitive(),
2450 flags: BlockIoFlag::GROUP_ITEM.bits(),
2451 ..Default::default()
2452 },
2453 group: 1,
2454 vmoid: vmo_id.id,
2455 length: 1,
2456 ..Default::default()
2457 })
2458 .await
2459 .unwrap();
2460
2461 writer
2462 .write_entries(&BlockFifoRequest {
2463 command: BlockFifoCommand {
2464 opcode: BlockOpcode::Read.into_primitive(),
2465 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2466 ..Default::default()
2467 },
2468 reqid: 2,
2469 group: 1,
2470 vmoid: vmo_id.id,
2471 length: 1,
2472 ..Default::default()
2473 })
2474 .await
2475 .unwrap();
2476
2477 let mut response = BlockFifoResponse::default();
2478 reader.read_entries(&mut response).await.unwrap();
2479 assert_eq!(response.status, zx::sys::ZX_OK);
2480 assert_eq!(response.reqid, 2);
2481 assert_eq!(response.group, 1);
2482 }
2483 );
2484 }
2485
2486 #[fuchsia::test]
2487 async fn test_group_error() {
2488 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2489
2490 let counter = Arc::new(AtomicU64::new(0));
2491 let counter_clone = counter.clone();
2492
2493 futures::join!(
2494 async move {
2495 let block_server = BlockServer::new(
2496 BLOCK_SIZE,
2497 Arc::new(MockInterface {
2498 read_hook: Some(Box::new(move |_, _, _, _| {
2499 counter_clone.fetch_add(1, Ordering::Relaxed);
2500 Box::pin(async { Err(zx::Status::BAD_STATE) })
2501 })),
2502 ..MockInterface::default()
2503 }),
2504 );
2505 block_server.handle_requests(stream).await.unwrap();
2506 },
2507 async move {
2508 let (session_proxy, server) = fidl::endpoints::create_proxy();
2509
2510 proxy.open_session(server).unwrap();
2511
2512 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2513 let vmo_id = session_proxy
2514 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2515 .await
2516 .unwrap()
2517 .unwrap();
2518
2519 let mut fifo =
2520 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2521 let (mut reader, mut writer) = fifo.async_io();
2522
2523 writer
2524 .write_entries(&BlockFifoRequest {
2525 command: BlockFifoCommand {
2526 opcode: BlockOpcode::Read.into_primitive(),
2527 flags: BlockIoFlag::GROUP_ITEM.bits(),
2528 ..Default::default()
2529 },
2530 group: 1,
2531 vmoid: vmo_id.id,
2532 length: 1,
2533 ..Default::default()
2534 })
2535 .await
2536 .unwrap();
2537
2538 poll_fn(|cx: &mut Context<'_>| {
2540 if counter.load(Ordering::Relaxed) == 1 {
2541 Poll::Ready(())
2542 } else {
2543 cx.waker().wake_by_ref();
2545 Poll::Pending
2546 }
2547 })
2548 .await;
2549
2550 let mut response = BlockFifoResponse::default();
2551 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2552
2553 writer
2554 .write_entries(&BlockFifoRequest {
2555 command: BlockFifoCommand {
2556 opcode: BlockOpcode::Read.into_primitive(),
2557 flags: BlockIoFlag::GROUP_ITEM.bits(),
2558 ..Default::default()
2559 },
2560 group: 1,
2561 vmoid: vmo_id.id,
2562 length: 1,
2563 ..Default::default()
2564 })
2565 .await
2566 .unwrap();
2567
2568 writer
2569 .write_entries(&BlockFifoRequest {
2570 command: BlockFifoCommand {
2571 opcode: BlockOpcode::Read.into_primitive(),
2572 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2573 ..Default::default()
2574 },
2575 reqid: 2,
2576 group: 1,
2577 vmoid: vmo_id.id,
2578 length: 1,
2579 ..Default::default()
2580 })
2581 .await
2582 .unwrap();
2583
2584 reader.read_entries(&mut response).await.unwrap();
2585 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2586 assert_eq!(response.reqid, 2);
2587 assert_eq!(response.group, 1);
2588
2589 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2590
2591 assert_eq!(counter.load(Ordering::Relaxed), 1);
2593 }
2594 );
2595 }
2596
2597 #[fuchsia::test]
2598 async fn test_group_with_two_lasts() {
2599 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2600
2601 let (tx, rx) = oneshot::channel();
2602
2603 futures::join!(
2604 async move {
2605 let rx = Mutex::new(Some(rx));
2606 let block_server = BlockServer::new(
2607 BLOCK_SIZE,
2608 Arc::new(MockInterface {
2609 read_hook: Some(Box::new(move |_, _, _, _| {
2610 let rx = rx.lock().take().unwrap();
2611 Box::pin(async {
2612 let _ = rx.await;
2613 Ok(())
2614 })
2615 })),
2616 ..MockInterface::default()
2617 }),
2618 );
2619 block_server.handle_requests(stream).await.unwrap();
2620 },
2621 async move {
2622 let (session_proxy, server) = fidl::endpoints::create_proxy();
2623
2624 proxy.open_session(server).unwrap();
2625
2626 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2627 let vmo_id = session_proxy
2628 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2629 .await
2630 .unwrap()
2631 .unwrap();
2632
2633 let mut fifo =
2634 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2635 let (mut reader, mut writer) = fifo.async_io();
2636
2637 writer
2638 .write_entries(&BlockFifoRequest {
2639 command: BlockFifoCommand {
2640 opcode: BlockOpcode::Read.into_primitive(),
2641 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2642 ..Default::default()
2643 },
2644 reqid: 1,
2645 group: 1,
2646 vmoid: vmo_id.id,
2647 length: 1,
2648 ..Default::default()
2649 })
2650 .await
2651 .unwrap();
2652
2653 writer
2654 .write_entries(&BlockFifoRequest {
2655 command: BlockFifoCommand {
2656 opcode: BlockOpcode::Read.into_primitive(),
2657 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2658 ..Default::default()
2659 },
2660 reqid: 2,
2661 group: 1,
2662 vmoid: vmo_id.id,
2663 length: 1,
2664 ..Default::default()
2665 })
2666 .await
2667 .unwrap();
2668
2669 writer
2671 .write_entries(&BlockFifoRequest {
2672 command: BlockFifoCommand {
2673 opcode: BlockOpcode::CloseVmo.into_primitive(),
2674 ..Default::default()
2675 },
2676 reqid: 3,
2677 vmoid: vmo_id.id,
2678 ..Default::default()
2679 })
2680 .await
2681 .unwrap();
2682
2683 let mut response = BlockFifoResponse::default();
2685 reader.read_entries(&mut response).await.unwrap();
2686 assert_eq!(response.status, zx::sys::ZX_OK);
2687 assert_eq!(response.reqid, 3);
2688
2689 tx.send(()).unwrap();
2691
2692 let mut response = BlockFifoResponse::default();
2695 reader.read_entries(&mut response).await.unwrap();
2696 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2697 assert_eq!(response.reqid, 1);
2698 assert_eq!(response.group, 1);
2699 }
2700 );
2701 }
2702
2703 #[fuchsia::test(allow_stalls = false)]
2704 async fn test_requests_dont_block_sessions() {
2705 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2706
2707 let (tx, rx) = oneshot::channel();
2708
2709 fasync::Task::local(async move {
2710 let rx = Mutex::new(Some(rx));
2711 let block_server = BlockServer::new(
2712 BLOCK_SIZE,
2713 Arc::new(MockInterface {
2714 read_hook: Some(Box::new(move |_, _, _, _| {
2715 let rx = rx.lock().take().unwrap();
2716 Box::pin(async {
2717 let _ = rx.await;
2718 Ok(())
2719 })
2720 })),
2721 ..MockInterface::default()
2722 }),
2723 );
2724 block_server.handle_requests(stream).await.unwrap();
2725 })
2726 .detach();
2727
2728 let mut fut = pin!(async {
2729 let (session_proxy, server) = fidl::endpoints::create_proxy();
2730
2731 proxy.open_session(server).unwrap();
2732
2733 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2734 let vmo_id = session_proxy
2735 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2736 .await
2737 .unwrap()
2738 .unwrap();
2739
2740 let mut fifo =
2741 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2742 let (mut reader, mut writer) = fifo.async_io();
2743
2744 writer
2745 .write_entries(&BlockFifoRequest {
2746 command: BlockFifoCommand {
2747 opcode: BlockOpcode::Read.into_primitive(),
2748 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2749 ..Default::default()
2750 },
2751 reqid: 1,
2752 group: 1,
2753 vmoid: vmo_id.id,
2754 length: 1,
2755 ..Default::default()
2756 })
2757 .await
2758 .unwrap();
2759
2760 let mut response = BlockFifoResponse::default();
2761 reader.read_entries(&mut response).await.unwrap();
2762 assert_eq!(response.status, zx::sys::ZX_OK);
2763 });
2764
2765 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2767
2768 let mut fut2 = pin!(proxy.get_volume_info());
2769
2770 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2772
2773 let _ = tx.send(());
2776
2777 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2778 }
2779
2780 #[fuchsia::test]
2781 async fn test_request_flow_control() {
2782 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2783
2784 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2787 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2788 let event_clone = event.clone();
2789 futures::join!(
2790 async move {
2791 let block_server = BlockServer::new(
2792 BLOCK_SIZE,
2793 Arc::new(MockInterface {
2794 read_hook: Some(Box::new(move |_, _, _, _| {
2795 let event_clone = event_clone.clone();
2796 Box::pin(async move {
2797 if !event_clone.1.load(Ordering::SeqCst) {
2798 event_clone.0.listen().await;
2799 }
2800 Ok(())
2801 })
2802 })),
2803 ..MockInterface::default()
2804 }),
2805 );
2806 block_server.handle_requests(stream).await.unwrap();
2807 },
2808 async move {
2809 let (session_proxy, server) = fidl::endpoints::create_proxy();
2810
2811 proxy.open_session(server).unwrap();
2812
2813 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2814 let vmo_id = session_proxy
2815 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2816 .await
2817 .unwrap()
2818 .unwrap();
2819
2820 let mut fifo =
2821 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2822 let (mut reader, mut writer) = fifo.async_io();
2823
2824 for i in 0..MAX_REQUESTS {
2825 writer
2826 .write_entries(&BlockFifoRequest {
2827 command: BlockFifoCommand {
2828 opcode: BlockOpcode::Read.into_primitive(),
2829 ..Default::default()
2830 },
2831 reqid: (i + 1) as u32,
2832 dev_offset: i,
2833 vmoid: vmo_id.id,
2834 length: 1,
2835 ..Default::default()
2836 })
2837 .await
2838 .unwrap();
2839 }
2840 assert!(
2841 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2842 command: BlockFifoCommand {
2843 opcode: BlockOpcode::Read.into_primitive(),
2844 ..Default::default()
2845 },
2846 reqid: u32::MAX,
2847 dev_offset: MAX_REQUESTS,
2848 vmoid: vmo_id.id,
2849 length: 1,
2850 ..Default::default()
2851 })))
2852 .is_pending()
2853 );
2854 event.1.store(true, Ordering::SeqCst);
2856 event.0.notify(usize::MAX);
2857 let mut finished_reqids = vec![];
2859 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2860 let mut response = BlockFifoResponse::default();
2861 reader.read_entries(&mut response).await.unwrap();
2862 assert_eq!(response.status, zx::sys::ZX_OK);
2863 finished_reqids.push(response.reqid);
2864 writer
2865 .write_entries(&BlockFifoRequest {
2866 command: BlockFifoCommand {
2867 opcode: BlockOpcode::Read.into_primitive(),
2868 ..Default::default()
2869 },
2870 reqid: (i + 1) as u32,
2871 dev_offset: i,
2872 vmoid: vmo_id.id,
2873 length: 1,
2874 ..Default::default()
2875 })
2876 .await
2877 .unwrap();
2878 }
2879 let mut response = BlockFifoResponse::default();
2880 for _ in 0..MAX_REQUESTS {
2881 reader.read_entries(&mut response).await.unwrap();
2882 assert_eq!(response.status, zx::sys::ZX_OK);
2883 finished_reqids.push(response.reqid);
2884 }
2885 finished_reqids.sort();
2888 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2889 let mut i = 1;
2890 for reqid in finished_reqids {
2891 assert_eq!(reqid, i);
2892 i += 1;
2893 }
2894 }
2895 );
2896 }
2897
2898 #[fuchsia::test]
2899 async fn test_passthrough_io_with_fixed_map() {
2900 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2901
2902 let expected_op = Arc::new(Mutex::new(None));
2903 let expected_op_clone = expected_op.clone();
2904 futures::join!(
2905 async {
2906 let block_server = BlockServer::new(
2907 BLOCK_SIZE,
2908 Arc::new(IoMockInterface {
2909 return_errors: false,
2910 do_checks: true,
2911 expected_op: expected_op_clone,
2912 }),
2913 );
2914 block_server.handle_requests(stream).await.unwrap();
2915 },
2916 async move {
2917 let (session_proxy, server) = fidl::endpoints::create_proxy();
2918
2919 let mapping = fblock::BlockOffsetMapping {
2920 source_block_offset: 0,
2921 target_block_offset: 10,
2922 length: 20,
2923 };
2924 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2925
2926 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2927 let vmo_id = session_proxy
2928 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2929 .await
2930 .unwrap()
2931 .unwrap();
2932
2933 let mut fifo =
2934 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2935 let (mut reader, mut writer) = fifo.async_io();
2936
2937 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2939 writer
2940 .write_entries(&BlockFifoRequest {
2941 command: BlockFifoCommand {
2942 opcode: BlockOpcode::Read.into_primitive(),
2943 ..Default::default()
2944 },
2945 vmoid: vmo_id.id,
2946 dev_offset: 1,
2947 length: 2,
2948 vmo_offset: 3,
2949 ..Default::default()
2950 })
2951 .await
2952 .unwrap();
2953
2954 let mut response = BlockFifoResponse::default();
2955 reader.read_entries(&mut response).await.unwrap();
2956 assert_eq!(response.status, zx::sys::ZX_OK);
2957
2958 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2960 writer
2961 .write_entries(&BlockFifoRequest {
2962 command: BlockFifoCommand {
2963 opcode: BlockOpcode::Write.into_primitive(),
2964 ..Default::default()
2965 },
2966 vmoid: vmo_id.id,
2967 dev_offset: 4,
2968 length: 5,
2969 vmo_offset: 6,
2970 ..Default::default()
2971 })
2972 .await
2973 .unwrap();
2974
2975 reader.read_entries(&mut response).await.unwrap();
2976 assert_eq!(response.status, zx::sys::ZX_OK);
2977
2978 *expected_op.lock() = Some(ExpectedOp::Flush);
2980 writer
2981 .write_entries(&BlockFifoRequest {
2982 command: BlockFifoCommand {
2983 opcode: BlockOpcode::Flush.into_primitive(),
2984 ..Default::default()
2985 },
2986 ..Default::default()
2987 })
2988 .await
2989 .unwrap();
2990
2991 reader.read_entries(&mut response).await.unwrap();
2992 assert_eq!(response.status, zx::sys::ZX_OK);
2993
2994 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2996 writer
2997 .write_entries(&BlockFifoRequest {
2998 command: BlockFifoCommand {
2999 opcode: BlockOpcode::Trim.into_primitive(),
3000 ..Default::default()
3001 },
3002 dev_offset: 7,
3003 length: 3,
3004 ..Default::default()
3005 })
3006 .await
3007 .unwrap();
3008
3009 reader.read_entries(&mut response).await.unwrap();
3010 assert_eq!(response.status, zx::sys::ZX_OK);
3011
3012 *expected_op.lock() = None;
3014 writer
3015 .write_entries(&BlockFifoRequest {
3016 command: BlockFifoCommand {
3017 opcode: BlockOpcode::Read.into_primitive(),
3018 ..Default::default()
3019 },
3020 vmoid: vmo_id.id,
3021 dev_offset: 19,
3022 length: 2,
3023 vmo_offset: 3,
3024 ..Default::default()
3025 })
3026 .await
3027 .unwrap();
3028
3029 reader.read_entries(&mut response).await.unwrap();
3030 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3031
3032 std::mem::drop(proxy);
3033 }
3034 );
3035 }
3036
3037 #[fuchsia::test]
3038 fn operation_map() {
3039 fn expect_map_result(
3040 mut operation: Operation,
3041 mapping: Option<BlockOffsetMapping>,
3042 max_blocks: Option<NonZero<u32>>,
3043 expected_operations: Vec<Operation>,
3044 ) {
3045 let mut ops = vec![];
3046 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
3047 ops.push(operation);
3048 operation = remainder;
3049 }
3050 ops.push(operation);
3051 assert_eq!(ops, expected_operations);
3052 }
3053
3054 expect_map_result(
3056 Operation::Read {
3057 device_block_offset: 10,
3058 block_count: 200,
3059 _unused: 0,
3060 vmo_offset: 0,
3061 options: ReadOptions::default(),
3062 },
3063 None,
3064 None,
3065 vec![Operation::Read {
3066 device_block_offset: 10,
3067 block_count: 200,
3068 _unused: 0,
3069 vmo_offset: 0,
3070 options: ReadOptions::default(),
3071 }],
3072 );
3073
3074 expect_map_result(
3076 Operation::Read {
3077 device_block_offset: 10,
3078 block_count: 200,
3079 _unused: 0,
3080 vmo_offset: 0,
3081 options: ReadOptions::default(),
3082 },
3083 None,
3084 NonZero::new(120),
3085 vec![
3086 Operation::Read {
3087 device_block_offset: 10,
3088 block_count: 120,
3089 _unused: 0,
3090 vmo_offset: 0,
3091 options: ReadOptions::default(),
3092 },
3093 Operation::Read {
3094 device_block_offset: 130,
3095 block_count: 80,
3096 _unused: 0,
3097 vmo_offset: 120 * 512,
3098 options: ReadOptions::default(),
3099 },
3100 ],
3101 );
3102 expect_map_result(
3103 Operation::Write {
3104 device_block_offset: 10,
3105 block_count: 200,
3106 _unused: 0,
3107 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3108 vmo_offset: 0,
3109 },
3110 None,
3111 NonZero::new(120),
3112 vec![
3113 Operation::Write {
3114 device_block_offset: 10,
3115 block_count: 120,
3116 _unused: 0,
3117 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3118 vmo_offset: 0,
3119 },
3120 Operation::Write {
3121 device_block_offset: 130,
3122 block_count: 80,
3123 _unused: 0,
3124 options: WriteOptions::default(),
3125 vmo_offset: 120 * 512,
3126 },
3127 ],
3128 );
3129 expect_map_result(
3130 Operation::Trim { device_block_offset: 10, block_count: 200 },
3131 None,
3132 NonZero::new(120),
3133 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3134 );
3135
3136 expect_map_result(
3138 Operation::Read {
3139 device_block_offset: 10,
3140 block_count: 200,
3141 _unused: 0,
3142 vmo_offset: 0,
3143 options: ReadOptions::default(),
3144 },
3145 Some(BlockOffsetMapping {
3146 source_block_offset: 10,
3147 target_block_offset: 100,
3148 length: 200,
3149 }),
3150 NonZero::new(120),
3151 vec![
3152 Operation::Read {
3153 device_block_offset: 100,
3154 block_count: 120,
3155 _unused: 0,
3156 vmo_offset: 0,
3157 options: ReadOptions::default(),
3158 },
3159 Operation::Read {
3160 device_block_offset: 220,
3161 block_count: 80,
3162 _unused: 0,
3163 vmo_offset: 120 * 512,
3164 options: ReadOptions::default(),
3165 },
3166 ],
3167 );
3168 expect_map_result(
3169 Operation::Write {
3170 device_block_offset: 10,
3171 block_count: 200,
3172 _unused: 0,
3173 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3174 vmo_offset: 0,
3175 },
3176 Some(BlockOffsetMapping {
3177 source_block_offset: 10,
3178 target_block_offset: 100,
3179 length: 200,
3180 }),
3181 NonZero::new(120),
3182 vec![
3183 Operation::Write {
3184 device_block_offset: 100,
3185 block_count: 120,
3186 _unused: 0,
3187 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
3188 vmo_offset: 0,
3189 },
3190 Operation::Write {
3191 device_block_offset: 220,
3192 block_count: 80,
3193 _unused: 0,
3194 options: WriteOptions::default(),
3195 vmo_offset: 120 * 512,
3196 },
3197 ],
3198 );
3199 expect_map_result(
3200 Operation::Trim { device_block_offset: 10, block_count: 200 },
3201 Some(BlockOffsetMapping {
3202 source_block_offset: 10,
3203 target_block_offset: 100,
3204 length: 200,
3205 }),
3206 NonZero::new(120),
3207 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3208 );
3209 }
3210}