1use anyhow::Error;
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fidl_fuchsia_storage_block as fblock;
8use fuchsia_async as fasync;
9use fuchsia_async::epoch::{Epoch, EpochGuard};
10use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
11use futures::{Future, FutureExt as _, TryStreamExt as _};
12use slab::Slab;
13use std::borrow::{Borrow, Cow};
14use std::collections::BTreeMap;
15use std::num::NonZero;
16use std::ops::Range;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use storage_device::buffer::Buffer;
20
21pub mod async_interface;
22pub mod c_interface;
23pub mod callback_interface;
24
25#[cfg(test)]
26mod decompression_tests;
27
/// Upper bound on a number of FIFO requests.
// NOTE(review): not referenced in the visible part of this file; presumably the batch size used
// by the interface submodules when draining the FIFO — confirm against those modules.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Trace flow identifier carried by a request.  It is built with `NonZero::new` from the raw
/// value in the FIFO request, so a raw value of zero becomes `None`.
type TraceFlowId = Option<NonZero<u64>>;
31
/// Describes the device being served: either a plain block device or a partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A block device described only by its flags, size and transfer limit.
    Block(BlockInfo),
    /// A partition, which additionally carries GUIDs, a name and partition flags.
    Partition(PartitionInfo),
}
37
38impl DeviceInfo {
39 pub fn label(&self) -> &str {
40 match self {
41 Self::Block(BlockInfo { .. }) => "",
42 Self::Partition(PartitionInfo { name, .. }) => name,
43 }
44 }
45 pub fn device_flags(&self) -> fblock::DeviceFlag {
46 match self {
47 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
48 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
49 }
50 }
51
52 pub fn block_count(&self) -> Option<u64> {
53 match self {
54 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
55 Self::Partition(PartitionInfo { block_range, .. }) => {
56 block_range.as_ref().map(|range| range.end - range.start)
57 }
58 }
59 }
60
61 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
62 match self {
63 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
64 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
65 max_transfer_blocks.clone()
66 }
67 }
68 }
69
70 fn max_transfer_size(&self, block_size: u32) -> u32 {
71 if let Some(max_blocks) = self.max_transfer_blocks() {
72 max_blocks.get() * block_size
73 } else {
74 MAX_TRANSFER_UNBOUNDED
75 }
76 }
77}
78
/// Information describing a plain (non-partition) block device.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// Flags reported to clients in the `GetInfo` response.
    pub device_flags: fblock::DeviceFlag,
    /// Total number of blocks on the device.
    pub block_count: u64,
    /// Maximum number of blocks per transfer; `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
86
/// Information describing a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// Flags reported to clients in the `GetInfo` response.
    pub device_flags: fblock::DeviceFlag,
    /// Maximum number of blocks per transfer; `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// Block range of the partition.  When `None`, `GetInfo` derives the block count from
    /// `SessionManager::get_volume_info` instead.
    pub block_range: Option<Range<u64>>,
    /// Type GUID, returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Instance GUID, returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Partition name, returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Partition flags, returned by `GetMetadata`.
    pub flags: u64,
}
102
/// Book-keeping for one in-flight request or request group.
struct ActiveRequest<S> {
    /// The session the request belongs to; handed back together with the response.
    session: S,
    /// Identifies the lone request or the group this entry tracks.
    group_or_request: GroupOrRequest,
    /// Trace flow ID used when emitting trace events for this request.
    trace_flow_id: TraceFlowId,
    /// Guard taken from the global epoch when the entry is created and held for its lifetime.
    // NOTE(review): presumably this delays work queued with `Epoch::defer` (e.g. the deferred
    // VMO drop on CloseVmo) until this request is gone — confirm against the epoch API.
    _epoch_guard: EpochGuard<'static>,
    /// The first non-OK status recorded for the request; stays OK if nothing has failed.
    status: zx::Status,
    /// Number of outstanding sub-requests.  A response is only produced once this reaches zero.
    count: u32,
    /// The request ID to place in the response.  `None` until the request that terminates the
    /// group (or a non-group request) has been received.
    req_id: Option<u32>,
    /// State for a decompressed read, if that is what this request is.
    decompression_info: Option<DecompressionInfo>,
}
115
/// State tracked for a read whose data is decompressed before landing in the client's VMO.
struct DecompressionInfo {
    /// Byte range within `buffer` holding the compressed data.
    compressed_range: Range<usize>,

    /// Byte range within `mapping` that receives the decompressed data.
    uncompressed_range: Range<u64>,

    /// Number of bytes of the read covered by the requests decoded so far.
    bytes_so_far: u64,
    /// Mapping of the client's VMO that decompression writes into.
    mapping: Arc<VmoMapping>,
    /// Buffer with the data read from the device.  Created as `None` during decoding and taken
    /// at decompression time.
    // NOTE(review): populated outside the visible part of this file — confirm where.
    buffer: Option<Buffer<'static>>,
}
127
128impl DecompressionInfo {
129 fn uncompressed_slice(&self) -> *mut [u8] {
131 std::ptr::slice_from_raw_parts_mut(
132 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
133 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
134 )
135 }
136}
137
138pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
139
140impl<S> Default for ActiveRequests<S> {
141 fn default() -> Self {
142 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
143 }
144}
145
146impl<S> ActiveRequests<S> {
147 fn complete_and_take_response(
148 &self,
149 request_id: RequestId,
150 status: zx::Status,
151 ) -> Option<(S, BlockFifoResponse)> {
152 self.0.lock().complete_and_take_response(request_id, status)
153 }
154
155 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
156 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
157 }
158}
159
/// The mutex-protected contents of `ActiveRequests`.
struct ActiveRequestsInner<S> {
    /// In-flight requests; the slab key is the value wrapped by `RequestId`.
    requests: Slab<ActiveRequest<S>>,
}
163
164impl<S> ActiveRequestsInner<S> {
    /// Records that one sub-request of `request_id` has finished with `status`.
    ///
    /// This decrements the outstanding count, remembers the first non-OK status, and emits trace
    /// events.  When the last sub-request of a so-far-successful decompressed read completes, the
    /// compressed data is decompressed into the client's mapped VMO; a decompression failure
    /// turns the request's status into `IO_DATA_INTEGRITY`.
    ///
    /// Panics if `request_id` is not present or if its count is already zero.
    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
        let group = &mut self.requests[request_id.0];

        group.count = group.count.checked_sub(1).unwrap();
        // Only the first failure is kept; later statuses never overwrite it.
        if status != zx::Status::OK && group.status == zx::Status::OK {
            group.status = status
        }

        fuchsia_trace::duration!(
            "storage",
            "block_server::finish_transaction",
            "request_id" => request_id.0,
            "group_completed" => group.count == 0,
            "status" => status.into_raw());
        if let Some(trace_flow_id) = group.trace_flow_id {
            fuchsia_trace::flow_step!(
                "storage",
                "block_server::finish_request",
                trace_flow_id.get().into()
            );
        }

        // Decompress only once everything has completed, and only if nothing failed.
        if group.count == 0
            && group.status == zx::Status::OK
            && let Some(info) = &mut group.decompression_info
        {
            // One decompressor per thread, created lazily and reused across requests.
            thread_local! {
                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
            }
            DECOMPRESSOR.with(|decompressor| {
                // NOTE(review): this relies on the mapping held by `info` still being valid and
                // on nothing else referencing this range for the duration — confirm.
                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
                let mut decompressor = decompressor.borrow_mut();
                // The buffer is taken (and so released) here; it must have been set by now or
                // the `unwrap` panics.
                if let Err(error) = decompressor.decompress_to_buffer(
                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
                    target_slice,
                ) {
                    log::warn!(error:?; "Decompression error");
                    group.status = zx::Status::IO_DATA_INTEGRITY;
                };
            });
        }
    }
211
212 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
214 let group = &self.requests[request_id.0];
215 match group.req_id {
216 Some(reqid) if group.count == 0 => {
217 let group = self.requests.remove(request_id.0);
218 Some((
219 group.session,
220 BlockFifoResponse {
221 status: group.status.into_raw(),
222 reqid,
223 group: group.group_or_request.group_id().unwrap_or(0),
224 ..Default::default()
225 },
226 ))
227 }
228 _ => None,
229 }
230 }
231
    /// Marks one sub-request of `request_id` as complete with `status` and then, if the whole
    /// request is finished, removes it and returns its session and response.
    fn complete_and_take_response(
        &mut self,
        request_id: RequestId,
        status: zx::Status,
    ) -> Option<(S, BlockFifoResponse)> {
        // Order matters: the count must be decremented before checking whether we're done.
        self.complete(request_id, status);
        self.take_response(request_id)
    }
