1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
35#[derive(Clone)]
36pub enum DeviceInfo {
37 Block(BlockInfo),
38 Partition(PartitionInfo),
39}
40
41impl DeviceInfo {
42 pub fn device_flags(&self) -> fblock::Flag {
43 match self {
44 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
45 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
46 }
47 }
48
49 pub fn block_count(&self) -> Option<u64> {
50 match self {
51 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
52 Self::Partition(PartitionInfo { block_range, .. }) => {
53 block_range.as_ref().map(|range| range.end - range.start)
54 }
55 }
56 }
57
58 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
59 match self {
60 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
61 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
62 max_transfer_blocks.clone()
63 }
64 }
65 }
66
67 fn max_transfer_size(&self, block_size: u32) -> u32 {
68 if let Some(max_blocks) = self.max_transfer_blocks() {
69 max_blocks.get() * block_size
70 } else {
71 MAX_TRANSFER_UNBOUNDED
72 }
73 }
74}
75
76#[derive(Clone, Default)]
78pub struct BlockInfo {
79 pub device_flags: fblock::Flag,
80 pub block_count: u64,
81 pub max_transfer_blocks: Option<NonZero<u32>>,
82}
83
84#[derive(Clone, Default)]
86pub struct PartitionInfo {
87 pub device_flags: fblock::Flag,
89 pub max_transfer_blocks: Option<NonZero<u32>>,
90 pub block_range: Option<Range<u64>>,
94 pub type_guid: [u8; 16],
95 pub instance_guid: [u8; 16],
96 pub name: String,
97 pub flags: u64,
98}
99
100struct ActiveRequest<S> {
103 session: S,
104 group_or_request: GroupOrRequest,
105 trace_flow_id: TraceFlowId,
106 _epoch_guard: EpochGuard<'static>,
107 status: zx::Status,
108 count: u32,
109 req_id: Option<u32>,
110 decompression_info: Option<DecompressionInfo>,
111}
112
113struct DecompressionInfo {
114 compressed_range: Range<usize>,
116
117 uncompressed_range: Range<u64>,
119
120 bytes_so_far: u64,
121 mapping: Arc<VmoMapping>,
122 buffer: Option<Buffer<'static>>,
123}
124
125impl DecompressionInfo {
126 fn uncompressed_slice(&self) -> *mut [u8] {
128 std::ptr::slice_from_raw_parts_mut(
129 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
130 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
131 )
132 }
133}
134
135pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
136
137impl<S> Default for ActiveRequests<S> {
138 fn default() -> Self {
139 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
140 }
141}
142
143impl<S> ActiveRequests<S> {
144 fn complete_and_take_response(
145 &self,
146 request_id: RequestId,
147 status: zx::Status,
148 ) -> Option<(S, BlockFifoResponse)> {
149 self.0.lock().complete_and_take_response(request_id, status)
150 }
151
152 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
153 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
154 }
155}
156
157struct ActiveRequestsInner<S> {
158 requests: Slab<ActiveRequest<S>>,
159}
160
161impl<S> ActiveRequestsInner<S> {
163 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
165 let group = &mut self.requests[request_id.0];
166
167 group.count = group.count.checked_sub(1).unwrap();
168 if status != zx::Status::OK && group.status == zx::Status::OK {
169 group.status = status
170 }
171
172 fuchsia_trace::duration!(
173 "storage",
174 "block_server::finish_transaction",
175 "request_id" => request_id.0,
176 "group_completed" => group.count == 0,
177 "status" => status.into_raw());
178 if let Some(trace_flow_id) = group.trace_flow_id {
179 fuchsia_trace::flow_step!(
180 "storage",
181 "block_server::finish_request",
182 trace_flow_id.get().into()
183 );
184 }
185
186 if group.count == 0
187 && group.status == zx::Status::OK
188 && let Some(info) = &mut group.decompression_info
189 {
190 thread_local! {
191 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
192 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
193 }
194 DECOMPRESSOR.with(|decompressor| {
195 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
197 let mut decompressor = decompressor.borrow_mut();
198 if let Err(error) = decompressor.decompress_to_buffer(
199 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
200 target_slice,
201 ) {
202 log::warn!(error:?; "Decompression error");
203 group.status = zx::Status::IO_DATA_INTEGRITY;
204 };
205 });
206 }
207 }
208
209 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
211 let group = &self.requests[request_id.0];
212 match group.req_id {
213 Some(reqid) if group.count == 0 => {
214 let group = self.requests.remove(request_id.0);
215 Some((
216 group.session,
217 BlockFifoResponse {
218 status: group.status.into_raw(),
219 reqid,
220 group: group.group_or_request.group_id().unwrap_or(0),
221 ..Default::default()
222 },
223 ))
224 }
225 _ => None,
226 }
227 }
228
229 fn complete_and_take_response(
231 &mut self,
232 request_id: RequestId,
233 status: zx::Status,
234 ) -> Option<(S, BlockFifoResponse)> {
235 self.complete(request_id, status);
236 self.take_response(request_id)
237 }
238}
239
240pub struct BlockServer<SM> {
243 block_size: u32,
244 session_manager: Arc<SM>,
245}
246
247#[derive(Debug)]
249pub struct BlockOffsetMapping {
250 source_block_offset: u64,
251 target_block_offset: u64,
252 length: u64,
253}
254
255impl BlockOffsetMapping {
256 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
257 blocks.0 >= self.source_block_offset
258 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
259 }
260}
261
262impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
263 type Error = zx::Status;
264
265 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
266 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
267 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
268 Ok(Self {
269 source_block_offset: wire.source_block_offset,
270 target_block_offset: wire.target_block_offset,
271 length: wire.length,
272 })
273 }
274}
275
276pub struct OffsetMap {
278 mapping: Option<BlockOffsetMapping>,
279 max_transfer_blocks: Option<NonZero<u32>>,
280}
281
282impl OffsetMap {
283 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
285 Self { mapping: Some(mapping), max_transfer_blocks }
286 }
287
288 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
290 Self { mapping: None, max_transfer_blocks }
291 }
292
293 pub fn is_empty(&self) -> bool {
294 self.mapping.is_none()
295 }
296
297 fn mapping(&self) -> Option<&BlockOffsetMapping> {
298 self.mapping.as_ref()
299 }
300
301 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
302 self.max_transfer_blocks
303 }
304}
305
306pub trait SessionManager: 'static {
309 const SUPPORTS_DECOMPRESSION: bool;
310
311 type Session;
312
313 fn on_attach_vmo(
314 self: Arc<Self>,
315 vmo: &Arc<zx::Vmo>,
316 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
317
318 fn open_session(
323 self: Arc<Self>,
324 stream: fblock::SessionRequestStream,
325 offset_map: OffsetMap,
326 block_size: u32,
327 ) -> impl Future<Output = Result<(), Error>> + Send;
328
329 fn get_info(&self) -> Cow<'_, DeviceInfo>;
331
332 fn get_volume_info(
334 &self,
335 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
336 {
337 async { Err(zx::Status::NOT_SUPPORTED) }
338 }
339
340 fn query_slices(
342 &self,
343 _start_slices: &[u64],
344 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
345 async { Err(zx::Status::NOT_SUPPORTED) }
346 }
347
348 fn extend(
350 &self,
351 _start_slice: u64,
352 _slice_count: u64,
353 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
354 async { Err(zx::Status::NOT_SUPPORTED) }
355 }
356
357 fn shrink(
359 &self,
360 _start_slice: u64,
361 _slice_count: u64,
362 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363 async { Err(zx::Status::NOT_SUPPORTED) }
364 }
365
366 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
368}
369
370pub trait IntoSessionManager {
371 type SM: SessionManager;
372
373 fn into_session_manager(self) -> Arc<Self::SM>;
374}
375
376impl<SM: SessionManager> BlockServer<SM> {
377 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
378 Self { block_size, session_manager: session_manager.into_session_manager() }
379 }
380
381 pub fn session_manager(&self) -> &SM {
382 self.session_manager.as_ref()
383 }
384
385 pub async fn handle_requests(
387 &self,
388 mut requests: fvolume::VolumeRequestStream,
389 ) -> Result<(), Error> {
390 let scope = fasync::Scope::new();
391 loop {
392 match requests.try_next().await {
393 Ok(Some(request)) => {
394 if let Some(session) = self.handle_request(request).await? {
395 scope.spawn(session.map(|_| ()));
396 }
397 }
398 Ok(None) => break,
399 Err(err) => log::warn!(err:?; "Invalid request"),
400 }
401 }
402 scope.await;
403 Ok(())
404 }
405
406 async fn handle_request(
409 &self,
410 request: fvolume::VolumeRequest,
411 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
412 match request {
413 fvolume::VolumeRequest::GetInfo { responder } => {
414 let info = self.device_info();
415 let max_transfer_size = info.max_transfer_size(self.block_size);
416 let (block_count, mut flags) = match info.as_ref() {
417 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
418 (*block_count, *device_flags)
419 }
420 DeviceInfo::Partition(partition_info) => {
421 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
422 range.end - range.start
423 } else {
424 let volume_info = self.session_manager.get_volume_info().await?;
425 volume_info.0.slice_size * volume_info.1.partition_slice_count
426 / self.block_size as u64
427 };
428 (block_count, partition_info.device_flags)
429 }
430 };
431 if SM::SUPPORTS_DECOMPRESSION {
432 flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
433 }
434 responder.send(Ok(&fblock::BlockInfo {
435 block_count,
436 block_size: self.block_size,
437 max_transfer_size,
438 flags,
439 }))?;
440 }
441 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
442 let info = self.device_info();
443 return Ok(Some(self.session_manager.clone().open_session(
444 session.into_stream(),
445 OffsetMap::empty(info.max_transfer_blocks()),
446 self.block_size,
447 )));
448 }
449 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
450 session,
451 mapping,
452 control_handle: _,
453 } => {
454 let info = self.device_info();
455 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
456 Ok(m) => m,
457 Err(status) => {
458 session.close_with_epitaph(status)?;
459 return Ok(None);
460 }
461 };
462 if let Some(max) = info.block_count() {
463 if initial_mapping.target_block_offset + initial_mapping.length > max {
464 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
465 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
466 return Ok(None);
467 }
468 }
469 return Ok(Some(self.session_manager.clone().open_session(
470 session.into_stream(),
471 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
472 self.block_size,
473 )));
474 }
475 fvolume::VolumeRequest::GetTypeGuid { responder } => {
476 let info = self.device_info();
477 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
478 let mut guid =
479 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
480 guid.value.copy_from_slice(&partition_info.type_guid);
481 responder.send(zx::sys::ZX_OK, Some(&guid))?;
482 } else {
483 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
484 }
485 }
486 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
487 let info = self.device_info();
488 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
489 let mut guid =
490 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
491 guid.value.copy_from_slice(&partition_info.instance_guid);
492 responder.send(zx::sys::ZX_OK, Some(&guid))?;
493 } else {
494 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
495 }
496 }
497 fvolume::VolumeRequest::GetName { responder } => {
498 let info = self.device_info();
499 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
500 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
501 } else {
502 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
503 }
504 }
505 fvolume::VolumeRequest::GetMetadata { responder } => {
506 let info = self.device_info();
507 if let DeviceInfo::Partition(info) = info.as_ref() {
508 let mut type_guid =
509 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
510 type_guid.value.copy_from_slice(&info.type_guid);
511 let mut instance_guid =
512 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
513 instance_guid.value.copy_from_slice(&info.instance_guid);
514 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
515 name: Some(info.name.clone()),
516 type_guid: Some(type_guid),
517 instance_guid: Some(instance_guid),
518 start_block_offset: info.block_range.as_ref().map(|range| range.start),
519 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
520 flags: Some(info.flags),
521 ..Default::default()
522 }))?;
523 } else {
524 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
525 }
526 }
527 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
528 match self.session_manager.query_slices(&start_slices).await {
529 Ok(mut results) => {
530 let results_len = results.len();
531 assert!(results_len <= 16);
532 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
533 responder.send(
534 zx::sys::ZX_OK,
535 &results.try_into().unwrap(),
536 results_len as u64,
537 )?;
538 }
539 Err(s) => {
540 responder.send(
541 s.into_raw(),
542 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
543 0,
544 )?;
545 }
546 }
547 }
548 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
549 match self.session_manager.get_volume_info().await {
550 Ok((manager_info, volume_info)) => {
551 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
552 }
553 Err(s) => responder.send(s.into_raw(), None, None)?,
554 }
555 }
556 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
557 responder.send(
558 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
559 .into_raw(),
560 )?;
561 }
562 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
563 responder.send(
564 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
565 .into_raw(),
566 )?;
567 }
568 fvolume::VolumeRequest::Destroy { responder, .. } => {
569 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
570 }
571 }
572 Ok(None)
573 }
574
575 fn device_info(&self) -> Cow<'_, DeviceInfo> {
576 self.session_manager.get_info()
577 }
578}
579
580struct SessionHelper<SM: SessionManager> {
581 session_manager: Arc<SM>,
582 offset_map: OffsetMap,
583 block_size: u32,
584 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
585 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
586}
587
588struct VmoMapping {
589 base: usize,
590 size: usize,
591}
592
593impl VmoMapping {
594 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
595 let size = vmo.get_size().unwrap() as usize;
596 Ok(Arc::new(Self {
597 base: fuchsia_runtime::vmar_root_self()
598 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
599 .inspect_err(|error| {
600 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
601 })?,
602 size,
603 }))
604 }
605}
606
607impl Drop for VmoMapping {
608 fn drop(&mut self) {
609 unsafe {
611 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
612 }
613 }
614}
615
616impl<SM: SessionManager> SessionHelper<SM> {
617 fn new(
618 session_manager: Arc<SM>,
619 offset_map: OffsetMap,
620 block_size: u32,
621 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
622 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
623 Ok((
624 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
625 fifo,
626 ))
627 }
628
629 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
630 match request {
631 fblock::SessionRequest::GetFifo { responder } => {
632 let rights = zx::Rights::TRANSFER
633 | zx::Rights::READ
634 | zx::Rights::WRITE
635 | zx::Rights::SIGNAL
636 | zx::Rights::WAIT;
637 match self.peer_fifo.duplicate_handle(rights) {
638 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
639 Err(s) => responder.send(Err(s.into_raw()))?,
640 }
641 Ok(())
642 }
643 fblock::SessionRequest::AttachVmo { vmo, responder } => {
644 let vmo = Arc::new(vmo);
645 let vmo_id = {
646 let mut vmos = self.vmos.lock();
647 if vmos.len() == u16::MAX as usize {
648 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
649 return Ok(());
650 } else {
651 let vmo_id = match vmos.last_entry() {
652 None => 1,
653 Some(o) => {
654 o.key().checked_add(1).unwrap_or_else(|| {
655 let mut vmo_id = 1;
656 for (&id, _) in &*vmos {
658 if id > vmo_id {
659 break;
660 }
661 vmo_id = id + 1;
662 }
663 vmo_id
664 })
665 }
666 };
667 vmos.insert(vmo_id, (vmo.clone(), None));
668 vmo_id
669 }
670 };
671 self.session_manager.clone().on_attach_vmo(&vmo).await?;
672 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
673 Ok(())
674 }
675 fblock::SessionRequest::Close { responder } => {
676 responder.send(Ok(()))?;
677 Err(anyhow!("Closed"))
678 }
679 }
680 }
681
682 fn decode_fifo_request(
684 &self,
685 session: SM::Session,
686 request: &BlockFifoRequest,
687 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
688 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
689
690 let request_bytes = request.length as u64 * self.block_size as u64;
691
692 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
693 .ok_or(zx::Status::INVALID_ARGS)
694 .and_then(|code| {
695 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
696 if code != BlockOpcode::Read {
697 return Err(zx::Status::INVALID_ARGS);
698 }
699 if !SM::SUPPORTS_DECOMPRESSION {
700 return Err(zx::Status::NOT_SUPPORTED);
701 }
702 }
703 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
704 if request.length == 0 {
705 return Err(zx::Status::INVALID_ARGS);
706 }
707 if request.dev_offset.checked_add(request.length as u64).is_none()
709 || (code != BlockOpcode::Trim
710 && request_bytes.checked_add(request.vmo_offset).is_none())
711 {
712 return Err(zx::Status::OUT_OF_RANGE);
713 }
714 }
715 Ok(match code {
716 BlockOpcode::Read => Operation::Read {
717 device_block_offset: request.dev_offset,
718 block_count: request.length,
719 _unused: 0,
720 vmo_offset: request
721 .vmo_offset
722 .checked_mul(self.block_size as u64)
723 .ok_or(zx::Status::OUT_OF_RANGE)?,
724 options: ReadOptions {
725 inline_crypto_options: InlineCryptoOptions {
726 dun: request.dun,
727 slot: request.slot,
728 },
729 },
730 },
731 BlockOpcode::Write => {
732 let mut options = WriteOptions {
733 inline_crypto_options: InlineCryptoOptions {
734 dun: request.dun,
735 slot: request.slot,
736 },
737 ..WriteOptions::default()
738 };
739 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
740 options.flags |= WriteFlags::FORCE_ACCESS;
741 }
742 if flags.contains(BlockIoFlag::PRE_BARRIER) {
743 options.flags |= WriteFlags::PRE_BARRIER;
744 }
745 Operation::Write {
746 device_block_offset: request.dev_offset,
747 block_count: request.length,
748 _unused: 0,
749 options,
750 vmo_offset: request
751 .vmo_offset
752 .checked_mul(self.block_size as u64)
753 .ok_or(zx::Status::OUT_OF_RANGE)?,
754 }
755 }
756 BlockOpcode::Flush => Operation::Flush,
757 BlockOpcode::Trim => Operation::Trim {
758 device_block_offset: request.dev_offset,
759 block_count: request.length,
760 },
761 BlockOpcode::CloseVmo => Operation::CloseVmo,
762 })
763 });
764
765 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
766 GroupOrRequest::Group(request.group)
767 } else {
768 GroupOrRequest::Request(request.reqid)
769 };
770
771 let mut active_requests = self.session_manager.active_requests().0.lock();
772 let mut request_id = None;
773
774 if group_or_request.is_group() {
785 for (key, group) in &mut active_requests.requests {
789 if group.group_or_request == group_or_request {
790 if group.req_id.is_some() {
791 if group.status == zx::Status::OK {
793 group.status = zx::Status::INVALID_ARGS;
794 }
795 return Err(None);
797 }
798 if group.status == zx::Status::OK
800 && let Some(info) = &mut group.decompression_info
801 {
802 if let Ok(Operation::Read {
803 device_block_offset,
804 mut block_count,
805 options,
806 vmo_offset: 0,
807 ..
808 }) = operation
809 {
810 let remaining_bytes = info
811 .compressed_range
812 .end
813 .next_multiple_of(self.block_size as usize)
814 as u64
815 - info.bytes_so_far;
816 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
817 || request.total_compressed_bytes != 0
818 || request.uncompressed_bytes != 0
819 || request.compressed_prefix_bytes != 0
820 || (flags.contains(BlockIoFlag::GROUP_LAST)
821 && info.bytes_so_far + request_bytes
822 < info.compressed_range.end as u64)
823 || (!flags.contains(BlockIoFlag::GROUP_LAST)
824 && request_bytes >= remaining_bytes)
825 {
826 group.status = zx::Status::INVALID_ARGS;
827 } else {
828 if request_bytes > remaining_bytes {
837 block_count = (remaining_bytes / self.block_size as u64) as u32;
838 }
839
840 operation = Ok(Operation::ContinueDecompressedRead {
841 offset: info.bytes_so_far,
842 device_block_offset,
843 block_count,
844 options,
845 });
846
847 info.bytes_so_far += block_count as u64 * self.block_size as u64;
848 }
849 } else {
850 group.status = zx::Status::INVALID_ARGS;
851 }
852 }
853 if flags.contains(BlockIoFlag::GROUP_LAST) {
854 group.req_id = Some(request.reqid);
855 if group.status != zx::Status::OK {
858 operation = Err(group.status);
859 }
860 } else if group.status != zx::Status::OK {
861 return Err(None);
864 }
865 request_id = Some(RequestId(key));
866 group.count += 1;
867 break;
868 }
869 }
870 }
871
872 let is_single_request =
873 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
874
875 let mut decompression_info = None;
876 let vmo = match operation {
877 Ok(Operation::Read {
878 device_block_offset,
879 mut block_count,
880 options,
881 vmo_offset,
882 ..
883 }) => match self.vmos.lock().get_mut(&request.vmoid) {
884 Some((vmo, mapping)) => {
885 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
886 let compressed_range = request.compressed_prefix_bytes as usize
887 ..request.compressed_prefix_bytes as usize
888 + request.total_compressed_bytes as usize;
889 let required_buffer_size =
890 compressed_range.end.next_multiple_of(self.block_size as usize);
891
892 if compressed_range.start >= compressed_range.end
894 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
895 || (is_single_request && request_bytes < compressed_range.end as u64)
896 || (!is_single_request && request_bytes >= required_buffer_size as u64)
897 {
898 Err(zx::Status::INVALID_ARGS)
899 } else {
900 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
907 block_count =
908 (required_buffer_size / self.block_size as usize) as u32;
909 required_buffer_size as u64
910 } else {
911 request_bytes
912 };
913
914 match mapping {
916 Some(mapping) => Ok(mapping.clone()),
917 None => {
918 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
919 }
920 }
921 .and_then(|mapping| {
922 if vmo_offset
925 .checked_add(request.uncompressed_bytes as u64)
926 .is_some_and(|end| end <= mapping.size as u64)
927 {
928 Ok(mapping)
929 } else {
930 Err(zx::Status::OUT_OF_RANGE)
931 }
932 })
933 .map(|mapping| {
934 operation = Ok(Operation::StartDecompressedRead {
939 required_buffer_size,
940 device_block_offset,
941 block_count,
942 options,
943 });
944 decompression_info = Some(DecompressionInfo {
947 compressed_range,
948 bytes_so_far,
949 mapping,
950 uncompressed_range: vmo_offset
951 ..vmo_offset + request.uncompressed_bytes as u64,
952 buffer: None,
953 });
954 None
955 })
956 }
957 } else {
958 Ok(Some(vmo.clone()))
959 }
960 }
961 None => Err(zx::Status::IO),
962 },
963 Ok(Operation::Write { .. }) => self
964 .vmos
965 .lock()
966 .get(&request.vmoid)
967 .cloned()
968 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
969 Ok(Operation::CloseVmo) => {
970 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
971 let vmo_clone = vmo.clone();
972 Epoch::global().defer(move || drop(vmo_clone));
975 Ok(Some(vmo))
976 })
977 }
978 _ => Ok(None),
979 }
980 .unwrap_or_else(|e| {
981 operation = Err(e);
982 None
983 });
984
985 let trace_flow_id = NonZero::new(request.trace_flow_id);
986 let request_id = request_id.unwrap_or_else(|| {
987 RequestId(active_requests.requests.insert(ActiveRequest {
988 session,
989 group_or_request,
990 trace_flow_id,
991 _epoch_guard: Epoch::global().guard(),
992 status: zx::Status::OK,
993 count: 1,
994 req_id: is_single_request.then_some(request.reqid),
995 decompression_info,
996 }))
997 });
998
999 Ok(DecodedRequest {
1000 request_id,
1001 trace_flow_id,
1002 operation: operation.map_err(|status| {
1003 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1004 })?,
1005 vmo,
1006 })
1007 }
1008
1009 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1010 std::mem::take(&mut *self.vmos.lock())
1011 }
1012
1013 fn map_request(
1015 &self,
1016 mut request: DecodedRequest,
1017 active_request: &mut ActiveRequest<SM::Session>,
1018 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1019 if active_request.status != zx::Status::OK {
1020 return Err(zx::Status::BAD_STATE);
1021 }
1022 let mapping = self.offset_map.mapping();
1023 match (mapping, request.operation.blocks()) {
1024 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1025 return Err(zx::Status::OUT_OF_RANGE);
1026 }
1027 _ => {}
1028 }
1029 let remainder = request.operation.map(
1030 self.offset_map.mapping(),
1031 self.offset_map.max_transfer_blocks(),
1032 self.block_size,
1033 );
1034 if remainder.is_some() {
1035 active_request.count += 1;
1036 }
1037 static CACHE: AtomicU64 = AtomicU64::new(0);
1038 if let Some(context) =
1039 fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1040 {
1041 use fuchsia_trace::ArgValue;
1042 let trace_args = [
1043 ArgValue::of("request_id", request.request_id.0),
1044 ArgValue::of("opcode", request.operation.trace_label()),
1045 ];
1046 let _scope =
1047 fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1048 if let Some(trace_flow_id) = active_request.trace_flow_id {
1049 fuchsia_trace::flow_step(
1050 &context,
1051 "block_server::start_transaction",
1052 trace_flow_id.get().into(),
1053 &[],
1054 );
1055 }
1056 }
1057 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1058 Ok((request, remainder))
1059 }
1060
1061 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1062 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1063 }
1064}
1065
1066#[repr(transparent)]
1067#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1068pub struct RequestId(usize);
1069
1070#[derive(Clone, Debug)]
1071struct DecodedRequest {
1072 request_id: RequestId,
1073 trace_flow_id: TraceFlowId,
1074 operation: Operation,
1075 vmo: Option<Arc<zx::Vmo>>,
1076}
1077
1078pub type WriteFlags = block_protocol::WriteFlags;
1080pub type WriteOptions = block_protocol::WriteOptions;
1081pub type ReadOptions = block_protocol::ReadOptions;
1082
1083#[repr(C)]
1084#[derive(Clone, Debug, PartialEq, Eq)]
1085pub enum Operation {
1086 Read {
1091 device_block_offset: u64,
1092 block_count: u32,
1093 _unused: u32,
1094 vmo_offset: u64,
1095 options: ReadOptions,
1096 },
1097 Write {
1098 device_block_offset: u64,
1099 block_count: u32,
1100 _unused: u32,
1101 vmo_offset: u64,
1102 options: WriteOptions,
1103 },
1104 Flush,
1105 Trim {
1106 device_block_offset: u64,
1107 block_count: u32,
1108 },
1109 CloseVmo,
1111 StartDecompressedRead {
1112 required_buffer_size: usize,
1113 device_block_offset: u64,
1114 block_count: u32,
1115 options: ReadOptions,
1116 },
1117 ContinueDecompressedRead {
1118 offset: u64,
1119 device_block_offset: u64,
1120 block_count: u32,
1121 options: ReadOptions,
1122 },
1123}
1124
1125impl Operation {
1126 fn trace_label(&self) -> &'static str {
1127 match self {
1128 Operation::Read { .. } => "read",
1129 Operation::Write { .. } => "write",
1130 Operation::Flush { .. } => "flush",
1131 Operation::Trim { .. } => "trim",
1132 Operation::CloseVmo { .. } => "close_vmo",
1133 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1134 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1135 }
1136 }
1137
1138 fn blocks(&self) -> Option<(u64, u32)> {
1140 match self {
1141 Operation::Read { device_block_offset, block_count, .. }
1142 | Operation::Write { device_block_offset, block_count, .. }
1143 | Operation::Trim { device_block_offset, block_count, .. } => {
1144 Some((*device_block_offset, *block_count))
1145 }
1146 _ => None,
1147 }
1148 }
1149
1150 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1152 match self {
1153 Operation::Read { device_block_offset, block_count, .. }
1154 | Operation::Write { device_block_offset, block_count, .. }
1155 | Operation::Trim { device_block_offset, block_count, .. } => {
1156 Some((device_block_offset, block_count))
1157 }
1158 _ => None,
1159 }
1160 }
1161
1162 fn map(
1165 &mut self,
1166 mapping: Option<&BlockOffsetMapping>,
1167 max_blocks: Option<NonZero<u32>>,
1168 block_size: u32,
1169 ) -> Option<Self> {
1170 let mut max = match self {
1171 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1172 _ => None,
1173 };
1174 let (offset, length) = self.blocks_mut()?;
1175 let orig_offset = *offset;
1176 if let Some(mapping) = mapping {
1177 let delta = *offset - mapping.source_block_offset;
1178 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1179 *offset = mapping.target_block_offset + delta;
1180 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1181 max = match max {
1182 None => Some(mapping_max),
1183 Some(m) => Some(std::cmp::min(m, mapping_max)),
1184 };
1185 };
1186 if let Some(max) = max {
1187 if *length as u64 > max {
1188 let rem = (*length as u64 - max) as u32;
1189 *length = max as u32;
1190 return Some(match self {
1191 Operation::Read {
1192 device_block_offset: _,
1193 block_count: _,
1194 vmo_offset,
1195 _unused,
1196 options,
1197 } => Operation::Read {
1198 device_block_offset: orig_offset + max,
1199 block_count: rem,
1200 vmo_offset: *vmo_offset + max * block_size as u64,
1201 _unused: *_unused,
1202 options: *options,
1203 },
1204 Operation::Write {
1205 device_block_offset: _,
1206 block_count: _,
1207 _unused,
1208 vmo_offset,
1209 options,
1210 } => Operation::Write {
1211 device_block_offset: orig_offset + max,
1212 block_count: rem,
1213 _unused: *_unused,
1214 vmo_offset: *vmo_offset + max * block_size as u64,
1215 options: *options,
1216 },
1217 Operation::Trim { device_block_offset: _, block_count: _ } => {
1218 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1219 }
1220 _ => unreachable!(),
1221 });
1222 }
1223 }
1224 None
1225 }
1226
1227 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1229 if let Operation::Write { options, .. } = self {
1230 options.flags.contains(value)
1231 } else {
1232 false
1233 }
1234 }
1235
1236 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1238 if let Operation::Write { options, .. } = self {
1239 let result = options.flags.contains(value);
1240 options.flags.remove(value);
1241 result
1242 } else {
1243 false
1244 }
1245 }
1246}
1247
1248#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1249pub enum GroupOrRequest {
1250 Group(u16),
1251 Request(u32),
1252}
1253
1254impl GroupOrRequest {
1255 fn is_group(&self) -> bool {
1256 matches!(self, Self::Group(_))
1257 }
1258
1259 fn group_id(&self) -> Option<u16> {
1260 match self {
1261 Self::Group(id) => Some(*id),
1262 Self::Request(_) => None,
1263 }
1264 }
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269 use super::{
1270 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1271 TraceFlowId,
1272 };
1273 use assert_matches::assert_matches;
1274 use block_protocol::{
1275 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1276 WriteOptions,
1277 };
1278 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1279 use fuchsia_sync::Mutex;
1280 use futures::FutureExt as _;
1281 use futures::channel::oneshot;
1282 use futures::future::BoxFuture;
1283 use std::borrow::Cow;
1284 use std::future::poll_fn;
1285 use std::num::NonZero;
1286 use std::pin::pin;
1287 use std::sync::Arc;
1288 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1289 use std::task::{Context, Poll};
1290 use zx::{AsHandleRef as _, HandleBased as _};
1291 use {
1292 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1293 fuchsia_async as fasync,
1294 };
1295
1296 #[derive(Default)]
1297 struct MockInterface {
1298 read_hook: Option<
1299 Box<
1300 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1301 + Send
1302 + Sync,
1303 >,
1304 >,
1305 write_hook:
1306 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1307 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1308 }
1309
1310 impl super::async_interface::Interface for MockInterface {
1311 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1312 Ok(())
1313 }
1314
1315 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1316 Cow::Owned(test_device_info())
1317 }
1318
1319 async fn read(
1320 &self,
1321 device_block_offset: u64,
1322 block_count: u32,
1323 vmo: &Arc<zx::Vmo>,
1324 vmo_offset: u64,
1325 _opts: ReadOptions,
1326 _trace_flow_id: TraceFlowId,
1327 ) -> Result<(), zx::Status> {
1328 if let Some(read_hook) = &self.read_hook {
1329 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1330 } else {
1331 unimplemented!();
1332 }
1333 }
1334
1335 async fn write(
1336 &self,
1337 device_block_offset: u64,
1338 _block_count: u32,
1339 _vmo: &Arc<zx::Vmo>,
1340 _vmo_offset: u64,
1341 opts: WriteOptions,
1342 _trace_flow_id: TraceFlowId,
1343 ) -> Result<(), zx::Status> {
1344 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1345 && let Some(barrier_hook) = &self.barrier_hook
1346 {
1347 barrier_hook()?;
1348 }
1349 if let Some(write_hook) = &self.write_hook {
1350 write_hook(device_block_offset).await
1351 } else {
1352 unimplemented!();
1353 }
1354 }
1355
1356 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1357 Ok(())
1358 }
1359
1360 async fn trim(
1361 &self,
1362 _device_block_offset: u64,
1363 _block_count: u32,
1364 _trace_flow_id: TraceFlowId,
1365 ) -> Result<(), zx::Status> {
1366 unreachable!();
1367 }
1368
1369 async fn get_volume_info(
1370 &self,
1371 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1372 let () = std::future::pending().await;
1374 unreachable!();
1375 }
1376 }
1377
1378 const BLOCK_SIZE: u32 = 512;
1379 const MAX_TRANSFER_BLOCKS: u32 = 10;
1380
1381 fn test_device_info() -> DeviceInfo {
1382 DeviceInfo::Partition(PartitionInfo {
1383 device_flags: fblock::Flag::READONLY
1384 | fblock::Flag::BARRIER_SUPPORT
1385 | fblock::Flag::FUA_SUPPORT,
1386 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1387 block_range: Some(0..100),
1388 type_guid: [1; 16],
1389 instance_guid: [2; 16],
1390 name: "foo".to_string(),
1391 flags: 0xabcd,
1392 })
1393 }
1394
1395 #[fuchsia::test]
1396 async fn test_barriers_ordering() {
1397 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1398 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1399 let barrier_called = Arc::new(AtomicBool::new(false));
1400
1401 futures::join!(
1402 async move {
1403 let barrier_called_clone = barrier_called.clone();
1404 let block_server = BlockServer::new(
1405 BLOCK_SIZE,
1406 Arc::new(MockInterface {
1407 barrier_hook: Some(Box::new(move || {
1408 barrier_called.store(true, Ordering::Relaxed);
1409 Ok(())
1410 })),
1411 write_hook: Some(Box::new(move |device_block_offset| {
1412 let barrier_called = barrier_called_clone.clone();
1413 Box::pin(async move {
1414 if device_block_offset % 2 == 0 {
1416 fasync::Timer::new(fasync::MonotonicInstant::after(
1417 zx::MonotonicDuration::from_millis(200),
1418 ))
1419 .await;
1420 }
1421 assert!(barrier_called.load(Ordering::Relaxed));
1422 Ok(())
1423 })
1424 })),
1425 ..MockInterface::default()
1426 }),
1427 );
1428 block_server.handle_requests(stream).await.unwrap();
1429 },
1430 async move {
1431 let (session_proxy, server) = fidl::endpoints::create_proxy();
1432
1433 proxy.open_session(server).unwrap();
1434
1435 let vmo_id = session_proxy
1436 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1437 .await
1438 .unwrap()
1439 .unwrap();
1440 assert_ne!(vmo_id.id, 0);
1441
1442 let mut fifo =
1443 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1444 let (mut reader, mut writer) = fifo.async_io();
1445
1446 writer
1447 .write_entries(&BlockFifoRequest {
1448 command: BlockFifoCommand {
1449 opcode: BlockOpcode::Write.into_primitive(),
1450 flags: BlockIoFlag::PRE_BARRIER.bits(),
1451 ..Default::default()
1452 },
1453 vmoid: vmo_id.id,
1454 dev_offset: 0,
1455 length: 5,
1456 vmo_offset: 6,
1457 ..Default::default()
1458 })
1459 .await
1460 .unwrap();
1461
1462 for i in 0..10 {
1463 writer
1464 .write_entries(&BlockFifoRequest {
1465 command: BlockFifoCommand {
1466 opcode: BlockOpcode::Write.into_primitive(),
1467 ..Default::default()
1468 },
1469 vmoid: vmo_id.id,
1470 dev_offset: i + 1,
1471 length: 5,
1472 vmo_offset: 6,
1473 ..Default::default()
1474 })
1475 .await
1476 .unwrap();
1477 }
1478 for _ in 0..11 {
1479 let mut response = BlockFifoResponse::default();
1480 reader.read_entries(&mut response).await.unwrap();
1481 assert_eq!(response.status, zx::sys::ZX_OK);
1482 }
1483
1484 std::mem::drop(proxy);
1485 }
1486 );
1487 }
1488
1489 #[fuchsia::test]
1490 async fn test_info() {
1491 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1492
1493 futures::join!(
1494 async {
1495 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1496 block_server.handle_requests(stream).await.unwrap();
1497 },
1498 async {
1499 let expected_info = test_device_info();
1500 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1501 info
1502 } else {
1503 unreachable!()
1504 };
1505
1506 let block_info = proxy.get_info().await.unwrap().unwrap();
1507 assert_eq!(
1508 block_info.block_count,
1509 partition_info.block_range.as_ref().unwrap().end
1510 - partition_info.block_range.as_ref().unwrap().start
1511 );
1512 assert_eq!(
1513 block_info.flags,
1514 fblock::Flag::READONLY
1515 | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1516 | fblock::Flag::BARRIER_SUPPORT
1517 | fblock::Flag::FUA_SUPPORT
1518 );
1519
1520 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1521
1522 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1523 assert_eq!(status, zx::sys::ZX_OK);
1524 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1525
1526 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1527 assert_eq!(status, zx::sys::ZX_OK);
1528 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1529
1530 let (status, name) = proxy.get_name().await.unwrap();
1531 assert_eq!(status, zx::sys::ZX_OK);
1532 assert_eq!(name.as_ref(), Some(&partition_info.name));
1533
1534 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1535 assert_eq!(metadata.name, name);
1536 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1537 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1538 assert_eq!(
1539 metadata.start_block_offset,
1540 Some(partition_info.block_range.as_ref().unwrap().start)
1541 );
1542 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1543 assert_eq!(metadata.flags, Some(partition_info.flags));
1544
1545 std::mem::drop(proxy);
1546 }
1547 );
1548 }
1549
1550 #[fuchsia::test]
1551 async fn test_attach_vmo() {
1552 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1553
1554 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1555 let koid = vmo.get_koid().unwrap();
1556
1557 futures::join!(
1558 async {
1559 let block_server = BlockServer::new(
1560 BLOCK_SIZE,
1561 Arc::new(MockInterface {
1562 read_hook: Some(Box::new(move |_, _, vmo, _| {
1563 assert_eq!(vmo.get_koid().unwrap(), koid);
1564 Box::pin(async { Ok(()) })
1565 })),
1566 ..MockInterface::default()
1567 }),
1568 );
1569 block_server.handle_requests(stream).await.unwrap();
1570 },
1571 async move {
1572 let (session_proxy, server) = fidl::endpoints::create_proxy();
1573
1574 proxy.open_session(server).unwrap();
1575
1576 let vmo_id = session_proxy
1577 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1578 .await
1579 .unwrap()
1580 .unwrap();
1581 assert_ne!(vmo_id.id, 0);
1582
1583 let mut fifo =
1584 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1585 let (mut reader, mut writer) = fifo.async_io();
1586
1587 let mut count = 1;
1589 loop {
1590 match session_proxy
1591 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1592 .await
1593 .unwrap()
1594 {
1595 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1596 Err(e) => {
1597 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1598 break;
1599 }
1600 }
1601
1602 if count % 10 == 0 {
1604 writer
1605 .write_entries(&BlockFifoRequest {
1606 command: BlockFifoCommand {
1607 opcode: BlockOpcode::Read.into_primitive(),
1608 ..Default::default()
1609 },
1610 vmoid: vmo_id.id,
1611 length: 1,
1612 ..Default::default()
1613 })
1614 .await
1615 .unwrap();
1616
1617 let mut response = BlockFifoResponse::default();
1618 reader.read_entries(&mut response).await.unwrap();
1619 assert_eq!(response.status, zx::sys::ZX_OK);
1620 }
1621
1622 count += 1;
1623 }
1624
1625 assert_eq!(count, u16::MAX as u64);
1626
1627 writer
1629 .write_entries(&BlockFifoRequest {
1630 command: BlockFifoCommand {
1631 opcode: BlockOpcode::CloseVmo.into_primitive(),
1632 ..Default::default()
1633 },
1634 vmoid: vmo_id.id,
1635 ..Default::default()
1636 })
1637 .await
1638 .unwrap();
1639
1640 let mut response = BlockFifoResponse::default();
1641 reader.read_entries(&mut response).await.unwrap();
1642 assert_eq!(response.status, zx::sys::ZX_OK);
1643
1644 let new_vmo_id = session_proxy
1645 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1646 .await
1647 .unwrap()
1648 .unwrap();
1649 assert_eq!(new_vmo_id.id, vmo_id.id);
1651
1652 std::mem::drop(proxy);
1653 }
1654 );
1655 }
1656
1657 #[fuchsia::test]
1658 async fn test_close() {
1659 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1660
1661 let mut server = std::pin::pin!(
1662 async {
1663 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1664 block_server.handle_requests(stream).await.unwrap();
1665 }
1666 .fuse()
1667 );
1668
1669 let mut client = std::pin::pin!(
1670 async {
1671 let (session_proxy, server) = fidl::endpoints::create_proxy();
1672
1673 proxy.open_session(server).unwrap();
1674
1675 std::mem::drop(proxy);
1678
1679 session_proxy.close().await.unwrap().unwrap();
1680
1681 let _: () = std::future::pending().await;
1683 }
1684 .fuse()
1685 );
1686
1687 futures::select!(
1688 _ = server => {}
1689 _ = client => unreachable!(),
1690 );
1691 }
1692
1693 #[derive(Default)]
1694 struct IoMockInterface {
1695 do_checks: bool,
1696 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1697 return_errors: bool,
1698 }
1699
1700 #[derive(Debug)]
1701 enum ExpectedOp {
1702 Read(u64, u32, u64),
1703 Write(u64, u32, u64),
1704 Trim(u64, u32),
1705 Flush,
1706 }
1707
1708 impl super::async_interface::Interface for IoMockInterface {
1709 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1710 Ok(())
1711 }
1712
1713 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1714 Cow::Owned(test_device_info())
1715 }
1716
1717 async fn read(
1718 &self,
1719 device_block_offset: u64,
1720 block_count: u32,
1721 _vmo: &Arc<zx::Vmo>,
1722 vmo_offset: u64,
1723 _opts: ReadOptions,
1724 _trace_flow_id: TraceFlowId,
1725 ) -> Result<(), zx::Status> {
1726 if self.return_errors {
1727 Err(zx::Status::INTERNAL)
1728 } else {
1729 if self.do_checks {
1730 assert_matches!(
1731 self.expected_op.lock().take(),
1732 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1733 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1734 "Read {device_block_offset} {block_count} {vmo_offset}"
1735 );
1736 }
1737 Ok(())
1738 }
1739 }
1740
1741 async fn write(
1742 &self,
1743 device_block_offset: u64,
1744 block_count: u32,
1745 _vmo: &Arc<zx::Vmo>,
1746 vmo_offset: u64,
1747 _write_opts: WriteOptions,
1748 _trace_flow_id: TraceFlowId,
1749 ) -> Result<(), zx::Status> {
1750 if self.return_errors {
1751 Err(zx::Status::NOT_SUPPORTED)
1752 } else {
1753 if self.do_checks {
1754 assert_matches!(
1755 self.expected_op.lock().take(),
1756 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1757 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1758 "Write {device_block_offset} {block_count} {vmo_offset}"
1759 );
1760 }
1761 Ok(())
1762 }
1763 }
1764
1765 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1766 if self.return_errors {
1767 Err(zx::Status::NO_RESOURCES)
1768 } else {
1769 if self.do_checks {
1770 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1771 }
1772 Ok(())
1773 }
1774 }
1775
1776 async fn trim(
1777 &self,
1778 device_block_offset: u64,
1779 block_count: u32,
1780 _trace_flow_id: TraceFlowId,
1781 ) -> Result<(), zx::Status> {
1782 if self.return_errors {
1783 Err(zx::Status::NO_MEMORY)
1784 } else {
1785 if self.do_checks {
1786 assert_matches!(
1787 self.expected_op.lock().take(),
1788 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1789 block_count == b,
1790 "Trim {device_block_offset} {block_count}"
1791 );
1792 }
1793 Ok(())
1794 }
1795 }
1796 }
1797
1798 #[fuchsia::test]
1799 async fn test_io() {
1800 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1801
1802 let expected_op = Arc::new(Mutex::new(None));
1803 let expected_op_clone = expected_op.clone();
1804
1805 let server = async {
1806 let block_server = BlockServer::new(
1807 BLOCK_SIZE,
1808 Arc::new(IoMockInterface {
1809 return_errors: false,
1810 do_checks: true,
1811 expected_op: expected_op_clone,
1812 }),
1813 );
1814 block_server.handle_requests(stream).await.unwrap();
1815 };
1816
1817 let client = async move {
1818 let (session_proxy, server) = fidl::endpoints::create_proxy();
1819
1820 proxy.open_session(server).unwrap();
1821
1822 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1823 let vmo_id = session_proxy
1824 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1825 .await
1826 .unwrap()
1827 .unwrap();
1828
1829 let mut fifo =
1830 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1831 let (mut reader, mut writer) = fifo.async_io();
1832
1833 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1835 writer
1836 .write_entries(&BlockFifoRequest {
1837 command: BlockFifoCommand {
1838 opcode: BlockOpcode::Read.into_primitive(),
1839 ..Default::default()
1840 },
1841 vmoid: vmo_id.id,
1842 dev_offset: 1,
1843 length: 2,
1844 vmo_offset: 3,
1845 ..Default::default()
1846 })
1847 .await
1848 .unwrap();
1849
1850 let mut response = BlockFifoResponse::default();
1851 reader.read_entries(&mut response).await.unwrap();
1852 assert_eq!(response.status, zx::sys::ZX_OK);
1853
1854 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1856 writer
1857 .write_entries(&BlockFifoRequest {
1858 command: BlockFifoCommand {
1859 opcode: BlockOpcode::Write.into_primitive(),
1860 ..Default::default()
1861 },
1862 vmoid: vmo_id.id,
1863 dev_offset: 4,
1864 length: 5,
1865 vmo_offset: 6,
1866 ..Default::default()
1867 })
1868 .await
1869 .unwrap();
1870
1871 let mut response = BlockFifoResponse::default();
1872 reader.read_entries(&mut response).await.unwrap();
1873 assert_eq!(response.status, zx::sys::ZX_OK);
1874
1875 *expected_op.lock() = Some(ExpectedOp::Flush);
1877 writer
1878 .write_entries(&BlockFifoRequest {
1879 command: BlockFifoCommand {
1880 opcode: BlockOpcode::Flush.into_primitive(),
1881 ..Default::default()
1882 },
1883 ..Default::default()
1884 })
1885 .await
1886 .unwrap();
1887
1888 reader.read_entries(&mut response).await.unwrap();
1889 assert_eq!(response.status, zx::sys::ZX_OK);
1890
1891 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1893 writer
1894 .write_entries(&BlockFifoRequest {
1895 command: BlockFifoCommand {
1896 opcode: BlockOpcode::Trim.into_primitive(),
1897 ..Default::default()
1898 },
1899 dev_offset: 7,
1900 length: 8,
1901 ..Default::default()
1902 })
1903 .await
1904 .unwrap();
1905
1906 reader.read_entries(&mut response).await.unwrap();
1907 assert_eq!(response.status, zx::sys::ZX_OK);
1908
1909 std::mem::drop(proxy);
1910 };
1911
1912 futures::join!(server, client);
1913 }
1914
1915 #[fuchsia::test]
1916 async fn test_io_errors() {
1917 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1918
1919 futures::join!(
1920 async {
1921 let block_server = BlockServer::new(
1922 BLOCK_SIZE,
1923 Arc::new(IoMockInterface {
1924 return_errors: true,
1925 do_checks: false,
1926 expected_op: Arc::new(Mutex::new(None)),
1927 }),
1928 );
1929 block_server.handle_requests(stream).await.unwrap();
1930 },
1931 async move {
1932 let (session_proxy, server) = fidl::endpoints::create_proxy();
1933
1934 proxy.open_session(server).unwrap();
1935
1936 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1937 let vmo_id = session_proxy
1938 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1939 .await
1940 .unwrap()
1941 .unwrap();
1942
1943 let mut fifo =
1944 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1945 let (mut reader, mut writer) = fifo.async_io();
1946
1947 writer
1949 .write_entries(&BlockFifoRequest {
1950 command: BlockFifoCommand {
1951 opcode: BlockOpcode::Read.into_primitive(),
1952 ..Default::default()
1953 },
1954 vmoid: vmo_id.id,
1955 length: 1,
1956 reqid: 1,
1957 ..Default::default()
1958 })
1959 .await
1960 .unwrap();
1961
1962 let mut response = BlockFifoResponse::default();
1963 reader.read_entries(&mut response).await.unwrap();
1964 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1965
1966 writer
1968 .write_entries(&BlockFifoRequest {
1969 command: BlockFifoCommand {
1970 opcode: BlockOpcode::Write.into_primitive(),
1971 ..Default::default()
1972 },
1973 vmoid: vmo_id.id,
1974 length: 1,
1975 reqid: 2,
1976 ..Default::default()
1977 })
1978 .await
1979 .unwrap();
1980
1981 reader.read_entries(&mut response).await.unwrap();
1982 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1983
1984 writer
1986 .write_entries(&BlockFifoRequest {
1987 command: BlockFifoCommand {
1988 opcode: BlockOpcode::Flush.into_primitive(),
1989 ..Default::default()
1990 },
1991 reqid: 3,
1992 ..Default::default()
1993 })
1994 .await
1995 .unwrap();
1996
1997 reader.read_entries(&mut response).await.unwrap();
1998 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1999
2000 writer
2002 .write_entries(&BlockFifoRequest {
2003 command: BlockFifoCommand {
2004 opcode: BlockOpcode::Trim.into_primitive(),
2005 ..Default::default()
2006 },
2007 reqid: 4,
2008 length: 1,
2009 ..Default::default()
2010 })
2011 .await
2012 .unwrap();
2013
2014 reader.read_entries(&mut response).await.unwrap();
2015 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2016
2017 std::mem::drop(proxy);
2018 }
2019 );
2020 }
2021
2022 #[fuchsia::test]
2023 async fn test_invalid_args() {
2024 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2025
2026 futures::join!(
2027 async {
2028 let block_server = BlockServer::new(
2029 BLOCK_SIZE,
2030 Arc::new(IoMockInterface {
2031 return_errors: false,
2032 do_checks: false,
2033 expected_op: Arc::new(Mutex::new(None)),
2034 }),
2035 );
2036 block_server.handle_requests(stream).await.unwrap();
2037 },
2038 async move {
2039 let (session_proxy, server) = fidl::endpoints::create_proxy();
2040
2041 proxy.open_session(server).unwrap();
2042
2043 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2044 let vmo_id = session_proxy
2045 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2046 .await
2047 .unwrap()
2048 .unwrap();
2049
2050 let mut fifo =
2051 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2052
2053 async fn test(
2054 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2055 request: BlockFifoRequest,
2056 ) -> Result<(), zx::Status> {
2057 let (mut reader, mut writer) = fifo.async_io();
2058 writer.write_entries(&request).await.unwrap();
2059 let mut response = BlockFifoResponse::default();
2060 reader.read_entries(&mut response).await.unwrap();
2061 zx::Status::ok(response.status)
2062 }
2063
2064 let good_read_request = || BlockFifoRequest {
2067 command: BlockFifoCommand {
2068 opcode: BlockOpcode::Read.into_primitive(),
2069 ..Default::default()
2070 },
2071 length: 1,
2072 vmoid: vmo_id.id,
2073 ..Default::default()
2074 };
2075
2076 assert_eq!(
2077 test(
2078 &mut fifo,
2079 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2080 )
2081 .await,
2082 Err(zx::Status::IO)
2083 );
2084
2085 assert_eq!(
2086 test(
2087 &mut fifo,
2088 BlockFifoRequest {
2089 vmo_offset: 0xffff_ffff_ffff_ffff,
2090 ..good_read_request()
2091 }
2092 )
2093 .await,
2094 Err(zx::Status::OUT_OF_RANGE)
2095 );
2096
2097 assert_eq!(
2098 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2099 Err(zx::Status::INVALID_ARGS)
2100 );
2101
2102 let good_write_request = || BlockFifoRequest {
2105 command: BlockFifoCommand {
2106 opcode: BlockOpcode::Write.into_primitive(),
2107 ..Default::default()
2108 },
2109 length: 1,
2110 vmoid: vmo_id.id,
2111 ..Default::default()
2112 };
2113
2114 assert_eq!(
2115 test(
2116 &mut fifo,
2117 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2118 )
2119 .await,
2120 Err(zx::Status::IO)
2121 );
2122
2123 assert_eq!(
2124 test(
2125 &mut fifo,
2126 BlockFifoRequest {
2127 vmo_offset: 0xffff_ffff_ffff_ffff,
2128 ..good_write_request()
2129 }
2130 )
2131 .await,
2132 Err(zx::Status::OUT_OF_RANGE)
2133 );
2134
2135 assert_eq!(
2136 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2137 Err(zx::Status::INVALID_ARGS)
2138 );
2139
2140 assert_eq!(
2143 test(
2144 &mut fifo,
2145 BlockFifoRequest {
2146 command: BlockFifoCommand {
2147 opcode: BlockOpcode::CloseVmo.into_primitive(),
2148 ..Default::default()
2149 },
2150 vmoid: vmo_id.id + 1,
2151 ..Default::default()
2152 }
2153 )
2154 .await,
2155 Err(zx::Status::IO)
2156 );
2157
2158 std::mem::drop(proxy);
2159 }
2160 );
2161 }
2162
2163 #[fuchsia::test]
2164 async fn test_concurrent_requests() {
2165 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2166
2167 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2168 let waiting_readers_clone = waiting_readers.clone();
2169
2170 futures::join!(
2171 async move {
2172 let block_server = BlockServer::new(
2173 BLOCK_SIZE,
2174 Arc::new(MockInterface {
2175 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2176 let (tx, rx) = oneshot::channel();
2177 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2178 Box::pin(async move {
2179 let _ = rx.await;
2180 Ok(())
2181 })
2182 })),
2183 ..MockInterface::default()
2184 }),
2185 );
2186 block_server.handle_requests(stream).await.unwrap();
2187 },
2188 async move {
2189 let (session_proxy, server) = fidl::endpoints::create_proxy();
2190
2191 proxy.open_session(server).unwrap();
2192
2193 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2194 let vmo_id = session_proxy
2195 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2196 .await
2197 .unwrap()
2198 .unwrap();
2199
2200 let mut fifo =
2201 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2202 let (mut reader, mut writer) = fifo.async_io();
2203
2204 writer
2205 .write_entries(&BlockFifoRequest {
2206 command: BlockFifoCommand {
2207 opcode: BlockOpcode::Read.into_primitive(),
2208 ..Default::default()
2209 },
2210 reqid: 1,
2211 dev_offset: 1, vmoid: vmo_id.id,
2213 length: 1,
2214 ..Default::default()
2215 })
2216 .await
2217 .unwrap();
2218
2219 writer
2220 .write_entries(&BlockFifoRequest {
2221 command: BlockFifoCommand {
2222 opcode: BlockOpcode::Read.into_primitive(),
2223 ..Default::default()
2224 },
2225 reqid: 2,
2226 dev_offset: 2,
2227 vmoid: vmo_id.id,
2228 length: 1,
2229 ..Default::default()
2230 })
2231 .await
2232 .unwrap();
2233
2234 poll_fn(|cx: &mut Context<'_>| {
2236 if waiting_readers.lock().len() == 2 {
2237 Poll::Ready(())
2238 } else {
2239 cx.waker().wake_by_ref();
2241 Poll::Pending
2242 }
2243 })
2244 .await;
2245
2246 let mut response = BlockFifoResponse::default();
2247 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2248
2249 let (id, tx) = waiting_readers.lock().pop().unwrap();
2250 tx.send(()).unwrap();
2251
2252 reader.read_entries(&mut response).await.unwrap();
2253 assert_eq!(response.status, zx::sys::ZX_OK);
2254 assert_eq!(response.reqid, id);
2255
2256 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2257
2258 let (id, tx) = waiting_readers.lock().pop().unwrap();
2259 tx.send(()).unwrap();
2260
2261 reader.read_entries(&mut response).await.unwrap();
2262 assert_eq!(response.status, zx::sys::ZX_OK);
2263 assert_eq!(response.reqid, id);
2264 }
2265 );
2266 }
2267
2268 #[fuchsia::test]
2269 async fn test_groups() {
2270 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2271
2272 futures::join!(
2273 async move {
2274 let block_server = BlockServer::new(
2275 BLOCK_SIZE,
2276 Arc::new(MockInterface {
2277 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2278 ..MockInterface::default()
2279 }),
2280 );
2281 block_server.handle_requests(stream).await.unwrap();
2282 },
2283 async move {
2284 let (session_proxy, server) = fidl::endpoints::create_proxy();
2285
2286 proxy.open_session(server).unwrap();
2287
2288 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2289 let vmo_id = session_proxy
2290 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2291 .await
2292 .unwrap()
2293 .unwrap();
2294
2295 let mut fifo =
2296 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2297 let (mut reader, mut writer) = fifo.async_io();
2298
2299 writer
2300 .write_entries(&BlockFifoRequest {
2301 command: BlockFifoCommand {
2302 opcode: BlockOpcode::Read.into_primitive(),
2303 flags: BlockIoFlag::GROUP_ITEM.bits(),
2304 ..Default::default()
2305 },
2306 group: 1,
2307 vmoid: vmo_id.id,
2308 length: 1,
2309 ..Default::default()
2310 })
2311 .await
2312 .unwrap();
2313
2314 writer
2315 .write_entries(&BlockFifoRequest {
2316 command: BlockFifoCommand {
2317 opcode: BlockOpcode::Read.into_primitive(),
2318 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2319 ..Default::default()
2320 },
2321 reqid: 2,
2322 group: 1,
2323 vmoid: vmo_id.id,
2324 length: 1,
2325 ..Default::default()
2326 })
2327 .await
2328 .unwrap();
2329
2330 let mut response = BlockFifoResponse::default();
2331 reader.read_entries(&mut response).await.unwrap();
2332 assert_eq!(response.status, zx::sys::ZX_OK);
2333 assert_eq!(response.reqid, 2);
2334 assert_eq!(response.group, 1);
2335 }
2336 );
2337 }
2338
2339 #[fuchsia::test]
2340 async fn test_group_error() {
2341 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2342
2343 let counter = Arc::new(AtomicU64::new(0));
2344 let counter_clone = counter.clone();
2345
2346 futures::join!(
2347 async move {
2348 let block_server = BlockServer::new(
2349 BLOCK_SIZE,
2350 Arc::new(MockInterface {
2351 read_hook: Some(Box::new(move |_, _, _, _| {
2352 counter_clone.fetch_add(1, Ordering::Relaxed);
2353 Box::pin(async { Err(zx::Status::BAD_STATE) })
2354 })),
2355 ..MockInterface::default()
2356 }),
2357 );
2358 block_server.handle_requests(stream).await.unwrap();
2359 },
2360 async move {
2361 let (session_proxy, server) = fidl::endpoints::create_proxy();
2362
2363 proxy.open_session(server).unwrap();
2364
2365 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2366 let vmo_id = session_proxy
2367 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2368 .await
2369 .unwrap()
2370 .unwrap();
2371
2372 let mut fifo =
2373 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2374 let (mut reader, mut writer) = fifo.async_io();
2375
2376 writer
2377 .write_entries(&BlockFifoRequest {
2378 command: BlockFifoCommand {
2379 opcode: BlockOpcode::Read.into_primitive(),
2380 flags: BlockIoFlag::GROUP_ITEM.bits(),
2381 ..Default::default()
2382 },
2383 group: 1,
2384 vmoid: vmo_id.id,
2385 length: 1,
2386 ..Default::default()
2387 })
2388 .await
2389 .unwrap();
2390
2391 poll_fn(|cx: &mut Context<'_>| {
2393 if counter.load(Ordering::Relaxed) == 1 {
2394 Poll::Ready(())
2395 } else {
2396 cx.waker().wake_by_ref();
2398 Poll::Pending
2399 }
2400 })
2401 .await;
2402
2403 let mut response = BlockFifoResponse::default();
2404 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2405
2406 writer
2407 .write_entries(&BlockFifoRequest {
2408 command: BlockFifoCommand {
2409 opcode: BlockOpcode::Read.into_primitive(),
2410 flags: BlockIoFlag::GROUP_ITEM.bits(),
2411 ..Default::default()
2412 },
2413 group: 1,
2414 vmoid: vmo_id.id,
2415 length: 1,
2416 ..Default::default()
2417 })
2418 .await
2419 .unwrap();
2420
2421 writer
2422 .write_entries(&BlockFifoRequest {
2423 command: BlockFifoCommand {
2424 opcode: BlockOpcode::Read.into_primitive(),
2425 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2426 ..Default::default()
2427 },
2428 reqid: 2,
2429 group: 1,
2430 vmoid: vmo_id.id,
2431 length: 1,
2432 ..Default::default()
2433 })
2434 .await
2435 .unwrap();
2436
2437 reader.read_entries(&mut response).await.unwrap();
2438 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2439 assert_eq!(response.reqid, 2);
2440 assert_eq!(response.group, 1);
2441
2442 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2443
2444 assert_eq!(counter.load(Ordering::Relaxed), 1);
2446 }
2447 );
2448 }
2449
2450 #[fuchsia::test]
2451 async fn test_group_with_two_lasts() {
2452 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2453
2454 let (tx, rx) = oneshot::channel();
2455
2456 futures::join!(
2457 async move {
2458 let rx = Mutex::new(Some(rx));
2459 let block_server = BlockServer::new(
2460 BLOCK_SIZE,
2461 Arc::new(MockInterface {
2462 read_hook: Some(Box::new(move |_, _, _, _| {
2463 let rx = rx.lock().take().unwrap();
2464 Box::pin(async {
2465 let _ = rx.await;
2466 Ok(())
2467 })
2468 })),
2469 ..MockInterface::default()
2470 }),
2471 );
2472 block_server.handle_requests(stream).await.unwrap();
2473 },
2474 async move {
2475 let (session_proxy, server) = fidl::endpoints::create_proxy();
2476
2477 proxy.open_session(server).unwrap();
2478
2479 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2480 let vmo_id = session_proxy
2481 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2482 .await
2483 .unwrap()
2484 .unwrap();
2485
2486 let mut fifo =
2487 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2488 let (mut reader, mut writer) = fifo.async_io();
2489
2490 writer
2491 .write_entries(&BlockFifoRequest {
2492 command: BlockFifoCommand {
2493 opcode: BlockOpcode::Read.into_primitive(),
2494 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2495 ..Default::default()
2496 },
2497 reqid: 1,
2498 group: 1,
2499 vmoid: vmo_id.id,
2500 length: 1,
2501 ..Default::default()
2502 })
2503 .await
2504 .unwrap();
2505
2506 writer
2507 .write_entries(&BlockFifoRequest {
2508 command: BlockFifoCommand {
2509 opcode: BlockOpcode::Read.into_primitive(),
2510 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2511 ..Default::default()
2512 },
2513 reqid: 2,
2514 group: 1,
2515 vmoid: vmo_id.id,
2516 length: 1,
2517 ..Default::default()
2518 })
2519 .await
2520 .unwrap();
2521
2522 writer
2524 .write_entries(&BlockFifoRequest {
2525 command: BlockFifoCommand {
2526 opcode: BlockOpcode::CloseVmo.into_primitive(),
2527 ..Default::default()
2528 },
2529 reqid: 3,
2530 vmoid: vmo_id.id,
2531 ..Default::default()
2532 })
2533 .await
2534 .unwrap();
2535
2536 let mut response = BlockFifoResponse::default();
2538 reader.read_entries(&mut response).await.unwrap();
2539 assert_eq!(response.status, zx::sys::ZX_OK);
2540 assert_eq!(response.reqid, 3);
2541
2542 tx.send(()).unwrap();
2544
2545 let mut response = BlockFifoResponse::default();
2548 reader.read_entries(&mut response).await.unwrap();
2549 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2550 assert_eq!(response.reqid, 1);
2551 assert_eq!(response.group, 1);
2552 }
2553 );
2554 }
2555
2556 #[fuchsia::test(allow_stalls = false)]
2557 async fn test_requests_dont_block_sessions() {
2558 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2559
2560 let (tx, rx) = oneshot::channel();
2561
2562 fasync::Task::local(async move {
2563 let rx = Mutex::new(Some(rx));
2564 let block_server = BlockServer::new(
2565 BLOCK_SIZE,
2566 Arc::new(MockInterface {
2567 read_hook: Some(Box::new(move |_, _, _, _| {
2568 let rx = rx.lock().take().unwrap();
2569 Box::pin(async {
2570 let _ = rx.await;
2571 Ok(())
2572 })
2573 })),
2574 ..MockInterface::default()
2575 }),
2576 );
2577 block_server.handle_requests(stream).await.unwrap();
2578 })
2579 .detach();
2580
2581 let mut fut = pin!(async {
2582 let (session_proxy, server) = fidl::endpoints::create_proxy();
2583
2584 proxy.open_session(server).unwrap();
2585
2586 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2587 let vmo_id = session_proxy
2588 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2589 .await
2590 .unwrap()
2591 .unwrap();
2592
2593 let mut fifo =
2594 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2595 let (mut reader, mut writer) = fifo.async_io();
2596
2597 writer
2598 .write_entries(&BlockFifoRequest {
2599 command: BlockFifoCommand {
2600 opcode: BlockOpcode::Read.into_primitive(),
2601 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2602 ..Default::default()
2603 },
2604 reqid: 1,
2605 group: 1,
2606 vmoid: vmo_id.id,
2607 length: 1,
2608 ..Default::default()
2609 })
2610 .await
2611 .unwrap();
2612
2613 let mut response = BlockFifoResponse::default();
2614 reader.read_entries(&mut response).await.unwrap();
2615 assert_eq!(response.status, zx::sys::ZX_OK);
2616 });
2617
2618 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2620
2621 let mut fut2 = pin!(proxy.get_volume_info());
2622
2623 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2625
2626 let _ = tx.send(());
2629
2630 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2631 }
2632
2633 #[fuchsia::test]
2634 async fn test_request_flow_control() {
2635 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2636
2637 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2640 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2641 let event_clone = event.clone();
2642 futures::join!(
2643 async move {
2644 let block_server = BlockServer::new(
2645 BLOCK_SIZE,
2646 Arc::new(MockInterface {
2647 read_hook: Some(Box::new(move |_, _, _, _| {
2648 let event_clone = event_clone.clone();
2649 Box::pin(async move {
2650 if !event_clone.1.load(Ordering::SeqCst) {
2651 event_clone.0.listen().await;
2652 }
2653 Ok(())
2654 })
2655 })),
2656 ..MockInterface::default()
2657 }),
2658 );
2659 block_server.handle_requests(stream).await.unwrap();
2660 },
2661 async move {
2662 let (session_proxy, server) = fidl::endpoints::create_proxy();
2663
2664 proxy.open_session(server).unwrap();
2665
2666 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2667 let vmo_id = session_proxy
2668 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2669 .await
2670 .unwrap()
2671 .unwrap();
2672
2673 let mut fifo =
2674 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2675 let (mut reader, mut writer) = fifo.async_io();
2676
2677 for i in 0..MAX_REQUESTS {
2678 writer
2679 .write_entries(&BlockFifoRequest {
2680 command: BlockFifoCommand {
2681 opcode: BlockOpcode::Read.into_primitive(),
2682 ..Default::default()
2683 },
2684 reqid: (i + 1) as u32,
2685 dev_offset: i,
2686 vmoid: vmo_id.id,
2687 length: 1,
2688 ..Default::default()
2689 })
2690 .await
2691 .unwrap();
2692 }
2693 assert!(
2694 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2695 command: BlockFifoCommand {
2696 opcode: BlockOpcode::Read.into_primitive(),
2697 ..Default::default()
2698 },
2699 reqid: u32::MAX,
2700 dev_offset: MAX_REQUESTS,
2701 vmoid: vmo_id.id,
2702 length: 1,
2703 ..Default::default()
2704 })))
2705 .is_pending()
2706 );
2707 event.1.store(true, Ordering::SeqCst);
2709 event.0.notify(usize::MAX);
2710 let mut finished_reqids = vec![];
2712 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2713 let mut response = BlockFifoResponse::default();
2714 reader.read_entries(&mut response).await.unwrap();
2715 assert_eq!(response.status, zx::sys::ZX_OK);
2716 finished_reqids.push(response.reqid);
2717 writer
2718 .write_entries(&BlockFifoRequest {
2719 command: BlockFifoCommand {
2720 opcode: BlockOpcode::Read.into_primitive(),
2721 ..Default::default()
2722 },
2723 reqid: (i + 1) as u32,
2724 dev_offset: i,
2725 vmoid: vmo_id.id,
2726 length: 1,
2727 ..Default::default()
2728 })
2729 .await
2730 .unwrap();
2731 }
2732 let mut response = BlockFifoResponse::default();
2733 for _ in 0..MAX_REQUESTS {
2734 reader.read_entries(&mut response).await.unwrap();
2735 assert_eq!(response.status, zx::sys::ZX_OK);
2736 finished_reqids.push(response.reqid);
2737 }
2738 finished_reqids.sort();
2741 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2742 let mut i = 1;
2743 for reqid in finished_reqids {
2744 assert_eq!(reqid, i);
2745 i += 1;
2746 }
2747 }
2748 );
2749 }
2750
2751 #[fuchsia::test]
2752 async fn test_passthrough_io_with_fixed_map() {
2753 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2754
2755 let expected_op = Arc::new(Mutex::new(None));
2756 let expected_op_clone = expected_op.clone();
2757 futures::join!(
2758 async {
2759 let block_server = BlockServer::new(
2760 BLOCK_SIZE,
2761 Arc::new(IoMockInterface {
2762 return_errors: false,
2763 do_checks: true,
2764 expected_op: expected_op_clone,
2765 }),
2766 );
2767 block_server.handle_requests(stream).await.unwrap();
2768 },
2769 async move {
2770 let (session_proxy, server) = fidl::endpoints::create_proxy();
2771
2772 let mapping = fblock::BlockOffsetMapping {
2773 source_block_offset: 0,
2774 target_block_offset: 10,
2775 length: 20,
2776 };
2777 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2778
2779 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2780 let vmo_id = session_proxy
2781 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2782 .await
2783 .unwrap()
2784 .unwrap();
2785
2786 let mut fifo =
2787 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2788 let (mut reader, mut writer) = fifo.async_io();
2789
2790 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2792 writer
2793 .write_entries(&BlockFifoRequest {
2794 command: BlockFifoCommand {
2795 opcode: BlockOpcode::Read.into_primitive(),
2796 ..Default::default()
2797 },
2798 vmoid: vmo_id.id,
2799 dev_offset: 1,
2800 length: 2,
2801 vmo_offset: 3,
2802 ..Default::default()
2803 })
2804 .await
2805 .unwrap();
2806
2807 let mut response = BlockFifoResponse::default();
2808 reader.read_entries(&mut response).await.unwrap();
2809 assert_eq!(response.status, zx::sys::ZX_OK);
2810
2811 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2813 writer
2814 .write_entries(&BlockFifoRequest {
2815 command: BlockFifoCommand {
2816 opcode: BlockOpcode::Write.into_primitive(),
2817 ..Default::default()
2818 },
2819 vmoid: vmo_id.id,
2820 dev_offset: 4,
2821 length: 5,
2822 vmo_offset: 6,
2823 ..Default::default()
2824 })
2825 .await
2826 .unwrap();
2827
2828 reader.read_entries(&mut response).await.unwrap();
2829 assert_eq!(response.status, zx::sys::ZX_OK);
2830
2831 *expected_op.lock() = Some(ExpectedOp::Flush);
2833 writer
2834 .write_entries(&BlockFifoRequest {
2835 command: BlockFifoCommand {
2836 opcode: BlockOpcode::Flush.into_primitive(),
2837 ..Default::default()
2838 },
2839 ..Default::default()
2840 })
2841 .await
2842 .unwrap();
2843
2844 reader.read_entries(&mut response).await.unwrap();
2845 assert_eq!(response.status, zx::sys::ZX_OK);
2846
2847 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2849 writer
2850 .write_entries(&BlockFifoRequest {
2851 command: BlockFifoCommand {
2852 opcode: BlockOpcode::Trim.into_primitive(),
2853 ..Default::default()
2854 },
2855 dev_offset: 7,
2856 length: 3,
2857 ..Default::default()
2858 })
2859 .await
2860 .unwrap();
2861
2862 reader.read_entries(&mut response).await.unwrap();
2863 assert_eq!(response.status, zx::sys::ZX_OK);
2864
2865 *expected_op.lock() = None;
2867 writer
2868 .write_entries(&BlockFifoRequest {
2869 command: BlockFifoCommand {
2870 opcode: BlockOpcode::Read.into_primitive(),
2871 ..Default::default()
2872 },
2873 vmoid: vmo_id.id,
2874 dev_offset: 19,
2875 length: 2,
2876 vmo_offset: 3,
2877 ..Default::default()
2878 })
2879 .await
2880 .unwrap();
2881
2882 reader.read_entries(&mut response).await.unwrap();
2883 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2884
2885 std::mem::drop(proxy);
2886 }
2887 );
2888 }
2889
2890 #[fuchsia::test]
2891 fn operation_map() {
2892 fn expect_map_result(
2893 mut operation: Operation,
2894 mapping: Option<BlockOffsetMapping>,
2895 max_blocks: Option<NonZero<u32>>,
2896 expected_operations: Vec<Operation>,
2897 ) {
2898 let mut ops = vec![];
2899 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2900 ops.push(operation);
2901 operation = remainder;
2902 }
2903 ops.push(operation);
2904 assert_eq!(ops, expected_operations);
2905 }
2906
2907 expect_map_result(
2909 Operation::Read {
2910 device_block_offset: 10,
2911 block_count: 200,
2912 _unused: 0,
2913 vmo_offset: 0,
2914 options: ReadOptions::default(),
2915 },
2916 None,
2917 None,
2918 vec![Operation::Read {
2919 device_block_offset: 10,
2920 block_count: 200,
2921 _unused: 0,
2922 vmo_offset: 0,
2923 options: ReadOptions::default(),
2924 }],
2925 );
2926
2927 expect_map_result(
2929 Operation::Read {
2930 device_block_offset: 10,
2931 block_count: 200,
2932 _unused: 0,
2933 vmo_offset: 0,
2934 options: ReadOptions::default(),
2935 },
2936 None,
2937 NonZero::new(120),
2938 vec![
2939 Operation::Read {
2940 device_block_offset: 10,
2941 block_count: 120,
2942 _unused: 0,
2943 vmo_offset: 0,
2944 options: ReadOptions::default(),
2945 },
2946 Operation::Read {
2947 device_block_offset: 130,
2948 block_count: 80,
2949 _unused: 0,
2950 vmo_offset: 120 * 512,
2951 options: ReadOptions::default(),
2952 },
2953 ],
2954 );
2955 expect_map_result(
2956 Operation::Trim { device_block_offset: 10, block_count: 200 },
2957 None,
2958 NonZero::new(120),
2959 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2960 );
2961
2962 expect_map_result(
2964 Operation::Read {
2965 device_block_offset: 10,
2966 block_count: 200,
2967 _unused: 0,
2968 vmo_offset: 0,
2969 options: ReadOptions::default(),
2970 },
2971 Some(BlockOffsetMapping {
2972 source_block_offset: 10,
2973 target_block_offset: 100,
2974 length: 200,
2975 }),
2976 NonZero::new(120),
2977 vec![
2978 Operation::Read {
2979 device_block_offset: 100,
2980 block_count: 120,
2981 _unused: 0,
2982 vmo_offset: 0,
2983 options: ReadOptions::default(),
2984 },
2985 Operation::Read {
2986 device_block_offset: 220,
2987 block_count: 80,
2988 _unused: 0,
2989 vmo_offset: 120 * 512,
2990 options: ReadOptions::default(),
2991 },
2992 ],
2993 );
2994 expect_map_result(
2995 Operation::Trim { device_block_offset: 10, block_count: 200 },
2996 Some(BlockOffsetMapping {
2997 source_block_offset: 10,
2998 target_block_offset: 100,
2999 length: 200,
3000 }),
3001 NonZero::new(120),
3002 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3003 );
3004 }
3005
3006 #[fuchsia::test]
3008 async fn test_pre_barrier_flush_failure() {
3009 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3010
3011 struct NoBarrierInterface;
3012 impl super::async_interface::Interface for NoBarrierInterface {
3013 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3014 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3015 device_flags: fblock::Flag::empty(), max_transfer_blocks: NonZero::new(100),
3017 block_range: Some(0..100),
3018 type_guid: [0; 16],
3019 instance_guid: [0; 16],
3020 name: "test".to_string(),
3021 flags: 0,
3022 }))
3023 }
3024 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3025 Ok(())
3026 }
3027 async fn read(
3028 &self,
3029 _: u64,
3030 _: u32,
3031 _: &Arc<zx::Vmo>,
3032 _: u64,
3033 _: ReadOptions,
3034 _: TraceFlowId,
3035 ) -> Result<(), zx::Status> {
3036 unreachable!()
3037 }
3038 async fn write(
3039 &self,
3040 _: u64,
3041 _: u32,
3042 _: &Arc<zx::Vmo>,
3043 _: u64,
3044 _: WriteOptions,
3045 _: TraceFlowId,
3046 ) -> Result<(), zx::Status> {
3047 panic!("Write should not be called");
3048 }
3049 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3050 Err(zx::Status::IO)
3051 }
3052 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3053 unreachable!()
3054 }
3055 }
3056
3057 futures::join!(
3058 async move {
3059 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3060 block_server.handle_requests(stream).await.unwrap();
3061 },
3062 async move {
3063 let (session_proxy, server) = fidl::endpoints::create_proxy();
3064 proxy.open_session(server).unwrap();
3065 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3066 let vmo_id = session_proxy
3067 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3068 .await
3069 .unwrap()
3070 .unwrap();
3071
3072 let mut fifo =
3073 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3074 let (mut reader, mut writer) = fifo.async_io();
3075
3076 writer
3077 .write_entries(&BlockFifoRequest {
3078 command: BlockFifoCommand {
3079 opcode: BlockOpcode::Write.into_primitive(),
3080 flags: BlockIoFlag::PRE_BARRIER.bits(),
3081 ..Default::default()
3082 },
3083 vmoid: vmo_id.id,
3084 length: 1,
3085 ..Default::default()
3086 })
3087 .await
3088 .unwrap();
3089
3090 let mut response = BlockFifoResponse::default();
3091 reader.read_entries(&mut response).await.unwrap();
3092 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3093 }
3094 );
3095 }
3096
3097 #[fuchsia::test]
3100 async fn test_post_barrier_write_failure() {
3101 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3102
3103 struct NoBarrierInterface;
3104 impl super::async_interface::Interface for NoBarrierInterface {
3105 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3106 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3107 device_flags: fblock::Flag::empty(), max_transfer_blocks: NonZero::new(100),
3109 block_range: Some(0..100),
3110 type_guid: [0; 16],
3111 instance_guid: [0; 16],
3112 name: "test".to_string(),
3113 flags: 0,
3114 }))
3115 }
3116 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3117 Ok(())
3118 }
3119 async fn read(
3120 &self,
3121 _: u64,
3122 _: u32,
3123 _: &Arc<zx::Vmo>,
3124 _: u64,
3125 _: ReadOptions,
3126 _: TraceFlowId,
3127 ) -> Result<(), zx::Status> {
3128 unreachable!()
3129 }
3130 async fn write(
3131 &self,
3132 _: u64,
3133 _: u32,
3134 _: &Arc<zx::Vmo>,
3135 _: u64,
3136 _: WriteOptions,
3137 _: TraceFlowId,
3138 ) -> Result<(), zx::Status> {
3139 Err(zx::Status::IO)
3140 }
3141 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3142 panic!("Flush should not be called")
3143 }
3144 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3145 unreachable!()
3146 }
3147 }
3148
3149 futures::join!(
3150 async move {
3151 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3152 block_server.handle_requests(stream).await.unwrap();
3153 },
3154 async move {
3155 let (session_proxy, server) = fidl::endpoints::create_proxy();
3156 proxy.open_session(server).unwrap();
3157 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3158 let vmo_id = session_proxy
3159 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3160 .await
3161 .unwrap()
3162 .unwrap();
3163
3164 let mut fifo =
3165 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3166 let (mut reader, mut writer) = fifo.async_io();
3167
3168 writer
3169 .write_entries(&BlockFifoRequest {
3170 command: BlockFifoCommand {
3171 opcode: BlockOpcode::Write.into_primitive(),
3172 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3173 ..Default::default()
3174 },
3175 vmoid: vmo_id.id,
3176 length: 1,
3177 ..Default::default()
3178 })
3179 .await
3180 .unwrap();
3181
3182 let mut response = BlockFifoResponse::default();
3183 reader.read_entries(&mut response).await.unwrap();
3184 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3185 }
3186 );
3187 }
3188}