241}
242
/// Serves the block protocol on top of a `SessionManager`.
pub struct BlockServer<SM: SessionManager> {
    /// Block size in bytes, reported via `GetInfo` and passed to every session.
    block_size: u32,
    /// Shared orchestrator from which the session manager is borrowed.
    orchestrator: Arc<SM::Orchestrator>,
}
249
/// Maps a contiguous window of client-visible ("source") blocks onto device ("target") blocks.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    /// First block of the window as addressed by the client.
    source_block_offset: u64,
    /// Block on the device that `source_block_offset` maps to.
    target_block_offset: u64,
    /// Length of the window, in blocks.
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the range `blocks` (start block, block count) lies entirely within the
    /// source window `[source_block_offset, source_block_offset + length)`.
    ///
    /// The check is overflow-free, so it is safe to call with unvalidated ranges: a range whose
    /// end would exceed `u64::MAX` is simply reported as out of range.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (start, count) = blocks;
        match start.checked_sub(self.source_block_offset) {
            // `delta <= length` makes the subtraction below safe.
            Some(delta) if delta <= self.length => u64::from(count) <= self.length - delta,
            _ => false,
        }
    }
}
264
265impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
266 type Error = zx::Status;
267
268 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
269 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
270 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271 Ok(Self {
272 source_block_offset: wire.source_block_offset,
273 target_block_offset: wire.target_block_offset,
274 length: wire.length,
275 })
276 }
277}
278
279pub struct OffsetMap {
281 mapping: Option<BlockOffsetMapping>,
282 max_transfer_blocks: Option<NonZero<u32>>,
283}
284
285impl OffsetMap {
286 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
288 Self { mapping: Some(mapping), max_transfer_blocks }
289 }
290
291 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
293 Self { mapping: None, max_transfer_blocks }
294 }
295
296 pub fn is_empty(&self) -> bool {
297 self.mapping.is_none()
298 }
299
300 fn mapping(&self) -> Option<&BlockOffsetMapping> {
301 self.mapping.as_ref()
302 }
303
304 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
305 self.max_transfer_blocks
306 }
307}
308
/// The interface `BlockServer` uses to open sessions and to answer device and volume queries.
pub trait SessionManager: 'static {
    /// The shared object, held in an `Arc`, from which the session manager can be borrowed.
    type Orchestrator: Borrow<Self> + Send + Sync;

    /// Whether reads flagged `DECOMPRESS_WITH_ZSTD` are supported.  When true, `GetInfo`
    /// advertises `ZSTD_DECOMPRESSION_SUPPORT`; when false, such reads are rejected with
    /// `NOT_SUPPORTED`.
    const SUPPORTS_DECOMPRESSION: bool;

    /// Per-session state recorded with every active request and returned with its response.
    type Session;

    /// Called when a client attaches `vmo` to a session, before the VMO ID is sent back.
    /// An error here propagates out of the session's request handling.
    fn on_attach_vmo(
        orchestrator: Arc<Self::Orchestrator>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Opens a session served from `stream`.  `BlockServer` spawns the returned future onto its
    /// scope, so the future is expected to run for the lifetime of the session.
    fn open_session(
        orchestrator: Arc<Self::Orchestrator>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns information about the device being served.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Returns volume manager and volume information.  The default implementation reports
    /// `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `QuerySlices` request.  The server asserts that at most 16 ranges are
    /// returned.  The default implementation reports `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers an `Extend` request.  The default implementation reports `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `Shrink` request.  The default implementation reports `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the set of active requests, shared by the sessions of this manager.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
378
/// Conversion into the shared orchestrator that `BlockServer::new` stores.
pub trait IntoOrchestrator {
    /// The session manager type whose orchestrator is produced.
    type SM: SessionManager;

    /// Consumes `self` and returns the orchestrator wrapped in an `Arc`.
    fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
}
387
388impl<SM: SessionManager> BlockServer<SM> {
389 pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
390 Self { block_size, orchestrator: orchestrator.into_orchestrator() }
391 }
392
393 pub fn session_manager(&self) -> &SM {
394 self.orchestrator.as_ref().borrow()
395 }
396
397 pub async fn handle_requests(
399 &self,
400 mut requests: fblock::BlockRequestStream,
401 ) -> Result<(), Error> {
402 let scope = fasync::Scope::new();
403 loop {
404 match requests.try_next().await {
405 Ok(Some(request)) => {
406 if let Some(session) = self.handle_request(request).await? {
407 scope.spawn(session.map(|_| ()));
408 }
409 }
410 Ok(None) => break,
411 Err(err) => log::warn!(err:?; "Invalid request"),
412 }
413 }
414 scope.await;
415 Ok(())
416 }
417
    /// Handles a single request on the block protocol.
    ///
    /// Returns `Ok(Some(future))` when the request opened a session; the caller is expected to
    /// run the future.  Returns `Ok(None)` for every other request, including session requests
    /// rejected with an epitaph.  Errors sending a reply are returned to the caller.
    async fn handle_request(
        &self,
        request: fblock::BlockRequest,
    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
        match request {
            fblock::BlockRequest::GetInfo { responder } => {
                let info = self.device_info();
                let max_transfer_size = info.max_transfer_size(self.block_size);
                let (block_count, mut flags) = match info.as_ref() {
                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
                        (*block_count, *device_flags)
                    }
                    DeviceInfo::Partition(partition_info) => {
                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
                            range.end - range.start
                        } else {
                            // No fixed range: derive the size from the volume's slices.  Note
                            // that a failure here propagates out of this function rather than
                            // being sent to the client as an error reply.
                            let volume_info = self.session_manager().get_volume_info().await?;
                            volume_info.0.slice_size * volume_info.1.partition_slice_count
                                / self.block_size as u64
                        };
                        (block_count, partition_info.device_flags)
                    }
                };
                // Decompression support is a property of the session manager, not the device.
                if SM::SUPPORTS_DECOMPRESSION {
                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
                }
                responder.send(Ok(&fblock::BlockInfo {
                    block_count,
                    block_size: self.block_size,
                    max_transfer_size,
                    flags,
                }))?;
            }
            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
                let info = self.device_info();
                return Ok(Some(SM::open_session(
                    self.orchestrator.clone(),
                    session.into_stream(),
                    OffsetMap::empty(info.max_transfer_blocks()),
                    self.block_size,
                )));
            }
            fblock::BlockRequest::OpenSessionWithOffsetMap {
                session,
                mapping,
                control_handle: _,
            } => {
                let info = self.device_info();
                // The conversion rejects mappings whose source or target window overflows.
                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
                    Ok(m) => m,
                    Err(status) => {
                        session.close_with_epitaph(status)?;
                        return Ok(None);
                    }
                };
                // When the device size is known, the target window must fit within it.  The
                // addition cannot overflow because the conversion above already checked it.
                if let Some(max) = info.block_count() {
                    if initial_mapping.target_block_offset + initial_mapping.length > max {
                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
                        return Ok(None);
                    }
                }
                return Ok(Some(SM::open_session(
                    self.orchestrator.clone(),
                    session.into_stream(),
                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
                    self.block_size,
                )));
            }
            // The partition queries below report NOT_SUPPORTED for plain block devices.
            fblock::BlockRequest::GetTypeGuid { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    guid.value.copy_from_slice(&partition_info.type_guid);
                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fblock::BlockRequest::GetInstanceGuid { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    guid.value.copy_from_slice(&partition_info.instance_guid);
                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fblock::BlockRequest::GetName { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
                } else {
                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
                }
            }
            fblock::BlockRequest::GetMetadata { responder } => {
                let info = self.device_info();
                if let DeviceInfo::Partition(info) = info.as_ref() {
                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    type_guid.value.copy_from_slice(&info.type_guid);
                    let mut instance_guid =
                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
                    instance_guid.value.copy_from_slice(&info.instance_guid);
                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
                        name: Some(info.name.clone()),
                        type_guid: Some(type_guid),
                        instance_guid: Some(instance_guid),
                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
                        flags: Some(info.flags),
                        ..Default::default()
                    }))?;
                } else {
                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
                }
            }
            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
                match self.session_manager().query_slices(&start_slices).await {
                    Ok(mut results) => {
                        // The reply carries a fixed array of 16 entries plus the count of valid
                        // ones, so pad the results; more than 16 is a session manager bug.
                        let results_len = results.len();
                        assert!(results_len <= 16);
                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
                        responder.send(
                            zx::sys::ZX_OK,
                            &results.try_into().unwrap(),
                            results_len as u64,
                        )?;
                    }
                    Err(s) => {
                        responder.send(
                            s.into_raw(),
                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
                            0,
                        )?;
                    }
                }
            }
            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
                match self.session_manager().get_volume_info().await {
                    Ok((manager_info, volume_info)) => {
                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
                    }
                    Err(s) => responder.send(s.into_raw(), None, None)?,
                }
            }
            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
                responder.send(
                    zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
                        .into_raw(),
                )?;
            }
            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
                responder.send(
                    zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
                        .into_raw(),
                )?;
            }
            fblock::BlockRequest::Destroy { responder, .. } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
            }
        }
        Ok(None)
    }
585
586 fn device_info(&self) -> Cow<'_, DeviceInfo> {
587 self.session_manager().get_info()
588 }
589}
590
/// State and helpers for one session: its FIFO, attached VMOs and offset mapping.
struct SessionHelper<SM: SessionManager> {
    /// Shared orchestrator from which the session manager is borrowed.
    orchestrator: Arc<SM::Orchestrator>,
    /// Offset mapping and transfer limit applied to this session's requests.
    offset_map: OffsetMap,
    /// Block size in bytes.
    block_size: u32,
    /// One end of the session's FIFO; duplicates of this handle are returned by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// Attached VMOs keyed by VMO ID, each with a mapping that is created on first use by a
    /// decompressed read.
    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
}
598
599struct VmoMapping {
600 base: usize,
601 size: usize,
602}
603
604impl VmoMapping {
605 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
606 let size = vmo.get_size().unwrap() as usize;
607 Ok(Arc::new(Self {
608 base: fuchsia_runtime::vmar_root_self()
609 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
610 .inspect_err(|error| {
611 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
612 })?,
613 size,
614 }))
615 }
616}
617
618impl Drop for VmoMapping {
619 fn drop(&mut self) {
620 unsafe {
622 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
623 }
624 }
625}
626
/// Outcome of handling a request on the session protocol.
enum HandleRequestResult {
    /// The request was handled; the session continues.
    Ok,
    /// The client asked to close the session.  The closure sends the reply to the `Close`
    /// request when invoked.
    // NOTE(review): presumably the caller invokes it once the session has been torn down —
    // confirm at the call site.
    Closed(Box<dyn FnOnce() + Send + 'static>),
}
635
636impl<SM: SessionManager> SessionHelper<SM> {
637 fn new(
638 orchestrator: Arc<SM::Orchestrator>,
639 offset_map: OffsetMap,
640 block_size: u32,
641 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
642 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
643 Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
644 }
645
646 fn session_manager(&self) -> &SM {
647 self.orchestrator.as_ref().borrow()
648 }
649
    /// Handles a single request on the session protocol.
    ///
    /// Returns `HandleRequestResult::Closed` for a `Close` request and `HandleRequestResult::Ok`
    /// otherwise.  Errors sending a reply, and errors from `SessionManager::on_attach_vmo`, are
    /// returned to the caller.
    async fn handle_request(
        &self,
        request: fblock::SessionRequest,
    ) -> Result<HandleRequestResult, Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                // Hand the client a duplicate of the peer end restricted to these rights.
                let rights = zx::Rights::TRANSFER
                    | zx::Rights::READ
                    | zx::Rights::WRITE
                    | zx::Rights::SIGNAL
                    | zx::Rights::WAIT;
                match self.peer_fifo.duplicate_handle(rights) {
                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
                    Err(s) => responder.send(Err(s.into_raw()))?,
                }
                Ok(HandleRequestResult::Ok)
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                let vmo = Arc::new(vmo);
                // The lock is scoped to this block so that it is released before the await
                // below.
                let vmo_id = {
                    let mut vmos = self.vmos.lock();
                    // IDs are allocated from 1..=u16::MAX, so this many entries means every ID
                    // is taken.
                    if vmos.len() == u16::MAX as usize {
                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
                        return Ok(HandleRequestResult::Ok);
                    } else {
                        let vmo_id = match vmos.last_entry() {
                            None => 1,
                            Some(o) => {
                                // Normally use one more than the highest ID in use.  If that
                                // would overflow, scan the (sorted) keys for the first gap.
                                o.key().checked_add(1).unwrap_or_else(|| {
                                    let mut vmo_id = 1;
                                    for (&id, _) in &*vmos {
                                        if id > vmo_id {
                                            break;
                                        }
                                        vmo_id = id + 1;
                                    }
                                    vmo_id
                                })
                            }
                        };
                        vmos.insert(vmo_id, (vmo.clone(), None));
                        vmo_id
                    }
                };
                // The VMO is already registered at this point; if this fails the error is
                // returned without replying and the entry stays in the map.
                SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
                Ok(HandleRequestResult::Ok)
            }
            fblock::SessionRequest::Close { responder } => {
                // The reply is deferred: the caller decides when to run the closure.
                Ok(HandleRequestResult::Closed(Box::new(move || {
                    if let Err(err) = responder.send(Ok(())) {
                        log::warn!(err:?; "Error sending close response");
                    }
                })))
            }
        }
    }
708
    /// Decodes and validates a request read from the FIFO and registers it as an active request.
    ///
    /// On success, returns the decoded request, whose `request_id` refers to a new active
    /// request entry or, for a group member, to the group's existing entry.
    ///
    /// On failure, returns `Err(Some(response))` if a response should be sent right away, or
    /// `Err(None)` if nothing is to be sent now (for example, the request belongs to a group
    /// that has not finished, or to a group that has already failed or already been terminated).
    fn decode_fifo_request(
        &self,
        session: SM::Session,
        request: &BlockFifoRequest,
    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
        // Unknown flag bits are ignored.
        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);

        let request_bytes = request.length as u64 * self.block_size as u64;

        // Decode the opcode and perform the checks that do not depend on any other state.  Errors
        // are kept in `operation` rather than returned, so that group book-keeping still happens.
        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
            .ok_or(zx::Status::INVALID_ARGS)
            .and_then(|code| {
                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
                    if code != BlockOpcode::Read {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    if !SM::SUPPORTS_DECOMPRESSION {
                        return Err(zx::Status::NOT_SUPPORTED);
                    }
                }
                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
                    if request.length == 0 {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    // Neither the device range nor (except for trim, which uses no VMO) the VMO
                    // range may overflow.
                    if request.dev_offset.checked_add(request.length as u64).is_none()
                        || (code != BlockOpcode::Trim
                            && request_bytes.checked_add(request.vmo_offset).is_none())
                    {
                        return Err(zx::Status::OUT_OF_RANGE);
                    }
                }
                Ok(match code {
                    BlockOpcode::Read => Operation::Read {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                        _unused: 0,
                        // The wire offset is in blocks; `Operation` carries bytes.
                        vmo_offset: request
                            .vmo_offset
                            .checked_mul(self.block_size as u64)
                            .ok_or(zx::Status::OUT_OF_RANGE)?,
                        options: ReadOptions {
                            inline_crypto: InlineCryptoOptions {
                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
                                dun: request.dun,
                                slot: request.slot,
                            },
                        },
                    },
                    BlockOpcode::Write => {
                        let mut options = WriteOptions {
                            inline_crypto: InlineCryptoOptions {
                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
                                dun: request.dun,
                                slot: request.slot,
                            },
                            ..WriteOptions::default()
                        };
                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
                            options.flags |= WriteFlags::FORCE_ACCESS;
                        }
                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
                            options.flags |= WriteFlags::PRE_BARRIER;
                        }
                        Operation::Write {
                            device_block_offset: request.dev_offset,
                            block_count: request.length,
                            _unused: 0,
                            options,
                            // The wire offset is in blocks; `Operation` carries bytes.
                            vmo_offset: request
                                .vmo_offset
                                .checked_mul(self.block_size as u64)
                                .ok_or(zx::Status::OUT_OF_RANGE)?,
                        }
                    }
                    BlockOpcode::Flush => Operation::Flush,
                    BlockOpcode::Trim => Operation::Trim {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                    },
                    BlockOpcode::CloseVmo => Operation::CloseVmo,
                })
            });

        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
            GroupOrRequest::Group(request.group)
        } else {
            GroupOrRequest::Request(request.reqid)
        };

        // The lock is held for the remainder of the function.
        let mut active_requests = self.session_manager().active_requests().0.lock();
        let mut request_id = None;

        if group_or_request.is_group() {
            // Look for an existing entry for this group by scanning all active requests.
            // NOTE(review): the comparison below is on `group_or_request` only; the session is
            // not compared here — confirm that group IDs cannot collide across sessions.
            for (key, group) in &mut active_requests.requests {
                if group.group_or_request == group_or_request {
                    if group.req_id.is_some() {
                        // The group has already received its terminating request, so another
                        // member is invalid.  Fail the group; nothing is sent for this request.
                        if group.status == zx::Status::OK {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                        return Err(None);
                    }
                    if group.status == zx::Status::OK
                        && let Some(info) = &mut group.decompression_info
                    {
                        // The group is a decompressed read in progress, so this request must be
                        // a continuation: a read with a zero VMO offset.
                        if let Ok(Operation::Read {
                            device_block_offset,
                            mut block_count,
                            options,
                            vmo_offset: 0,
                            ..
                        }) = operation
                        {
                            // Bytes still to be read, with the compressed data rounded up to a
                            // whole number of blocks.
                            let remaining_bytes = info
                                .compressed_range
                                .end
                                .next_multiple_of(self.block_size as usize)
                                as u64
                                - info.bytes_so_far;
                            // Continuations must carry the decompress flag and no compression
                            // parameters.  The last request must reach the end of the compressed
                            // data, and any other request must stop short of it.
                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
                                || request.total_compressed_bytes != 0
                                || request.uncompressed_bytes != 0
                                || request.compressed_prefix_bytes != 0
                                || (flags.contains(BlockIoFlag::GROUP_LAST)
                                    && info.bytes_so_far + request_bytes
                                        < info.compressed_range.end as u64)
                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
                                    && request_bytes >= remaining_bytes)
                            {
                                group.status = zx::Status::INVALID_ARGS;
                            } else {
                                // Read no more than what is needed to cover the compressed data.
                                if request_bytes > remaining_bytes {
                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
                                }

                                operation = Ok(Operation::ContinueDecompressedRead {
                                    offset: info.bytes_so_far,
                                    device_block_offset,
                                    block_count,
                                    options,
                                });

                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
                            }
                        } else {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                    }
                    if flags.contains(BlockIoFlag::GROUP_LAST) {
                        // This request terminates the group; its ID is used for the response.
                        group.req_id = Some(request.reqid);
                        if group.status != zx::Status::OK {
                            // The group has failed; fail this request with the same status so
                            // that it is completed (and a response possibly produced) below.
                            operation = Err(group.status);
                        }
                    } else if group.status != zx::Status::OK {
                        // The group has failed and this is not its last request: drop it
                        // without counting it.
                        return Err(None);
                    }
                    request_id = Some(RequestId(key));
                    group.count += 1;
                    break;
                }
            }
        }

        // True when this request carries the ID to respond with: it is either not part of a
        // group, or it is the last request of its group.
        let is_single_request =
            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);

        // Resolve the VMO the operation needs.  Starting a decompressed read also switches
        // `operation` and fills in `decompression_info` here.
        let mut decompression_info = None;
        let vmo = match operation {
            Ok(Operation::Read {
                device_block_offset,
                mut block_count,
                options,
                vmo_offset,
                ..
            }) => match self.vmos.lock().get_mut(&request.vmoid) {
                Some((vmo, mapping)) => {
                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
                        // The compressed data follows a prefix of `compressed_prefix_bytes`.
                        let compressed_range = request.compressed_prefix_bytes as usize
                            ..request.compressed_prefix_bytes as usize
                                + request.total_compressed_bytes as usize;
                        let required_buffer_size =
                            compressed_range.end.next_multiple_of(self.block_size as usize);

                        // There must be some compressed data and the uncompressed range must not
                        // overflow.  A request that is the last (or only) one must cover all of
                        // the compressed data, and any other request must stop short of it.
                        if compressed_range.start >= compressed_range.end
                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
                            || (is_single_request && request_bytes < compressed_range.end as u64)
                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
                        {
                            Err(zx::Status::INVALID_ARGS)
                        } else {
                            // Read no more than what is needed to cover the compressed data.
                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
                                block_count =
                                    (required_buffer_size / self.block_size as usize) as u32;
                                required_buffer_size as u64
                            } else {
                                request_bytes
                            };

                            // Map the VMO the first time it is needed and cache the mapping.
                            match mapping {
                                Some(mapping) => Ok(mapping.clone()),
                                None => {
                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
                                }
                            }
                            .and_then(|mapping| {
                                // The decompressed data must fit within the mapping.
                                if vmo_offset
                                    .checked_add(request.uncompressed_bytes as u64)
                                    .is_some_and(|end| end <= mapping.size as u64)
                                {
                                    Ok(mapping)
                                } else {
                                    Err(zx::Status::OUT_OF_RANGE)
                                }
                            })
                            .map(|mapping| {
                                operation = Ok(Operation::StartDecompressedRead {
                                    required_buffer_size,
                                    device_block_offset,
                                    block_count,
                                    options,
                                });
                                decompression_info = Some(DecompressionInfo {
                                    compressed_range,
                                    bytes_so_far,
                                    mapping,
                                    uncompressed_range: vmo_offset
                                        ..vmo_offset + request.uncompressed_bytes as u64,
                                    buffer: None,
                                });
                                // Decompressed reads write through the mapping, so no VMO is
                                // passed along with the request.
                                None
                            })
                        }
                    } else {
                        Ok(Some(vmo.clone()))
                    }
                }
                None => Err(zx::Status::IO),
            },
            Ok(Operation::Write { .. }) => self
                .vmos
                .lock()
                .get(&request.vmoid)
                .cloned()
                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
            Ok(Operation::CloseVmo) => {
                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
                    // Hand a reference to the global epoch so that it is dropped by the
                    // deferred callback.
                    // NOTE(review): presumably this keeps the VMO alive until requests already
                    // in flight (which hold epoch guards) have finished — confirm.
                    let vmo_clone = vmo.clone();
                    Epoch::global().defer(move || drop(vmo_clone));
                    Ok(Some(vmo))
                })
            }
            _ => Ok(None),
        }
        .unwrap_or_else(|e| {
            operation = Err(e);
            None
        });

        let trace_flow_id = NonZero::new(request.trace_flow_id);
        // Reuse the group's entry if one was found above; otherwise register a new entry.
        let request_id = request_id.unwrap_or_else(|| {
            RequestId(active_requests.requests.insert(ActiveRequest {
                session,
                group_or_request,
                trace_flow_id,
                _epoch_guard: Epoch::global().guard(),
                status: zx::Status::OK,
                count: 1,
                req_id: is_single_request.then_some(request.reqid),
                decompression_info,
            }))
        });

        Ok(DecodedRequest {
            request_id,
            trace_flow_id,
            // A failed request is completed immediately.  A response is only produced if that
            // finishes the whole request or group.
            operation: operation.map_err(|status| {
                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
            })?,
            vmo,
        })
    }
1037
1038 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1039 std::mem::take(&mut *self.vmos.lock())
1040 }
1041
1042 fn map_request(
1044 &self,
1045 mut request: DecodedRequest,
1046 active_request: &mut ActiveRequest<SM::Session>,
1047 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1048 if active_request.status != zx::Status::OK {
1049 return Err(zx::Status::BAD_STATE);
1050 }
1051 let mapping = self.offset_map.mapping();
1052 match (mapping, request.operation.blocks()) {
1053 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1054 return Err(zx::Status::OUT_OF_RANGE);
1055 }
1056 _ => {}
1057 }
1058 let remainder = request.operation.map(
1059 self.offset_map.mapping(),
1060 self.offset_map.max_transfer_blocks(),
1061 self.block_size,
1062 );
1063 if remainder.is_some() {
1064 active_request.count += 1;
1065 }
1066 static CACHE: AtomicU64 = AtomicU64::new(0);
1067 if let Some(context) =
1068 fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1069 {
1070 use fuchsia_trace::ArgValue;
1071 let trace_args = [
1072 ArgValue::of("request_id", request.request_id.0),
1073 ArgValue::of("opcode", request.operation.trace_label()),
1074 ];
1075 let _scope =
1076 fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1077 if let Some(trace_flow_id) = active_request.trace_flow_id {
1078 fuchsia_trace::flow_step(
1079 &context,
1080 "block_server::start_transaction",
1081 trace_flow_id.get().into(),
1082 &[],
1083 );
1084 }
1085 }
1086 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1087 Ok((request, remainder))
1088 }
1089
1090 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1095 self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1096 }
1097
1098 fn close_active_groups(&self, pred: impl Fn(&SM::Session) -> bool) {
1111 self.session_manager().active_requests().0.lock().requests.retain(|_, request| {
1112 if !pred(&request.session) || request.req_id.is_some() {
1113 return true;
1114 }
1115 request.req_id = Some(u32::MAX);
1118 request.count > 0
1119 });
1120 }
1121}
1122
/// Identifies an active request.  The wrapped value is the request's key in the active-requests
/// slab.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct RequestId(usize);
1126
/// A FIFO request after decoding and validation.
#[derive(Clone, Debug)]
struct DecodedRequest {
    /// The active request entry this request is accounted against.
    request_id: RequestId,
    /// Trace flow ID from the FIFO request, if it was non-zero.
    trace_flow_id: TraceFlowId,
    /// The operation to perform.
    operation: Operation,
    /// The VMO involved, if any: set for plain reads, writes and close-VMO; `None` for
    /// decompressed reads and for operations that do not use a VMO.
    vmo: Option<Arc<zx::Vmo>>,
}
1134
// Re-exports of the option types defined by `block_protocol`, so that users of `Operation` can
// name them through this module.
pub type WriteFlags = block_protocol::WriteFlags;
pub type WriteOptions = block_protocol::WriteOptions;
pub type ReadOptions = block_protocol::ReadOptions;
pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1140
/// An operation decoded from a FIFO request.
// NOTE(review): `#[repr(C)]` and the `_unused` padding fields suggest the layout is shared with
// non-Rust code (see `c_interface`) — confirm before reordering or resizing fields.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    /// Read `block_count` blocks starting at `device_block_offset` into the VMO at byte offset
    /// `vmo_offset`.
    Read {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        vmo_offset: u64,
        options: ReadOptions,
    },
    /// Write `block_count` blocks starting at `device_block_offset` from the VMO at byte offset
    /// `vmo_offset`.
    Write {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        vmo_offset: u64,
        options: WriteOptions,
    },
    /// Flush the device.
    Flush,
    /// Trim `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// Detach a VMO from the session.
    CloseVmo,
    /// The first read of a decompressed read.  `required_buffer_size` is the end of the
    /// compressed data rounded up to a whole number of blocks.
    StartDecompressedRead {
        required_buffer_size: usize,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
    /// A subsequent read of a decompressed read.  `offset` is the number of bytes covered by the
    /// earlier reads of the same group.
    ContinueDecompressedRead {
        offset: u64,
        device_block_offset: u64,
        block_count: u32,
        options: ReadOptions,
    },
}
1184
1185impl Operation {
1186 fn trace_label(&self) -> &'static str {
1187 match self {
1188 Operation::Read { .. } => "read",
1189 Operation::Write { .. } => "write",
1190 Operation::Flush { .. } => "flush",
1191 Operation::Trim { .. } => "trim",
1192 Operation::CloseVmo { .. } => "close_vmo",
1193 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1194 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1195 }
1196 }
1197
1198 fn blocks(&self) -> Option<(u64, u32)> {
1200 match self {
1201 Operation::Read { device_block_offset, block_count, .. }
1202 | Operation::Write { device_block_offset, block_count, .. }
1203 | Operation::Trim { device_block_offset, block_count, .. } => {
1204 Some((*device_block_offset, *block_count))
1205 }
1206 _ => None,
1207 }
1208 }
1209
1210 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1212 match self {
1213 Operation::Read { device_block_offset, block_count, .. }
1214 | Operation::Write { device_block_offset, block_count, .. }
1215 | Operation::Trim { device_block_offset, block_count, .. } => {
1216 Some((device_block_offset, block_count))
1217 }
1218 _ => None,
1219 }
1220 }
1221
1222 fn map(
1225 &mut self,
1226 mapping: Option<&BlockOffsetMapping>,
1227 max_blocks: Option<NonZero<u32>>,
1228 block_size: u32,
1229 ) -> Option<Self> {
1230 let mut max = match self {
1231 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1232 _ => None,
1233 };
1234 let (offset, length) = self.blocks_mut()?;
1235 let orig_offset = *offset;
1236 if let Some(mapping) = mapping {
1237 let delta = *offset - mapping.source_block_offset;
1238 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1239 *offset = mapping.target_block_offset + delta;
1240 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1241 max = match max {
1242 None => Some(mapping_max),
1243 Some(m) => Some(std::cmp::min(m, mapping_max)),
1244 };
1245 };
1246 if let Some(max) = max {
1247 if *length as u64 > max {
1248 let rem = (*length as u64 - max) as u32;
1249 *length = max as u32;
1250 return Some(match self {
1251 Operation::Read {
1252 device_block_offset: _,
1253 block_count: _,
1254 vmo_offset,
1255 _unused,
1256 options,
1257 } => {
1258 let mut options = *options;
1259 options.inline_crypto.dun += max as u32;
1260 Operation::Read {
1261 device_block_offset: orig_offset + max,
1262 block_count: rem,
1263 vmo_offset: *vmo_offset + max * block_size as u64,
1264 _unused: *_unused,
1265 options: options,
1266 }
1267 }
1268 Operation::Write {
1269 device_block_offset: _,
1270 block_count: _,
1271 _unused,
1272 vmo_offset,
1273 options,
1274 } => {
1275 let mut options = *options;
1276 options.inline_crypto.dun += max as u32;
1277 Operation::Write {
1278 device_block_offset: orig_offset + max,
1279 block_count: rem,
1280 _unused: *_unused,
1281 vmo_offset: *vmo_offset + max * block_size as u64,
1282 options: options,
1283 }
1284 }
1285 Operation::Trim { device_block_offset: _, block_count: _ } => {
1286 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1287 }
1288 _ => unreachable!(),
1289 });
1290 }
1291 }
1292 None
1293 }
1294
1295 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1297 if let Operation::Write { options, .. } = self {
1298 options.flags.contains(value)
1299 } else {
1300 false
1301 }
1302 }
1303
1304 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1306 if let Operation::Write { options, .. } = self {
1307 let result = options.flags.contains(value);
1308 options.flags.remove(value);
1309 result
1310 } else {
1311 false
1312 }
1313 }
1314}
1315
/// Identifies either a request group (16-bit id) or a stand-alone request (32-bit id).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    Group(u16),
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        self.group_id().is_some()
    }

    /// Returns the group id for the `Group` variant and `None` for a stand-alone request.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self { Some(id) } else { None }
    }
}
1334
1335#[cfg(test)]
1336mod tests {
1337 use super::{
1338 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1339 TraceFlowId,
1340 };
1341 use assert_matches::assert_matches;
1342 use block_protocol::{
1343 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1344 WriteFlags, WriteOptions,
1345 };
1346 use fidl_fuchsia_storage_block as fblock;
1347 use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1348 use fuchsia_async as fasync;
1349 use fuchsia_sync::Mutex;
1350 use futures::FutureExt as _;
1351 use futures::channel::oneshot;
1352 use futures::future::BoxFuture;
1353 use std::borrow::Cow;
1354 use std::future::poll_fn;
1355 use std::num::NonZero;
1356 use std::pin::pin;
1357 use std::sync::Arc;
1358 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1359 use std::task::{Context, Poll};
1360
    /// Test double for `async_interface::Interface` whose behaviour is supplied by optional
    /// hooks.  `Default` leaves every hook unset.
    #[derive(Default)]
    struct MockInterface {
        /// Invoked for every read with (device block offset, block count, VMO, VMO offset).
        /// Reads panic with `unimplemented!()` when this is unset.
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
        /// Invoked for every write with the device block offset.  Writes panic with
        /// `unimplemented!()` when this is unset.
        write_hook:
            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
        /// Invoked, before `write_hook`, for writes that carry `WriteFlags::PRE_BARRIER`.
        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
    }
1374
1375 impl super::async_interface::Interface for MockInterface {
1376 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1377 Ok(())
1378 }
1379
1380 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1381 Cow::Owned(test_device_info())
1382 }
1383
1384 async fn read(
1385 &self,
1386 device_block_offset: u64,
1387 block_count: u32,
1388 vmo: &Arc<zx::Vmo>,
1389 vmo_offset: u64,
1390 _opts: ReadOptions,
1391 _trace_flow_id: TraceFlowId,
1392 ) -> Result<(), zx::Status> {
1393 if let Some(read_hook) = &self.read_hook {
1394 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1395 } else {
1396 unimplemented!();
1397 }
1398 }
1399
1400 async fn write(
1401 &self,
1402 device_block_offset: u64,
1403 _block_count: u32,
1404 _vmo: &Arc<zx::Vmo>,
1405 _vmo_offset: u64,
1406 opts: WriteOptions,
1407 _trace_flow_id: TraceFlowId,
1408 ) -> Result<(), zx::Status> {
1409 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1410 && let Some(barrier_hook) = &self.barrier_hook
1411 {
1412 barrier_hook()?;
1413 }
1414 if let Some(write_hook) = &self.write_hook {
1415 write_hook(device_block_offset).await
1416 } else {
1417 unimplemented!();
1418 }
1419 }
1420
1421 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1422 Ok(())
1423 }
1424
1425 async fn trim(
1426 &self,
1427 _device_block_offset: u64,
1428 _block_count: u32,
1429 _trace_flow_id: TraceFlowId,
1430 ) -> Result<(), zx::Status> {
1431 unreachable!();
1432 }
1433
1434 async fn get_volume_info(
1435 &self,
1436 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1437 let () = std::future::pending().await;
1439 unreachable!();
1440 }
1441 }
1442
    /// Block size, in bytes, passed to every `BlockServer` created by these tests.
    const BLOCK_SIZE: u32 = 512;
    /// Transfer limit, in blocks, advertised by `test_device_info`.
    const MAX_TRANSFER_BLOCKS: u32 = 10;
1445
1446 fn test_device_info() -> DeviceInfo {
1447 DeviceInfo::Partition(PartitionInfo {
1448 device_flags: fblock::DeviceFlag::READONLY
1449 | fblock::DeviceFlag::BARRIER_SUPPORT
1450 | fblock::DeviceFlag::FUA_SUPPORT,
1451 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1452 block_range: Some(0..100),
1453 type_guid: [1; 16],
1454 instance_guid: [2; 16],
1455 name: "foo".to_string(),
1456 flags: 0xabcd,
1457 })
1458 }
1459
1460 #[fuchsia::test]
1461 async fn test_barriers_ordering() {
1462 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1463 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1464 let barrier_called = Arc::new(AtomicBool::new(false));
1465
1466 futures::join!(
1467 async move {
1468 let barrier_called_clone = barrier_called.clone();
1469 let block_server = BlockServer::new(
1470 BLOCK_SIZE,
1471 Arc::new(MockInterface {
1472 barrier_hook: Some(Box::new(move || {
1473 barrier_called.store(true, Ordering::Relaxed);
1474 Ok(())
1475 })),
1476 write_hook: Some(Box::new(move |device_block_offset| {
1477 let barrier_called = barrier_called_clone.clone();
1478 Box::pin(async move {
1479 if device_block_offset % 2 == 0 {
1481 fasync::Timer::new(fasync::MonotonicInstant::after(
1482 zx::MonotonicDuration::from_millis(200),
1483 ))
1484 .await;
1485 }
1486 assert!(barrier_called.load(Ordering::Relaxed));
1487 Ok(())
1488 })
1489 })),
1490 ..MockInterface::default()
1491 }),
1492 );
1493 block_server.handle_requests(stream).await.unwrap();
1494 },
1495 async move {
1496 let (session_proxy, server) = fidl::endpoints::create_proxy();
1497
1498 proxy.open_session(server).unwrap();
1499
1500 let vmo_id = session_proxy
1501 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1502 .await
1503 .unwrap()
1504 .unwrap();
1505 assert_ne!(vmo_id.id, 0);
1506
1507 let mut fifo =
1508 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1509 let (mut reader, mut writer) = fifo.async_io();
1510
1511 writer
1512 .write_entries(&BlockFifoRequest {
1513 command: BlockFifoCommand {
1514 opcode: BlockOpcode::Write.into_primitive(),
1515 flags: BlockIoFlag::PRE_BARRIER.bits(),
1516 ..Default::default()
1517 },
1518 vmoid: vmo_id.id,
1519 dev_offset: 0,
1520 length: 5,
1521 vmo_offset: 6,
1522 ..Default::default()
1523 })
1524 .await
1525 .unwrap();
1526
1527 for i in 0..10 {
1528 writer
1529 .write_entries(&BlockFifoRequest {
1530 command: BlockFifoCommand {
1531 opcode: BlockOpcode::Write.into_primitive(),
1532 ..Default::default()
1533 },
1534 vmoid: vmo_id.id,
1535 dev_offset: i + 1,
1536 length: 5,
1537 vmo_offset: 6,
1538 ..Default::default()
1539 })
1540 .await
1541 .unwrap();
1542 }
1543 for _ in 0..11 {
1544 let mut response = BlockFifoResponse::default();
1545 reader.read_entries(&mut response).await.unwrap();
1546 assert_eq!(response.status, zx::sys::ZX_OK);
1547 }
1548
1549 std::mem::drop(proxy);
1550 }
1551 );
1552 }
1553
1554 #[fuchsia::test]
1555 async fn test_info() {
1556 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1557
1558 futures::join!(
1559 async {
1560 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1561 block_server.handle_requests(stream).await.unwrap();
1562 },
1563 async {
1564 let expected_info = test_device_info();
1565 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1566 info
1567 } else {
1568 unreachable!()
1569 };
1570
1571 let block_info = proxy.get_info().await.unwrap().unwrap();
1572 assert_eq!(
1573 block_info.block_count,
1574 partition_info.block_range.as_ref().unwrap().end
1575 - partition_info.block_range.as_ref().unwrap().start
1576 );
1577 assert_eq!(
1578 block_info.flags,
1579 fblock::DeviceFlag::READONLY
1580 | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1581 | fblock::DeviceFlag::BARRIER_SUPPORT
1582 | fblock::DeviceFlag::FUA_SUPPORT
1583 );
1584
1585 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1586
1587 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1588 assert_eq!(status, zx::sys::ZX_OK);
1589 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1590
1591 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1592 assert_eq!(status, zx::sys::ZX_OK);
1593 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1594
1595 let (status, name) = proxy.get_name().await.unwrap();
1596 assert_eq!(status, zx::sys::ZX_OK);
1597 assert_eq!(name.as_ref(), Some(&partition_info.name));
1598
1599 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1600 assert_eq!(metadata.name, name);
1601 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1602 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1603 assert_eq!(
1604 metadata.start_block_offset,
1605 Some(partition_info.block_range.as_ref().unwrap().start)
1606 );
1607 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1608 assert_eq!(metadata.flags, Some(partition_info.flags));
1609
1610 std::mem::drop(proxy);
1611 }
1612 );
1613 }
1614
1615 #[fuchsia::test]
1616 async fn test_attach_vmo() {
1617 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1618
1619 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1620 let koid = vmo.koid().unwrap();
1621
1622 futures::join!(
1623 async {
1624 let block_server = BlockServer::new(
1625 BLOCK_SIZE,
1626 Arc::new(MockInterface {
1627 read_hook: Some(Box::new(move |_, _, vmo, _| {
1628 assert_eq!(vmo.koid().unwrap(), koid);
1629 Box::pin(async { Ok(()) })
1630 })),
1631 ..MockInterface::default()
1632 }),
1633 );
1634 block_server.handle_requests(stream).await.unwrap();
1635 },
1636 async move {
1637 let (session_proxy, server) = fidl::endpoints::create_proxy();
1638
1639 proxy.open_session(server).unwrap();
1640
1641 let vmo_id = session_proxy
1642 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1643 .await
1644 .unwrap()
1645 .unwrap();
1646 assert_ne!(vmo_id.id, 0);
1647
1648 let mut fifo =
1649 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1650 let (mut reader, mut writer) = fifo.async_io();
1651
1652 let mut count = 1;
1654 loop {
1655 match session_proxy
1656 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1657 .await
1658 .unwrap()
1659 {
1660 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1661 Err(e) => {
1662 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1663 break;
1664 }
1665 }
1666
1667 if count % 10 == 0 {
1669 writer
1670 .write_entries(&BlockFifoRequest {
1671 command: BlockFifoCommand {
1672 opcode: BlockOpcode::Read.into_primitive(),
1673 ..Default::default()
1674 },
1675 vmoid: vmo_id.id,
1676 length: 1,
1677 ..Default::default()
1678 })
1679 .await
1680 .unwrap();
1681
1682 let mut response = BlockFifoResponse::default();
1683 reader.read_entries(&mut response).await.unwrap();
1684 assert_eq!(response.status, zx::sys::ZX_OK);
1685 }
1686
1687 count += 1;
1688 }
1689
1690 assert_eq!(count, u16::MAX as u64);
1691
1692 writer
1694 .write_entries(&BlockFifoRequest {
1695 command: BlockFifoCommand {
1696 opcode: BlockOpcode::CloseVmo.into_primitive(),
1697 ..Default::default()
1698 },
1699 vmoid: vmo_id.id,
1700 ..Default::default()
1701 })
1702 .await
1703 .unwrap();
1704
1705 let mut response = BlockFifoResponse::default();
1706 reader.read_entries(&mut response).await.unwrap();
1707 assert_eq!(response.status, zx::sys::ZX_OK);
1708
1709 let new_vmo_id = session_proxy
1710 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1711 .await
1712 .unwrap()
1713 .unwrap();
1714 assert_eq!(new_vmo_id.id, vmo_id.id);
1716
1717 std::mem::drop(proxy);
1718 }
1719 );
1720 }
1721
1722 #[fuchsia::test]
1723 async fn test_close() {
1724 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1725
1726 let mut server = std::pin::pin!(
1727 async {
1728 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1729 block_server.handle_requests(stream).await.unwrap();
1730 }
1731 .fuse()
1732 );
1733
1734 let mut client = std::pin::pin!(
1735 async {
1736 let (session_proxy, server) = fidl::endpoints::create_proxy();
1737
1738 proxy.open_session(server).unwrap();
1739
1740 std::mem::drop(proxy);
1743
1744 session_proxy.close().await.unwrap().unwrap();
1745
1746 let _: () = std::future::pending().await;
1748 }
1749 .fuse()
1750 );
1751
1752 futures::select!(
1753 _ = server => {}
1754 _ = client => unreachable!(),
1755 );
1756 }
1757
    /// Test double for `async_interface::Interface` that either fails every operation or
    /// checks each operation against a single expected one.
    #[derive(Default)]
    struct IoMockInterface {
        /// When true (and `return_errors` is false), each operation is asserted against
        /// `expected_op`.
        do_checks: bool,
        /// The next operation the test expects; it is taken (left as `None`) when checked.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        /// When true, every operation fails with an operation-specific status.
        return_errors: bool,
    }
1764
    /// The operation `IoMockInterface` expects to receive next.
    #[derive(Debug)]
    enum ExpectedOp {
        /// (device block offset, block count, VMO offset in blocks).
        Read(u64, u32, u64),
        /// (device block offset, block count, VMO offset in blocks).
        Write(u64, u32, u64),
        /// (device block offset, block count).
        Trim(u64, u32),
        /// A flush; takes no parameters.
        Flush,
    }
1772
1773 impl super::async_interface::Interface for IoMockInterface {
1774 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1775 Ok(())
1776 }
1777
1778 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1779 Cow::Owned(test_device_info())
1780 }
1781
1782 async fn read(
1783 &self,
1784 device_block_offset: u64,
1785 block_count: u32,
1786 _vmo: &Arc<zx::Vmo>,
1787 vmo_offset: u64,
1788 _opts: ReadOptions,
1789 _trace_flow_id: TraceFlowId,
1790 ) -> Result<(), zx::Status> {
1791 if self.return_errors {
1792 Err(zx::Status::INTERNAL)
1793 } else {
1794 if self.do_checks {
1795 assert_matches!(
1796 self.expected_op.lock().take(),
1797 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1798 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1799 "Read {device_block_offset} {block_count} {vmo_offset}"
1800 );
1801 }
1802 Ok(())
1803 }
1804 }
1805
1806 async fn write(
1807 &self,
1808 device_block_offset: u64,
1809 block_count: u32,
1810 _vmo: &Arc<zx::Vmo>,
1811 vmo_offset: u64,
1812 _write_opts: WriteOptions,
1813 _trace_flow_id: TraceFlowId,
1814 ) -> Result<(), zx::Status> {
1815 if self.return_errors {
1816 Err(zx::Status::NOT_SUPPORTED)
1817 } else {
1818 if self.do_checks {
1819 assert_matches!(
1820 self.expected_op.lock().take(),
1821 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1822 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1823 "Write {device_block_offset} {block_count} {vmo_offset}"
1824 );
1825 }
1826 Ok(())
1827 }
1828 }
1829
1830 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1831 if self.return_errors {
1832 Err(zx::Status::NO_RESOURCES)
1833 } else {
1834 if self.do_checks {
1835 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1836 }
1837 Ok(())
1838 }
1839 }
1840
1841 async fn trim(
1842 &self,
1843 device_block_offset: u64,
1844 block_count: u32,
1845 _trace_flow_id: TraceFlowId,
1846 ) -> Result<(), zx::Status> {
1847 if self.return_errors {
1848 Err(zx::Status::NO_MEMORY)
1849 } else {
1850 if self.do_checks {
1851 assert_matches!(
1852 self.expected_op.lock().take(),
1853 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1854 block_count == b,
1855 "Trim {device_block_offset} {block_count}"
1856 );
1857 }
1858 Ok(())
1859 }
1860 }
1861 }
1862
1863 #[fuchsia::test]
1864 async fn test_io() {
1865 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1866
1867 let expected_op = Arc::new(Mutex::new(None));
1868 let expected_op_clone = expected_op.clone();
1869
1870 let server = async {
1871 let block_server = BlockServer::new(
1872 BLOCK_SIZE,
1873 Arc::new(IoMockInterface {
1874 return_errors: false,
1875 do_checks: true,
1876 expected_op: expected_op_clone,
1877 }),
1878 );
1879 block_server.handle_requests(stream).await.unwrap();
1880 };
1881
1882 let client = async move {
1883 let (session_proxy, server) = fidl::endpoints::create_proxy();
1884
1885 proxy.open_session(server).unwrap();
1886
1887 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1888 let vmo_id = session_proxy
1889 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1890 .await
1891 .unwrap()
1892 .unwrap();
1893
1894 let mut fifo =
1895 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1896 let (mut reader, mut writer) = fifo.async_io();
1897
1898 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1900 writer
1901 .write_entries(&BlockFifoRequest {
1902 command: BlockFifoCommand {
1903 opcode: BlockOpcode::Read.into_primitive(),
1904 ..Default::default()
1905 },
1906 vmoid: vmo_id.id,
1907 dev_offset: 1,
1908 length: 2,
1909 vmo_offset: 3,
1910 ..Default::default()
1911 })
1912 .await
1913 .unwrap();
1914
1915 let mut response = BlockFifoResponse::default();
1916 reader.read_entries(&mut response).await.unwrap();
1917 assert_eq!(response.status, zx::sys::ZX_OK);
1918
1919 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1921 writer
1922 .write_entries(&BlockFifoRequest {
1923 command: BlockFifoCommand {
1924 opcode: BlockOpcode::Write.into_primitive(),
1925 ..Default::default()
1926 },
1927 vmoid: vmo_id.id,
1928 dev_offset: 4,
1929 length: 5,
1930 vmo_offset: 6,
1931 ..Default::default()
1932 })
1933 .await
1934 .unwrap();
1935
1936 let mut response = BlockFifoResponse::default();
1937 reader.read_entries(&mut response).await.unwrap();
1938 assert_eq!(response.status, zx::sys::ZX_OK);
1939
1940 *expected_op.lock() = Some(ExpectedOp::Flush);
1942 writer
1943 .write_entries(&BlockFifoRequest {
1944 command: BlockFifoCommand {
1945 opcode: BlockOpcode::Flush.into_primitive(),
1946 ..Default::default()
1947 },
1948 ..Default::default()
1949 })
1950 .await
1951 .unwrap();
1952
1953 reader.read_entries(&mut response).await.unwrap();
1954 assert_eq!(response.status, zx::sys::ZX_OK);
1955
1956 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1958 writer
1959 .write_entries(&BlockFifoRequest {
1960 command: BlockFifoCommand {
1961 opcode: BlockOpcode::Trim.into_primitive(),
1962 ..Default::default()
1963 },
1964 dev_offset: 7,
1965 length: 8,
1966 ..Default::default()
1967 })
1968 .await
1969 .unwrap();
1970
1971 reader.read_entries(&mut response).await.unwrap();
1972 assert_eq!(response.status, zx::sys::ZX_OK);
1973
1974 std::mem::drop(proxy);
1975 };
1976
1977 futures::join!(server, client);
1978 }
1979
1980 #[fuchsia::test]
1981 async fn test_io_errors() {
1982 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1983
1984 futures::join!(
1985 async {
1986 let block_server = BlockServer::new(
1987 BLOCK_SIZE,
1988 Arc::new(IoMockInterface {
1989 return_errors: true,
1990 do_checks: false,
1991 expected_op: Arc::new(Mutex::new(None)),
1992 }),
1993 );
1994 block_server.handle_requests(stream).await.unwrap();
1995 },
1996 async move {
1997 let (session_proxy, server) = fidl::endpoints::create_proxy();
1998
1999 proxy.open_session(server).unwrap();
2000
2001 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2002 let vmo_id = session_proxy
2003 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2004 .await
2005 .unwrap()
2006 .unwrap();
2007
2008 let mut fifo =
2009 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2010 let (mut reader, mut writer) = fifo.async_io();
2011
2012 writer
2014 .write_entries(&BlockFifoRequest {
2015 command: BlockFifoCommand {
2016 opcode: BlockOpcode::Read.into_primitive(),
2017 ..Default::default()
2018 },
2019 vmoid: vmo_id.id,
2020 length: 1,
2021 reqid: 1,
2022 ..Default::default()
2023 })
2024 .await
2025 .unwrap();
2026
2027 let mut response = BlockFifoResponse::default();
2028 reader.read_entries(&mut response).await.unwrap();
2029 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2030
2031 writer
2033 .write_entries(&BlockFifoRequest {
2034 command: BlockFifoCommand {
2035 opcode: BlockOpcode::Write.into_primitive(),
2036 ..Default::default()
2037 },
2038 vmoid: vmo_id.id,
2039 length: 1,
2040 reqid: 2,
2041 ..Default::default()
2042 })
2043 .await
2044 .unwrap();
2045
2046 reader.read_entries(&mut response).await.unwrap();
2047 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2048
2049 writer
2051 .write_entries(&BlockFifoRequest {
2052 command: BlockFifoCommand {
2053 opcode: BlockOpcode::Flush.into_primitive(),
2054 ..Default::default()
2055 },
2056 reqid: 3,
2057 ..Default::default()
2058 })
2059 .await
2060 .unwrap();
2061
2062 reader.read_entries(&mut response).await.unwrap();
2063 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2064
2065 writer
2067 .write_entries(&BlockFifoRequest {
2068 command: BlockFifoCommand {
2069 opcode: BlockOpcode::Trim.into_primitive(),
2070 ..Default::default()
2071 },
2072 reqid: 4,
2073 length: 1,
2074 ..Default::default()
2075 })
2076 .await
2077 .unwrap();
2078
2079 reader.read_entries(&mut response).await.unwrap();
2080 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2081
2082 std::mem::drop(proxy);
2083 }
2084 );
2085 }
2086
2087 #[fuchsia::test]
2088 async fn test_invalid_args() {
2089 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2090
2091 futures::join!(
2092 async {
2093 let block_server = BlockServer::new(
2094 BLOCK_SIZE,
2095 Arc::new(IoMockInterface {
2096 return_errors: false,
2097 do_checks: false,
2098 expected_op: Arc::new(Mutex::new(None)),
2099 }),
2100 );
2101 block_server.handle_requests(stream).await.unwrap();
2102 },
2103 async move {
2104 let (session_proxy, server) = fidl::endpoints::create_proxy();
2105
2106 proxy.open_session(server).unwrap();
2107
2108 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2109 let vmo_id = session_proxy
2110 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2111 .await
2112 .unwrap()
2113 .unwrap();
2114
2115 let mut fifo =
2116 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2117
2118 async fn test(
2119 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2120 request: BlockFifoRequest,
2121 ) -> Result<(), zx::Status> {
2122 let (mut reader, mut writer) = fifo.async_io();
2123 writer.write_entries(&request).await.unwrap();
2124 let mut response = BlockFifoResponse::default();
2125 reader.read_entries(&mut response).await.unwrap();
2126 zx::Status::ok(response.status)
2127 }
2128
2129 let good_read_request = || BlockFifoRequest {
2132 command: BlockFifoCommand {
2133 opcode: BlockOpcode::Read.into_primitive(),
2134 ..Default::default()
2135 },
2136 length: 1,
2137 vmoid: vmo_id.id,
2138 ..Default::default()
2139 };
2140
2141 assert_eq!(
2142 test(
2143 &mut fifo,
2144 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2145 )
2146 .await,
2147 Err(zx::Status::IO)
2148 );
2149
2150 assert_eq!(
2151 test(
2152 &mut fifo,
2153 BlockFifoRequest {
2154 vmo_offset: 0xffff_ffff_ffff_ffff,
2155 ..good_read_request()
2156 }
2157 )
2158 .await,
2159 Err(zx::Status::OUT_OF_RANGE)
2160 );
2161
2162 assert_eq!(
2163 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2164 Err(zx::Status::INVALID_ARGS)
2165 );
2166
2167 let good_write_request = || BlockFifoRequest {
2170 command: BlockFifoCommand {
2171 opcode: BlockOpcode::Write.into_primitive(),
2172 ..Default::default()
2173 },
2174 length: 1,
2175 vmoid: vmo_id.id,
2176 ..Default::default()
2177 };
2178
2179 assert_eq!(
2180 test(
2181 &mut fifo,
2182 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2183 )
2184 .await,
2185 Err(zx::Status::IO)
2186 );
2187
2188 assert_eq!(
2189 test(
2190 &mut fifo,
2191 BlockFifoRequest {
2192 vmo_offset: 0xffff_ffff_ffff_ffff,
2193 ..good_write_request()
2194 }
2195 )
2196 .await,
2197 Err(zx::Status::OUT_OF_RANGE)
2198 );
2199
2200 assert_eq!(
2201 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2202 Err(zx::Status::INVALID_ARGS)
2203 );
2204
2205 assert_eq!(
2208 test(
2209 &mut fifo,
2210 BlockFifoRequest {
2211 command: BlockFifoCommand {
2212 opcode: BlockOpcode::CloseVmo.into_primitive(),
2213 ..Default::default()
2214 },
2215 vmoid: vmo_id.id + 1,
2216 ..Default::default()
2217 }
2218 )
2219 .await,
2220 Err(zx::Status::IO)
2221 );
2222
2223 std::mem::drop(proxy);
2224 }
2225 );
2226 }
2227
    /// Issues two reads that block inside the interface and checks that both are in flight at
    /// once and that each response arrives only after its own read has been released.
    #[fuchsia::test]
    async fn test_concurrent_requests() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();

        // Each in-flight read registers (device offset, sender) here; sending on the sender
        // lets that read complete.
        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
        let waiting_readers_clone = waiting_readers.clone();

        futures::join!(
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
                            let (tx, rx) = oneshot::channel();
                            // The requests below use a device offset equal to their reqid, so
                            // the offset doubles as the id the client expects back.
                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
                            Box::pin(async move {
                                let _ = rx.await;
                                Ok(())
                            })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            ..Default::default()
                        },
                        reqid: 1,
                        dev_offset: 1,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            ..Default::default()
                        },
                        reqid: 2,
                        dev_offset: 2,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // Keep yielding until both reads have reached the read hook.
                poll_fn(|cx: &mut Context<'_>| {
                    if waiting_readers.lock().len() == 2 {
                        Poll::Ready(())
                    } else {
                        // Re-arm the waker immediately so this future is polled again.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                })
                .await;

                // Neither read has been released, so no response may be available yet.
                let mut response = BlockFifoResponse::default();
                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());

                // Release the most recently registered read; its response must come back.
                let (id, tx) = waiting_readers.lock().pop().unwrap();
                tx.send(()).unwrap();

                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, id);

                // The other read is still blocked.
                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());

                let (id, tx) = waiting_readers.lock().pop().unwrap();
                tx.send(()).unwrap();

                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, id);
            }
        );
    }
2332
    /// Checks that `Session.Close` does not complete while a read is still in progress inside
    /// the interface, and does complete once that read is allowed to finish.
    #[fuchsia::test]
    async fn test_session_close_is_synchronous() {
        use futures::{FutureExt as _, StreamExt as _};

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();

        // `start_*` signals that the read has begun; `finish_*` releases it.
        let (start_tx, mut start_rx) = futures::channel::mpsc::channel(1);
        let (finish_tx, finish_rx) = futures::channel::oneshot::channel();
        // Wrapped so the `Fn` read hook can take the receiver out; the hook therefore supports
        // exactly one read (a second call would panic on the `unwrap`).
        let finish_rx = Arc::new(Mutex::new(Some(finish_rx)));

        futures::join!(
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        read_hook: Some(Box::new(move |_, _, _, _| {
                            let mut start_tx = start_tx.clone();
                            let finish_rx = finish_rx.lock().take().unwrap();
                            Box::pin(async move {
                                start_tx.try_send(()).unwrap();
                                let _ = finish_rx.await;
                                Ok(())
                            })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();
                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let mut fifo = fasync::Fifo::<BlockFifoResponse, BlockFifoRequest>::from_fifo(
                    session_proxy.get_fifo().await.unwrap().unwrap(),
                );
                let (_reader, mut writer) = fifo.async_io();

                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            ..Default::default()
                        },
                        reqid: 1,
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                // Wait until the read is running inside the interface.
                start_rx.next().await.unwrap();

                // Close must still be pending after 100ms because the read has not finished.
                let mut close_fut = std::pin::pin!(session_proxy.close().fuse());
                let mut timer_fut = std::pin::pin!(
                    fasync::Timer::new(std::time::Duration::from_millis(100)).fuse()
                );
                futures::select! {
                    res = close_fut => panic!("close completed too early: {:?}", res),
                    _ = timer_fut => {}
                }

                // Let the read finish ...
                finish_tx.send(()).unwrap();

                // ... after which the close is expected to complete successfully.
                close_fut.await.unwrap().unwrap();

                std::mem::drop(proxy);
            }
        );
    }
2415
2416 #[fuchsia::test]
2417 async fn test_groups() {
2418 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2419
2420 futures::join!(
2421 async move {
2422 let block_server = BlockServer::new(
2423 BLOCK_SIZE,
2424 Arc::new(MockInterface {
2425 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2426 ..MockInterface::default()
2427 }),
2428 );
2429 block_server.handle_requests(stream).await.unwrap();
2430 },
2431 async move {
2432 let (session_proxy, server) = fidl::endpoints::create_proxy();
2433
2434 proxy.open_session(server).unwrap();
2435
2436 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2437 let vmo_id = session_proxy
2438 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2439 .await
2440 .unwrap()
2441 .unwrap();
2442
2443 let mut fifo =
2444 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2445 let (mut reader, mut writer) = fifo.async_io();
2446
2447 writer
2448 .write_entries(&BlockFifoRequest {
2449 command: BlockFifoCommand {
2450 opcode: BlockOpcode::Read.into_primitive(),
2451 flags: BlockIoFlag::GROUP_ITEM.bits(),
2452 ..Default::default()
2453 },
2454 group: 1,
2455 vmoid: vmo_id.id,
2456 length: 1,
2457 ..Default::default()
2458 })
2459 .await
2460 .unwrap();
2461
2462 writer
2463 .write_entries(&BlockFifoRequest {
2464 command: BlockFifoCommand {
2465 opcode: BlockOpcode::Read.into_primitive(),
2466 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2467 ..Default::default()
2468 },
2469 reqid: 2,
2470 group: 1,
2471 vmoid: vmo_id.id,
2472 length: 1,
2473 ..Default::default()
2474 })
2475 .await
2476 .unwrap();
2477
2478 let mut response = BlockFifoResponse::default();
2479 reader.read_entries(&mut response).await.unwrap();
2480 assert_eq!(response.status, zx::sys::ZX_OK);
2481 assert_eq!(response.reqid, 2);
2482 assert_eq!(response.group, 1);
2483 }
2484 );
2485 }
2486
2487 #[fuchsia::test]
2488 async fn test_group_error() {
2489 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2490
2491 let counter = Arc::new(AtomicU64::new(0));
2492 let counter_clone = counter.clone();
2493
2494 futures::join!(
2495 async move {
2496 let block_server = BlockServer::new(
2497 BLOCK_SIZE,
2498 Arc::new(MockInterface {
2499 read_hook: Some(Box::new(move |_, _, _, _| {
2500 counter_clone.fetch_add(1, Ordering::Relaxed);
2501 Box::pin(async { Err(zx::Status::BAD_STATE) })
2502 })),
2503 ..MockInterface::default()
2504 }),
2505 );
2506 block_server.handle_requests(stream).await.unwrap();
2507 },
2508 async move {
2509 let (session_proxy, server) = fidl::endpoints::create_proxy();
2510
2511 proxy.open_session(server).unwrap();
2512
2513 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2514 let vmo_id = session_proxy
2515 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2516 .await
2517 .unwrap()
2518 .unwrap();
2519
2520 let mut fifo =
2521 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2522 let (mut reader, mut writer) = fifo.async_io();
2523
2524 writer
2525 .write_entries(&BlockFifoRequest {
2526 command: BlockFifoCommand {
2527 opcode: BlockOpcode::Read.into_primitive(),
2528 flags: BlockIoFlag::GROUP_ITEM.bits(),
2529 ..Default::default()
2530 },
2531 group: 1,
2532 vmoid: vmo_id.id,
2533 length: 1,
2534 ..Default::default()
2535 })
2536 .await
2537 .unwrap();
2538
2539 poll_fn(|cx: &mut Context<'_>| {
2541 if counter.load(Ordering::Relaxed) == 1 {
2542 Poll::Ready(())
2543 } else {
2544 cx.waker().wake_by_ref();
2546 Poll::Pending
2547 }
2548 })
2549 .await;
2550
2551 let mut response = BlockFifoResponse::default();
2552 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2553
2554 writer
2555 .write_entries(&BlockFifoRequest {
2556 command: BlockFifoCommand {
2557 opcode: BlockOpcode::Read.into_primitive(),
2558 flags: BlockIoFlag::GROUP_ITEM.bits(),
2559 ..Default::default()
2560 },
2561 group: 1,
2562 vmoid: vmo_id.id,
2563 length: 1,
2564 ..Default::default()
2565 })
2566 .await
2567 .unwrap();
2568
2569 writer
2570 .write_entries(&BlockFifoRequest {
2571 command: BlockFifoCommand {
2572 opcode: BlockOpcode::Read.into_primitive(),
2573 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2574 ..Default::default()
2575 },
2576 reqid: 2,
2577 group: 1,
2578 vmoid: vmo_id.id,
2579 length: 1,
2580 ..Default::default()
2581 })
2582 .await
2583 .unwrap();
2584
2585 reader.read_entries(&mut response).await.unwrap();
2586 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2587 assert_eq!(response.reqid, 2);
2588 assert_eq!(response.group, 1);
2589
2590 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2591
2592 assert_eq!(counter.load(Ordering::Relaxed), 1);
2594 }
2595 );
2596 }
2597
2598 #[fuchsia::test]
2599 async fn test_group_with_two_lasts() {
2600 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2601
2602 let (tx, rx) = oneshot::channel();
2603
2604 futures::join!(
2605 async move {
2606 let rx = Mutex::new(Some(rx));
2607 let block_server = BlockServer::new(
2608 BLOCK_SIZE,
2609 Arc::new(MockInterface {
2610 read_hook: Some(Box::new(move |_, _, _, _| {
2611 let rx = rx.lock().take().unwrap();
2612 Box::pin(async {
2613 let _ = rx.await;
2614 Ok(())
2615 })
2616 })),
2617 ..MockInterface::default()
2618 }),
2619 );
2620 block_server.handle_requests(stream).await.unwrap();
2621 },
2622 async move {
2623 let (session_proxy, server) = fidl::endpoints::create_proxy();
2624
2625 proxy.open_session(server).unwrap();
2626
2627 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2628 let vmo_id = session_proxy
2629 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2630 .await
2631 .unwrap()
2632 .unwrap();
2633
2634 let mut fifo =
2635 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2636 let (mut reader, mut writer) = fifo.async_io();
2637
2638 writer
2639 .write_entries(&BlockFifoRequest {
2640 command: BlockFifoCommand {
2641 opcode: BlockOpcode::Read.into_primitive(),
2642 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2643 ..Default::default()
2644 },
2645 reqid: 1,
2646 group: 1,
2647 vmoid: vmo_id.id,
2648 length: 1,
2649 ..Default::default()
2650 })
2651 .await
2652 .unwrap();
2653
2654 writer
2655 .write_entries(&BlockFifoRequest {
2656 command: BlockFifoCommand {
2657 opcode: BlockOpcode::Read.into_primitive(),
2658 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2659 ..Default::default()
2660 },
2661 reqid: 2,
2662 group: 1,
2663 vmoid: vmo_id.id,
2664 length: 1,
2665 ..Default::default()
2666 })
2667 .await
2668 .unwrap();
2669
2670 writer
2672 .write_entries(&BlockFifoRequest {
2673 command: BlockFifoCommand {
2674 opcode: BlockOpcode::CloseVmo.into_primitive(),
2675 ..Default::default()
2676 },
2677 reqid: 3,
2678 vmoid: vmo_id.id,
2679 ..Default::default()
2680 })
2681 .await
2682 .unwrap();
2683
2684 let mut response = BlockFifoResponse::default();
2686 reader.read_entries(&mut response).await.unwrap();
2687 assert_eq!(response.status, zx::sys::ZX_OK);
2688 assert_eq!(response.reqid, 3);
2689
2690 tx.send(()).unwrap();
2692
2693 let mut response = BlockFifoResponse::default();
2696 reader.read_entries(&mut response).await.unwrap();
2697 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2698 assert_eq!(response.reqid, 1);
2699 assert_eq!(response.group, 1);
2700 }
2701 );
2702 }
2703
2704 #[fuchsia::test(allow_stalls = false)]
2705 async fn test_requests_dont_block_sessions() {
2706 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2707
2708 let (tx, rx) = oneshot::channel();
2709
2710 fasync::Task::local(async move {
2711 let rx = Mutex::new(Some(rx));
2712 let block_server = BlockServer::new(
2713 BLOCK_SIZE,
2714 Arc::new(MockInterface {
2715 read_hook: Some(Box::new(move |_, _, _, _| {
2716 let rx = rx.lock().take().unwrap();
2717 Box::pin(async {
2718 let _ = rx.await;
2719 Ok(())
2720 })
2721 })),
2722 ..MockInterface::default()
2723 }),
2724 );
2725 block_server.handle_requests(stream).await.unwrap();
2726 })
2727 .detach();
2728
2729 let mut fut = pin!(async {
2730 let (session_proxy, server) = fidl::endpoints::create_proxy();
2731
2732 proxy.open_session(server).unwrap();
2733
2734 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2735 let vmo_id = session_proxy
2736 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2737 .await
2738 .unwrap()
2739 .unwrap();
2740
2741 let mut fifo =
2742 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2743 let (mut reader, mut writer) = fifo.async_io();
2744
2745 writer
2746 .write_entries(&BlockFifoRequest {
2747 command: BlockFifoCommand {
2748 opcode: BlockOpcode::Read.into_primitive(),
2749 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2750 ..Default::default()
2751 },
2752 reqid: 1,
2753 group: 1,
2754 vmoid: vmo_id.id,
2755 length: 1,
2756 ..Default::default()
2757 })
2758 .await
2759 .unwrap();
2760
2761 let mut response = BlockFifoResponse::default();
2762 reader.read_entries(&mut response).await.unwrap();
2763 assert_eq!(response.status, zx::sys::ZX_OK);
2764 });
2765
2766 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2768
2769 let mut fut2 = pin!(proxy.get_volume_info());
2770
2771 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2773
2774 let _ = tx.send(());
2777
2778 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2779 }
2780
2781 #[fuchsia::test]
2782 async fn test_request_flow_control() {
2783 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2784
2785 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2788 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2789 let event_clone = event.clone();
2790 futures::join!(
2791 async move {
2792 let block_server = BlockServer::new(
2793 BLOCK_SIZE,
2794 Arc::new(MockInterface {
2795 read_hook: Some(Box::new(move |_, _, _, _| {
2796 let event_clone = event_clone.clone();
2797 Box::pin(async move {
2798 if !event_clone.1.load(Ordering::SeqCst) {
2799 event_clone.0.listen().await;
2800 }
2801 Ok(())
2802 })
2803 })),
2804 ..MockInterface::default()
2805 }),
2806 );
2807 block_server.handle_requests(stream).await.unwrap();
2808 },
2809 async move {
2810 let (session_proxy, server) = fidl::endpoints::create_proxy();
2811
2812 proxy.open_session(server).unwrap();
2813
2814 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2815 let vmo_id = session_proxy
2816 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2817 .await
2818 .unwrap()
2819 .unwrap();
2820
2821 let mut fifo =
2822 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2823 let (mut reader, mut writer) = fifo.async_io();
2824
2825 for i in 0..MAX_REQUESTS {
2826 writer
2827 .write_entries(&BlockFifoRequest {
2828 command: BlockFifoCommand {
2829 opcode: BlockOpcode::Read.into_primitive(),
2830 ..Default::default()
2831 },
2832 reqid: (i + 1) as u32,
2833 dev_offset: i,
2834 vmoid: vmo_id.id,
2835 length: 1,
2836 ..Default::default()
2837 })
2838 .await
2839 .unwrap();
2840 }
2841 assert!(
2842 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2843 command: BlockFifoCommand {
2844 opcode: BlockOpcode::Read.into_primitive(),
2845 ..Default::default()
2846 },
2847 reqid: u32::MAX,
2848 dev_offset: MAX_REQUESTS,
2849 vmoid: vmo_id.id,
2850 length: 1,
2851 ..Default::default()
2852 })))
2853 .is_pending()
2854 );
2855 event.1.store(true, Ordering::SeqCst);
2857 event.0.notify(usize::MAX);
2858 let mut finished_reqids = vec![];
2860 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2861 let mut response = BlockFifoResponse::default();
2862 reader.read_entries(&mut response).await.unwrap();
2863 assert_eq!(response.status, zx::sys::ZX_OK);
2864 finished_reqids.push(response.reqid);
2865 writer
2866 .write_entries(&BlockFifoRequest {
2867 command: BlockFifoCommand {
2868 opcode: BlockOpcode::Read.into_primitive(),
2869 ..Default::default()
2870 },
2871 reqid: (i + 1) as u32,
2872 dev_offset: i,
2873 vmoid: vmo_id.id,
2874 length: 1,
2875 ..Default::default()
2876 })
2877 .await
2878 .unwrap();
2879 }
2880 let mut response = BlockFifoResponse::default();
2881 for _ in 0..MAX_REQUESTS {
2882 reader.read_entries(&mut response).await.unwrap();
2883 assert_eq!(response.status, zx::sys::ZX_OK);
2884 finished_reqids.push(response.reqid);
2885 }
2886 finished_reqids.sort();
2889 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2890 let mut i = 1;
2891 for reqid in finished_reqids {
2892 assert_eq!(reqid, i);
2893 i += 1;
2894 }
2895 }
2896 );
2897 }
2898
2899 #[fuchsia::test]
2900 async fn test_passthrough_io_with_fixed_map() {
2901 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2902
2903 let expected_op = Arc::new(Mutex::new(None));
2904 let expected_op_clone = expected_op.clone();
2905 futures::join!(
2906 async {
2907 let block_server = BlockServer::new(
2908 BLOCK_SIZE,
2909 Arc::new(IoMockInterface {
2910 return_errors: false,
2911 do_checks: true,
2912 expected_op: expected_op_clone,
2913 }),
2914 );
2915 block_server.handle_requests(stream).await.unwrap();
2916 },
2917 async move {
2918 let (session_proxy, server) = fidl::endpoints::create_proxy();
2919
2920 let mapping = fblock::BlockOffsetMapping {
2921 source_block_offset: 0,
2922 target_block_offset: 10,
2923 length: 20,
2924 };
2925 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2926
2927 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2928 let vmo_id = session_proxy
2929 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2930 .await
2931 .unwrap()
2932 .unwrap();
2933
2934 let mut fifo =
2935 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2936 let (mut reader, mut writer) = fifo.async_io();
2937
2938 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2940 writer
2941 .write_entries(&BlockFifoRequest {
2942 command: BlockFifoCommand {
2943 opcode: BlockOpcode::Read.into_primitive(),
2944 ..Default::default()
2945 },
2946 vmoid: vmo_id.id,
2947 dev_offset: 1,
2948 length: 2,
2949 vmo_offset: 3,
2950 ..Default::default()
2951 })
2952 .await
2953 .unwrap();
2954
2955 let mut response = BlockFifoResponse::default();
2956 reader.read_entries(&mut response).await.unwrap();
2957 assert_eq!(response.status, zx::sys::ZX_OK);
2958
2959 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2961 writer
2962 .write_entries(&BlockFifoRequest {
2963 command: BlockFifoCommand {
2964 opcode: BlockOpcode::Write.into_primitive(),
2965 ..Default::default()
2966 },
2967 vmoid: vmo_id.id,
2968 dev_offset: 4,
2969 length: 5,
2970 vmo_offset: 6,
2971 ..Default::default()
2972 })
2973 .await
2974 .unwrap();
2975
2976 reader.read_entries(&mut response).await.unwrap();
2977 assert_eq!(response.status, zx::sys::ZX_OK);
2978
2979 *expected_op.lock() = Some(ExpectedOp::Flush);
2981 writer
2982 .write_entries(&BlockFifoRequest {
2983 command: BlockFifoCommand {
2984 opcode: BlockOpcode::Flush.into_primitive(),
2985 ..Default::default()
2986 },
2987 ..Default::default()
2988 })
2989 .await
2990 .unwrap();
2991
2992 reader.read_entries(&mut response).await.unwrap();
2993 assert_eq!(response.status, zx::sys::ZX_OK);
2994
2995 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2997 writer
2998 .write_entries(&BlockFifoRequest {
2999 command: BlockFifoCommand {
3000 opcode: BlockOpcode::Trim.into_primitive(),
3001 ..Default::default()
3002 },
3003 dev_offset: 7,
3004 length: 3,
3005 ..Default::default()
3006 })
3007 .await
3008 .unwrap();
3009
3010 reader.read_entries(&mut response).await.unwrap();
3011 assert_eq!(response.status, zx::sys::ZX_OK);
3012
3013 *expected_op.lock() = None;
3015 writer
3016 .write_entries(&BlockFifoRequest {
3017 command: BlockFifoCommand {
3018 opcode: BlockOpcode::Read.into_primitive(),
3019 ..Default::default()
3020 },
3021 vmoid: vmo_id.id,
3022 dev_offset: 19,
3023 length: 2,
3024 vmo_offset: 3,
3025 ..Default::default()
3026 })
3027 .await
3028 .unwrap();
3029
3030 reader.read_entries(&mut response).await.unwrap();
3031 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
3032
3033 std::mem::drop(proxy);
3034 }
3035 );
3036 }
3037
3038 #[fuchsia::test]
3039 fn operation_map() {
3040 const BLOCK_SIZE: u32 = 512;
3041
3042 #[track_caller]
3043 fn expect_map_result(
3044 mut operation: Operation,
3045 mapping: Option<BlockOffsetMapping>,
3046 max_blocks: Option<NonZero<u32>>,
3047 expected_operations: Vec<Operation>,
3048 ) {
3049 let mut ops = vec![];
3050 while let Some(remainder) =
3051 operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
3052 {
3053 ops.push(operation);
3054 operation = remainder;
3055 }
3056 ops.push(operation);
3057 assert_eq!(ops, expected_operations);
3058 }
3059
3060 expect_map_result(
3062 Operation::Read {
3063 device_block_offset: 10,
3064 block_count: 200,
3065 _unused: 0,
3066 vmo_offset: 0,
3067 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3068 },
3069 None,
3070 None,
3071 vec![Operation::Read {
3072 device_block_offset: 10,
3073 block_count: 200,
3074 _unused: 0,
3075 vmo_offset: 0,
3076 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3077 }],
3078 );
3079
3080 expect_map_result(
3082 Operation::Read {
3083 device_block_offset: 10,
3084 block_count: 200,
3085 _unused: 0,
3086 vmo_offset: 0,
3087 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3088 },
3089 None,
3090 NonZero::new(120),
3091 vec![
3092 Operation::Read {
3093 device_block_offset: 10,
3094 block_count: 120,
3095 _unused: 0,
3096 vmo_offset: 0,
3097 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3098 },
3099 Operation::Read {
3100 device_block_offset: 130,
3101 block_count: 80,
3102 _unused: 0,
3103 vmo_offset: 120 * BLOCK_SIZE as u64,
3104 options: ReadOptions {
3105 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3107 },
3108 },
3109 ],
3110 );
3111 expect_map_result(
3112 Operation::Trim { device_block_offset: 10, block_count: 200 },
3113 None,
3114 NonZero::new(120),
3115 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3116 );
3117
3118 expect_map_result(
3120 Operation::Read {
3121 device_block_offset: 10,
3122 block_count: 200,
3123 _unused: 0,
3124 vmo_offset: 0,
3125 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3126 },
3127 Some(BlockOffsetMapping {
3128 source_block_offset: 10,
3129 target_block_offset: 100,
3130 length: 200,
3131 }),
3132 NonZero::new(120),
3133 vec![
3134 Operation::Read {
3135 device_block_offset: 100,
3136 block_count: 120,
3137 _unused: 0,
3138 vmo_offset: 0,
3139 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3140 },
3141 Operation::Read {
3142 device_block_offset: 220,
3143 block_count: 80,
3144 _unused: 0,
3145 vmo_offset: 120 * BLOCK_SIZE as u64,
3146 options: ReadOptions {
3147 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3148 },
3149 },
3150 ],
3151 );
3152 expect_map_result(
3153 Operation::Trim { device_block_offset: 10, block_count: 200 },
3154 Some(BlockOffsetMapping {
3155 source_block_offset: 10,
3156 target_block_offset: 100,
3157 length: 200,
3158 }),
3159 NonZero::new(120),
3160 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3161 );
3162 }
3163
3164 #[fuchsia::test]
3166 async fn test_pre_barrier_flush_failure() {
3167 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3168
3169 struct NoBarrierInterface;
3170 impl super::async_interface::Interface for NoBarrierInterface {
3171 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3172 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3173 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3175 block_range: Some(0..100),
3176 type_guid: [0; 16],
3177 instance_guid: [0; 16],
3178 name: "test".to_string(),
3179 flags: 0,
3180 }))
3181 }
3182 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3183 Ok(())
3184 }
3185 async fn read(
3186 &self,
3187 _: u64,
3188 _: u32,
3189 _: &Arc<zx::Vmo>,
3190 _: u64,
3191 _: ReadOptions,
3192 _: TraceFlowId,
3193 ) -> Result<(), zx::Status> {
3194 unreachable!()
3195 }
3196 async fn write(
3197 &self,
3198 _: u64,
3199 _: u32,
3200 _: &Arc<zx::Vmo>,
3201 _: u64,
3202 _: WriteOptions,
3203 _: TraceFlowId,
3204 ) -> Result<(), zx::Status> {
3205 panic!("Write should not be called");
3206 }
3207 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3208 Err(zx::Status::IO)
3209 }
3210 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3211 unreachable!()
3212 }
3213 }
3214
3215 futures::join!(
3216 async move {
3217 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3218 block_server.handle_requests(stream).await.unwrap();
3219 },
3220 async move {
3221 let (session_proxy, server) = fidl::endpoints::create_proxy();
3222 proxy.open_session(server).unwrap();
3223 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3224 let vmo_id = session_proxy
3225 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3226 .await
3227 .unwrap()
3228 .unwrap();
3229
3230 let mut fifo =
3231 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3232 let (mut reader, mut writer) = fifo.async_io();
3233
3234 writer
3235 .write_entries(&BlockFifoRequest {
3236 command: BlockFifoCommand {
3237 opcode: BlockOpcode::Write.into_primitive(),
3238 flags: BlockIoFlag::PRE_BARRIER.bits(),
3239 ..Default::default()
3240 },
3241 vmoid: vmo_id.id,
3242 length: 1,
3243 ..Default::default()
3244 })
3245 .await
3246 .unwrap();
3247
3248 let mut response = BlockFifoResponse::default();
3249 reader.read_entries(&mut response).await.unwrap();
3250 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3251 }
3252 );
3253 }
3254
3255 #[fuchsia::test]
3258 async fn test_post_barrier_write_failure() {
3259 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3260
3261 struct NoBarrierInterface;
3262 impl super::async_interface::Interface for NoBarrierInterface {
3263 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3264 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3265 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3267 block_range: Some(0..100),
3268 type_guid: [0; 16],
3269 instance_guid: [0; 16],
3270 name: "test".to_string(),
3271 flags: 0,
3272 }))
3273 }
3274 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3275 Ok(())
3276 }
3277 async fn read(
3278 &self,
3279 _: u64,
3280 _: u32,
3281 _: &Arc<zx::Vmo>,
3282 _: u64,
3283 _: ReadOptions,
3284 _: TraceFlowId,
3285 ) -> Result<(), zx::Status> {
3286 unreachable!()
3287 }
3288 async fn write(
3289 &self,
3290 _: u64,
3291 _: u32,
3292 _: &Arc<zx::Vmo>,
3293 _: u64,
3294 _: WriteOptions,
3295 _: TraceFlowId,
3296 ) -> Result<(), zx::Status> {
3297 Err(zx::Status::IO)
3298 }
3299 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3300 panic!("Flush should not be called")
3301 }
3302 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3303 unreachable!()
3304 }
3305 }
3306
3307 futures::join!(
3308 async move {
3309 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3310 block_server.handle_requests(stream).await.unwrap();
3311 },
3312 async move {
3313 let (session_proxy, server) = fidl::endpoints::create_proxy();
3314 proxy.open_session(server).unwrap();
3315 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3316 let vmo_id = session_proxy
3317 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3318 .await
3319 .unwrap()
3320 .unwrap();
3321
3322 let mut fifo =
3323 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3324 let (mut reader, mut writer) = fifo.async_io();
3325
3326 writer
3327 .write_entries(&BlockFifoRequest {
3328 command: BlockFifoCommand {
3329 opcode: BlockOpcode::Write.into_primitive(),
3330 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3331 ..Default::default()
3332 },
3333 vmoid: vmo_id.id,
3334 length: 1,
3335 ..Default::default()
3336 })
3337 .await
3338 .unwrap();
3339
3340 let mut response = BlockFifoResponse::default();
3341 reader.read_entries(&mut response).await.unwrap();
3342 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3343 }
3344 );
3345 }
3346}