1use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
7use fuchsia_sync::Mutex;
8use futures::{Future, FutureExt as _, TryStreamExt as _};
9use std::borrow::Cow;
10use std::collections::btree_map::Entry;
11use std::collections::BTreeMap;
12use std::num::NonZero;
13use std::ops::Range;
14use std::sync::atomic::AtomicU64;
15use std::sync::Arc;
16use zx::HandleBased;
17use {
18 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
19 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
20};
21
22pub mod async_interface;
23pub mod c_interface;
24
/// Describes the device being served: either a plain block device or a partition.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A plain block device; see [`BlockInfo`].
    Block(BlockInfo),
    /// A partition; see [`PartitionInfo`].
    Partition(PartitionInfo),
}
30
/// Information about a non-partition block device.
#[derive(Clone)]
pub struct BlockInfo {
    /// Flags reported to clients in the `flags` field of the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Number of blocks reported to clients in the `GetInfo` response.
    pub block_count: u64,
}
37
/// Information about a partition.
#[derive(Clone)]
pub struct PartitionInfo {
    /// Flags reported to clients in the `flags` field of the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Block range of the partition. When `None`, the `GetInfo` handler derives the block count
    /// from `SessionManager::get_volume_info` instead, and `GetMetadata` reports no start offset
    /// or block count.
    pub block_range: Option<Range<u64>>,
    /// Returned by `GetTypeGuid` and in the `GetMetadata` response.
    pub type_guid: [u8; 16],
    /// Returned by `GetInstanceGuid` and in the `GetMetadata` response.
    pub instance_guid: [u8; 16],
    /// Returned by `GetName` and in the `GetMetadata` response.
    pub name: String,
    /// Returned in the `flags` field of the `GetMetadata` response.
    pub flags: u64,
}
52
53struct FifoMessageGroup {
66 status: zx::Status,
67 count: u32,
68 req_id: Option<u32>,
69}
70
71impl FifoMessageGroup {
72 fn new() -> Self {
73 FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
74 }
75}
76
77#[derive(Default)]
78struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
79
80impl FifoMessageGroups {
82 fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
85 let mut map = self.0.lock();
86 let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
87 let group = o.get_mut();
88 if group.count == 1 {
89 if let Some(reqid) = group.req_id {
90 let status =
91 if group.status != zx::Status::OK { group.status } else { status }.into_raw();
92
93 o.remove();
94
95 return Some(BlockFifoResponse {
96 status,
97 reqid,
98 group: group_id,
99 ..Default::default()
100 });
101 }
102 }
103
104 group.count = group.count.checked_sub(1).unwrap();
105 if status != zx::Status::OK && group.status == zx::Status::OK {
106 group.status = status
107 }
108 None
109 }
110}
111
/// Serves `fvolume::VolumeRequest`s, delegating device-specific work to a session manager `SM`.
pub struct BlockServer<SM> {
    /// Block size reported by `GetInfo` and passed to every session that is opened.
    block_size: u32,
    /// Shared session manager; cloned (as an `Arc`) for each session that is opened.
    session_manager: Arc<SM>,
}
118
/// A contiguous mapping from a source block range onto a target block range.
///
/// Instances built through `TryFrom<fblock::BlockOffsetMapping>` are guaranteed to have
/// `source_block_offset + length` and `target_block_offset + length` representable in a `u64`.
pub struct BlockOffsetMapping {
    /// First block of the range as seen by the client (the offset requests are checked against).
    source_block_offset: u64,
    /// Block that `source_block_offset` is translated to.
    target_block_offset: u64,
    /// Length of the mapped range.
    length: u64,
}
125
126impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
127 type Error = zx::Status;
128
129 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
130 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
131 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
132 Ok(Self {
133 source_block_offset: wire.source_block_offset,
134 target_block_offset: wire.target_block_offset,
135 length: wire.length,
136 })
137 }
138}
139
140pub struct OffsetMap {
146 mapping: BlockOffsetMapping,
147}
148
149impl OffsetMap {
150 pub fn new(mapping: BlockOffsetMapping) -> Self {
151 Self { mapping }
152 }
153
154 pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
157 if length == 0 {
158 return None;
159 }
160 let end = dev_offset.checked_add(length)?;
161 if self.mapping.source_block_offset > dev_offset
162 || end > self.mapping.source_block_offset + self.mapping.length
163 {
164 return None;
165 }
166 let delta = dev_offset - self.mapping.source_block_offset;
167 Some(self.mapping.target_block_offset + delta)
168 }
169}
170
/// Device-specific behaviour that a [`BlockServer`] delegates to.
pub trait SessionManager: 'static {
    /// Called by a session after a client-supplied VMO has been registered and before the VMO id
    /// is returned to the client. An error is propagated out of the session's request handler.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Serves a session on `stream`.
    ///
    /// `offset_map` is `Some` only for sessions opened with an initial offset mapping, and
    /// `block_size` is the block size of the owning [`BlockServer`]. The returned future is
    /// spawned by `BlockServer::handle_requests`, which ignores its result.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: Option<OffsetMap>,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns information about the device; used to answer the info, GUID, name and metadata
    /// requests.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Returns volume manager and volume information. The default implementation returns
    /// `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `QuerySlices` request. The caller asserts that at most 16 ranges are returned.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers an `Extend` request. The default implementation returns `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `Shrink` request. The default implementation returns `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
227
/// Conversion into a shared [`SessionManager`], accepted by `BlockServer::new`.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
233
234impl<SM: SessionManager> BlockServer<SM> {
235 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
236 Self { block_size, session_manager: session_manager.into_session_manager() }
237 }
238
239 pub async fn handle_requests(
241 &self,
242 mut requests: fvolume::VolumeRequestStream,
243 ) -> Result<(), Error> {
244 let scope = fasync::Scope::new();
245 while let Some(request) = requests.try_next().await.unwrap() {
246 if let Some(session) = self.handle_request(request).await? {
247 scope.spawn(session.map(|_| ()));
248 }
249 }
250 scope.await;
251 Ok(())
252 }
253
254 async fn handle_request(
257 &self,
258 request: fvolume::VolumeRequest,
259 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
260 match request {
261 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
262 Ok(info) => {
263 let (block_count, flags) = match info.as_ref() {
264 DeviceInfo::Block(BlockInfo { block_count, device_flags }) => {
265 (*block_count, *device_flags)
266 }
267 DeviceInfo::Partition(partition_info) => {
268 let block_count = if let Some(range) =
269 partition_info.block_range.as_ref()
270 {
271 range.end - range.start
272 } else {
273 let volume_info = self.session_manager.get_volume_info().await?;
274 volume_info.0.slice_size * volume_info.1.partition_slice_count
275 / self.block_size as u64
276 };
277 (block_count, partition_info.device_flags)
278 }
279 };
280 responder.send(Ok(&fblock::BlockInfo {
281 block_count,
282 block_size: self.block_size,
283 max_transfer_size: fblock::MAX_TRANSFER_UNBOUNDED,
284 flags,
285 }))?;
286 }
287 Err(status) => responder.send(Err(status.into_raw()))?,
288 },
289 fvolume::VolumeRequest::GetStats { clear: _, responder } => {
290 responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
292 }
293 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
294 return Ok(Some(self.session_manager.clone().open_session(
295 session.into_stream(),
296 None,
297 self.block_size,
298 )));
299 }
300 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
301 session,
302 offset_map,
303 initial_mappings,
304 control_handle: _,
305 } => {
306 if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
307 session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
311 return Ok(None);
312 }
313 let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
314 Ok(m) => m,
315 Err(status) => {
316 session.close_with_epitaph(status)?;
317 return Ok(None);
318 }
319 };
320 let offset_map = OffsetMap::new(initial_mapping);
321 return Ok(Some(self.session_manager.clone().open_session(
322 session.into_stream(),
323 Some(offset_map),
324 self.block_size,
325 )));
326 }
327 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
328 Ok(info) => {
329 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
330 let mut guid =
331 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
332 guid.value.copy_from_slice(&partition_info.type_guid);
333 responder.send(zx::sys::ZX_OK, Some(&guid))?;
334 } else {
335 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
336 }
337 }
338 Err(status) => {
339 responder.send(status.into_raw(), None)?;
340 }
341 },
342 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
343 match self.device_info().await {
344 Ok(info) => {
345 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
346 let mut guid =
347 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
348 guid.value.copy_from_slice(&partition_info.instance_guid);
349 responder.send(zx::sys::ZX_OK, Some(&guid))?;
350 } else {
351 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
352 }
353 }
354 Err(status) => {
355 responder.send(status.into_raw(), None)?;
356 }
357 }
358 }
359 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
360 Ok(info) => {
361 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
362 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
363 } else {
364 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
365 }
366 }
367 Err(status) => {
368 responder.send(status.into_raw(), None)?;
369 }
370 },
371 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
372 Ok(info) => {
373 if let DeviceInfo::Partition(info) = info.as_ref() {
374 let mut type_guid =
375 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
376 type_guid.value.copy_from_slice(&info.type_guid);
377 let mut instance_guid =
378 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
379 instance_guid.value.copy_from_slice(&info.instance_guid);
380 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
381 name: Some(info.name.clone()),
382 type_guid: Some(type_guid),
383 instance_guid: Some(instance_guid),
384 start_block_offset: info.block_range.as_ref().map(|range| range.start),
385 num_blocks: info
386 .block_range
387 .as_ref()
388 .map(|range| range.end - range.start),
389 flags: Some(info.flags),
390 ..Default::default()
391 }))?;
392 }
393 }
394 Err(status) => responder.send(Err(status.into_raw()))?,
395 },
396 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
397 match self.session_manager.query_slices(&start_slices).await {
398 Ok(mut results) => {
399 let results_len = results.len();
400 assert!(results_len <= 16);
401 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
402 responder.send(
403 zx::sys::ZX_OK,
404 &results.try_into().unwrap(),
405 results_len as u64,
406 )?;
407 }
408 Err(s) => {
409 responder.send(
410 s.into_raw(),
411 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
412 0,
413 )?;
414 }
415 }
416 }
417 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
418 match self.session_manager.get_volume_info().await {
419 Ok((manager_info, volume_info)) => {
420 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
421 }
422 Err(s) => responder.send(s.into_raw(), None, None)?,
423 }
424 }
425 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
426 responder.send(
427 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
428 .into_raw(),
429 )?;
430 }
431 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
432 responder.send(
433 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
434 .into_raw(),
435 )?;
436 }
437 fvolume::VolumeRequest::Destroy { responder, .. } => {
438 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
439 }
440 }
441 Ok(None)
442 }
443
444 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
445 self.session_manager.get_info().await
446 }
447}
448
/// State and helpers for a single session.
struct SessionHelper<SM: SessionManager> {
    /// The session manager that owns the device-specific behaviour.
    session_manager: Arc<SM>,
    /// Optional offset translation applied to every decoded request.
    offset_map: Option<OffsetMap>,
    /// Block size, used to convert a request's `vmo_offset` from blocks to bytes.
    block_size: u32,
    /// One end of the FIFO pair created in `new`; duplicated and handed out by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached to this session, keyed by the id handed back to the client.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
    /// Tracking for grouped FIFO requests.
    message_groups: FifoMessageGroups,
}
457
458impl<SM: SessionManager> SessionHelper<SM> {
459 fn new(
460 session_manager: Arc<SM>,
461 offset_map: Option<OffsetMap>,
462 block_size: u32,
463 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
464 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
465 Ok((
466 Self {
467 session_manager,
468 offset_map,
469 block_size,
470 peer_fifo,
471 vmos: Mutex::default(),
472 message_groups: FifoMessageGroups::default(),
473 },
474 fifo,
475 ))
476 }
477
478 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
479 match request {
480 fblock::SessionRequest::GetFifo { responder } => {
481 let rights = zx::Rights::TRANSFER
482 | zx::Rights::READ
483 | zx::Rights::WRITE
484 | zx::Rights::SIGNAL
485 | zx::Rights::WAIT;
486 match self.peer_fifo.duplicate_handle(rights) {
487 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
488 Err(s) => responder.send(Err(s.into_raw()))?,
489 }
490 Ok(())
491 }
492 fblock::SessionRequest::AttachVmo { vmo, responder } => {
493 let vmo = Arc::new(vmo);
494 let vmo_id = {
495 let mut vmos = self.vmos.lock();
496 if vmos.len() == u16::MAX as usize {
497 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
498 return Ok(());
499 } else {
500 let vmo_id = match vmos.last_entry() {
501 None => 1,
502 Some(o) => {
503 o.key().checked_add(1).unwrap_or_else(|| {
504 let mut vmo_id = 1;
505 for (&id, _) in &*vmos {
507 if id > vmo_id {
508 break;
509 }
510 vmo_id = id + 1;
511 }
512 vmo_id
513 })
514 }
515 };
516 vmos.insert(vmo_id, vmo.clone());
517 vmo_id
518 }
519 };
520 self.session_manager.clone().on_attach_vmo(&vmo).await?;
521 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
522 Ok(())
523 }
524 fblock::SessionRequest::Close { responder } => {
525 responder.send(Ok(()))?;
526 Err(anyhow!("Closed"))
527 }
528 }
529 }
530
    /// Records the completion of a FIFO request and returns the response to send, if any.
    ///
    /// For a grouped request a response is produced only when `FifoMessageGroups::complete`
    /// reports that the group has finished; for a standalone request a response carrying
    /// `status` and the request id is always produced. Trace events are emitted in both cases.
    fn finish_fifo_request(
        &self,
        request: RequestTracking,
        status: zx::Status,
    ) -> Option<BlockFifoResponse> {
        match request.group_or_request {
            GroupOrRequest::Group(group_id) => {
                // The group is completed before tracing so that the trace can record whether
                // this request finished the group.
                let response = self.message_groups.complete(group_id, status);
                fuchsia_trace::duration!(
                    c"storage",
                    c"block_server::finish_transaction_in_group",
                    "group" => u32::from(group_id),
                    "group_completed" => response.is_some(),
                    "status" => status.into_raw());
                // A flow step is only emitted when the request carried a non-zero flow id.
                if let Some(trace_flow_id) = request.trace_flow_id {
                    fuchsia_trace::flow_step!(
                        c"storage",
                        c"block_server::finish_transaction",
                        trace_flow_id.get().into()
                    );
                }
                response
            }
            GroupOrRequest::Request(reqid) => {
                fuchsia_trace::duration!(
                    c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
                if let Some(trace_flow_id) = request.trace_flow_id {
                    fuchsia_trace::flow_step!(
                        c"storage",
                        c"block_server::finish_transaction",
                        trace_flow_id.get().into()
                    );
                }
                Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
            }
        }
    }
568
569 fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
570 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
571 let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
572 let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
573 let mut op_code =
574 BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
575
576 let group_or_request = if is_group {
577 let mut groups = self.message_groups.0.lock();
578 let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
579 if group.req_id.is_some() {
580 if group.status == zx::Status::OK {
582 group.status = zx::Status::INVALID_ARGS;
583 }
584 return None;
585 }
586 if last_in_group {
587 group.req_id = Some(request.reqid);
588 if group.status != zx::Status::OK {
590 op_code = Err(group.status);
591 }
592 } else if group.status != zx::Status::OK {
593 return None;
596 }
597 group.count += 1;
598 GroupOrRequest::Group(request.group)
599 } else {
600 GroupOrRequest::Request(request.reqid)
601 };
602
603 let mut vmo_offset = 0;
604 let vmo = match op_code {
605 Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
606 if request.length == 0 {
607 return Err(zx::Status::INVALID_ARGS);
608 }
609 vmo_offset = request
610 .vmo_offset
611 .checked_mul(self.block_size as u64)
612 .ok_or(zx::Status::OUT_OF_RANGE)?;
613 self.vmos
614 .lock()
615 .get(&request.vmoid)
616 .cloned()
617 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
618 })(),
619 Ok(BlockOpcode::CloseVmo) => self
620 .vmos
621 .lock()
622 .remove(&request.vmoid)
623 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
624 _ => Ok(None),
625 }
626 .unwrap_or_else(|e| {
627 op_code = Err(e);
628 None
629 });
630
631 let operation = op_code.map(|code| match code {
632 BlockOpcode::Read => Operation::Read {
633 device_block_offset: request.dev_offset,
634 block_count: request.length,
635 _unused: 0,
636 vmo_offset,
637 },
638 BlockOpcode::Write => Operation::Write {
639 device_block_offset: request.dev_offset,
640 block_count: request.length,
641 options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
642 WriteOptions::FORCE_ACCESS
643 } else {
644 WriteOptions::empty()
645 },
646 vmo_offset,
647 },
648 BlockOpcode::Flush => Operation::Flush,
649 BlockOpcode::Trim => Operation::Trim {
650 device_block_offset: request.dev_offset,
651 block_count: request.length,
652 },
653 BlockOpcode::CloseVmo => Operation::CloseVmo,
654 });
655 if let Ok(operation) = operation.as_ref() {
656 use fuchsia_trace::ArgValue;
657 static CACHE: AtomicU64 = AtomicU64::new(0);
658 if let Some(context) =
659 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
660 {
661 let trace_args_with_group = [
662 ArgValue::of("group", u32::from(group_or_request.group_id())),
663 ArgValue::of("opcode", operation.trace_label()),
664 ];
665 let trace_args = [ArgValue::of("opcode", operation.trace_label())];
666 let _scope = if group_or_request.is_group() {
667 fuchsia_trace::duration(
668 c"storage",
669 c"block_server::start_transaction",
670 &trace_args_with_group,
671 )
672 } else {
673 fuchsia_trace::duration(
674 c"storage",
675 c"block_server::start_transaction",
676 &trace_args,
677 )
678 };
679 let trace_flow_id = NonZero::new(request.trace_flow_id);
680 if let Some(trace_flow_id) = trace_flow_id.clone() {
681 fuchsia_trace::flow_step(
682 &context,
683 c"block_server::start_trnsaction",
684 trace_flow_id.get().into(),
685 &[],
686 );
687 }
688 }
689 }
690 Some(DecodedRequest::new(
691 RequestTracking {
692 group_or_request,
693 trace_flow_id: NonZero::new(request.trace_flow_id),
694 },
695 operation,
696 vmo,
697 self.offset_map.as_ref(),
698 ))
699 }
700
701 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
702 std::mem::take(&mut *self.vmos.lock())
703 }
704}
705
706#[derive(Debug)]
707struct DecodedRequest {
708 request_tracking: RequestTracking,
709 operation: Result<Operation, zx::Status>,
710 vmo: Option<Arc<zx::Vmo>>,
711}
712
713impl DecodedRequest {
714 fn new(
715 request_tracking: RequestTracking,
716 operation: Result<Operation, zx::Status>,
717 vmo: Option<Arc<zx::Vmo>>,
718 offset_map: Option<&OffsetMap>,
719 ) -> Self {
720 let operation =
721 operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
722 Self { request_tracking, operation, vmo }
723 }
724}
725
/// Options for write operations; re-exported from `block_protocol`.
pub type WriteOptions = block_protocol::WriteOptions;
728
/// An operation decoded from a FIFO request.
#[repr(C)]
#[derive(Debug)]
pub enum Operation {
    /// Read `block_count` blocks starting at `device_block_offset` into the request's VMO.
    Read {
        device_block_offset: u64,
        block_count: u32,
        // Always set to 0 by the decoder. NOTE(review): presumably padding so that `Read` and
        // `Write` share a layout under `repr(C)` — confirm.
        _unused: u32,
        /// Offset into the VMO in bytes (the request's block offset times the block size).
        vmo_offset: u64,
    },
    /// Write `block_count` blocks starting at `device_block_offset` from the request's VMO.
    Write {
        device_block_offset: u64,
        block_count: u32,
        /// `FORCE_ACCESS` when the request carried that flag, otherwise empty.
        options: WriteOptions,
        /// Offset into the VMO in bytes (the request's block offset times the block size).
        vmo_offset: u64,
    },
    /// Flush the device.
    Flush,
    /// Trim `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// Detach a VMO; the decoder has already removed it from the session.
    CloseVmo,
}
756
757impl Operation {
758 fn validate_and_adjust_range(
761 mut self,
762 offset_map: Option<&OffsetMap>,
763 ) -> Result<Operation, zx::Status> {
764 let adjust_offset = |dev_offset, length| {
765 if length == 0 {
767 return Err(zx::Status::INVALID_ARGS);
768 }
769 if let Some(offset_map) = offset_map {
770 offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
771 } else {
772 Ok(dev_offset)
773 }
774 };
775 match &mut self {
776 Operation::Read { device_block_offset, block_count, .. } => {
777 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
778 }
779 Operation::Write { device_block_offset, block_count, .. } => {
780 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
781 }
782 Operation::Trim { device_block_offset, block_count, .. } => {
783 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
784 }
785 _ => {}
786 }
787 Ok(self)
788 }
789 fn trace_label(&self) -> &'static str {
790 match self {
791 Operation::Read { .. } => "read",
792 Operation::Write { .. } => "write",
793 Operation::Flush { .. } => "flush",
794 Operation::Trim { .. } => "trim",
795 Operation::CloseVmo { .. } => "close_vmo",
796 }
797 }
798}
799
/// Identifies a request while it is in flight so that it can be completed later.
#[derive(Clone, Copy, Debug)]
pub struct RequestTracking {
    /// The group the request belongs to, or the request's own id when it is not grouped.
    group_or_request: GroupOrRequest,
    /// The request's trace flow id; `None` when the request carried a zero flow id.
    trace_flow_id: Option<NonZero<u64>>,
}
805
/// Identifies either a request group or a standalone request.
#[derive(Clone, Copy, Debug)]
pub enum GroupOrRequest {
    /// A request that belongs to the group with this id.
    Group(u16),
    /// A standalone request with this request id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns `true` for the `Group` variant.
    fn is_group(&self) -> bool {
        matches!(self, Self::Group(_))
    }

    /// Returns the group id, or 0 for a standalone request.
    fn group_id(&self) -> u16 {
        if let Self::Group(id) = self {
            *id
        } else {
            0
        }
    }
}
828
/// Bit 63: set in a `RequestId` that identifies a group rather than a single request.
const IS_GROUP: u64 = 1 << 63;
/// Bit 62: set in a `RequestId` by `with_vmo`.
const USED_VMO: u64 = 1 << 62;
/// A request or group id packed into a single `u64` together with the flag bits above.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// Returns a copy of the id with the `USED_VMO` bit set.
    fn with_vmo(self) -> Self {
        let Self(bits) = self;
        Self(bits | USED_VMO)
    }

    /// Returns `true` if the `USED_VMO` bit is set.
    fn did_have_vmo(&self) -> bool {
        (self.0 & USED_VMO) == USED_VMO
    }
}
849
850impl From<GroupOrRequest> for RequestId {
851 fn from(value: GroupOrRequest) -> Self {
852 match value {
853 GroupOrRequest::Group(group) => RequestId(group as u64 | IS_GROUP),
854 GroupOrRequest::Request(request) => RequestId(request as u64),
855 }
856 }
857}
858
859impl From<RequestId> for GroupOrRequest {
860 fn from(value: RequestId) -> Self {
861 if value.0 & IS_GROUP == 0 {
862 GroupOrRequest::Request(value.0 as u32)
863 } else {
864 GroupOrRequest::Group(value.0 as u16)
865 }
866 }
867}
868
869#[cfg(test)]
870mod tests {
871 use super::{BlockServer, DeviceInfo, PartitionInfo};
872 use crate::async_interface::FIFO_MAX_REQUESTS;
873 use assert_matches::assert_matches;
874 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
875 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
876 use fuchsia_sync::Mutex;
877 use futures::channel::oneshot;
878 use futures::future::BoxFuture;
879 use futures::FutureExt as _;
880 use std::borrow::Cow;
881 use std::future::poll_fn;
882 use std::num::NonZero;
883 use std::pin::pin;
884 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
885 use std::sync::Arc;
886 use std::task::{Context, Poll};
887 use zx::{AsHandleRef as _, HandleBased as _};
888 use {
889 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
890 fuchsia_async as fasync,
891 };
892
    /// Minimal interface mock whose only configurable behaviour is reading.
    #[derive(Default)]
    struct MockInterface {
        /// Invoked by `read` with (device block offset, block count, VMO, VMO offset). When
        /// `None`, `read` hits `unimplemented!()`.
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
    }
903
904 impl super::async_interface::Interface for MockInterface {
905 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
906 Ok(())
907 }
908
909 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
910 Ok(Cow::Owned(test_device_info()))
911 }
912
913 async fn read(
914 &self,
915 device_block_offset: u64,
916 block_count: u32,
917 vmo: &Arc<zx::Vmo>,
918 vmo_offset: u64,
919 _trace_flow_id: Option<NonZero<u64>>,
920 ) -> Result<(), zx::Status> {
921 if let Some(read_hook) = &self.read_hook {
922 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
923 } else {
924 unimplemented!();
925 }
926 }
927
928 async fn write(
929 &self,
930 _device_block_offset: u64,
931 _block_count: u32,
932 _vmo: &Arc<zx::Vmo>,
933 _vmo_offset: u64,
934 _opts: WriteOptions,
935 _trace_flow_id: Option<NonZero<u64>>,
936 ) -> Result<(), zx::Status> {
937 unreachable!();
938 }
939
940 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
941 unreachable!();
942 }
943
944 async fn trim(
945 &self,
946 _device_block_offset: u64,
947 _block_count: u32,
948 _trace_flow_id: Option<NonZero<u64>>,
949 ) -> Result<(), zx::Status> {
950 unreachable!();
951 }
952
953 async fn get_volume_info(
954 &self,
955 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
956 let () = std::future::pending().await;
958 unreachable!();
959 }
960 }
961
    /// Block size used by every test server in this module.
    const BLOCK_SIZE: u32 = 512;
963
964 fn test_device_info() -> DeviceInfo {
965 DeviceInfo::Partition(PartitionInfo {
966 device_flags: fblock::Flag::READONLY,
967 block_range: Some(12..34),
968 type_guid: [1; 16],
969 instance_guid: [2; 16],
970 name: "foo".to_string(),
971 flags: 0xabcd,
972 })
973 }
974
975 #[fuchsia::test]
976 async fn test_info() {
977 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
978
979 futures::join!(
980 async {
981 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
982 block_server.handle_requests(stream).await.unwrap();
983 },
984 async {
985 let expected_info = test_device_info();
986 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
987 info
988 } else {
989 unreachable!()
990 };
991
992 let block_info = proxy.get_info().await.unwrap().unwrap();
993 assert_eq!(
994 block_info.block_count,
995 partition_info.block_range.as_ref().unwrap().end
996 - partition_info.block_range.as_ref().unwrap().start
997 );
998 assert_eq!(block_info.flags, fblock::Flag::READONLY);
999
1000 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1003 assert_eq!(status, zx::sys::ZX_OK);
1004 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1005
1006 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1007 assert_eq!(status, zx::sys::ZX_OK);
1008 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1009
1010 let (status, name) = proxy.get_name().await.unwrap();
1011 assert_eq!(status, zx::sys::ZX_OK);
1012 assert_eq!(name.as_ref(), Some(&partition_info.name));
1013
1014 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1015 assert_eq!(metadata.name, name);
1016 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1017 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1018 assert_eq!(
1019 metadata.start_block_offset,
1020 Some(partition_info.block_range.as_ref().unwrap().start)
1021 );
1022 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1023 assert_eq!(metadata.flags, Some(partition_info.flags));
1024
1025 std::mem::drop(proxy);
1026 }
1027 );
1028 }
1029
    /// Attaches VMOs until the session runs out of ids, then checks that closing a VMO makes its
    /// id available for reuse.
    #[fuchsia::test]
    async fn test_attach_vmo() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        let koid = vmo.get_koid().unwrap();

        futures::join!(
            async {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Every read must be handed a VMO that is the same kernel object as the
                        // one attached by the client.
                        read_hook: Some(Box::new(move |_, _, vmo, _| {
                            assert_eq!(vmo.get_koid().unwrap(), koid);
                            Box::pin(async { Ok(()) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let mut fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
                let (mut reader, mut writer) = fifo.async_io();

                // `count` tracks how many VMOs are attached; one has been attached already.
                let mut count = 1;
                // Keep attaching until the server reports that no ids are left.
                loop {
                    match session_proxy
                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                        .await
                        .unwrap()
                    {
                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
                        Err(e) => {
                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
                            break;
                        }
                    }

                    if count % 10 == 0 {
                        // Every tenth iteration, issue a read. Note that `vmo_id` here is the id
                        // from the first attach above; the binding in the match arm is out of
                        // scope.
                        writer
                            .write_entries(&BlockFifoRequest {
                                command: BlockFifoCommand {
                                    opcode: BlockOpcode::Read.into_primitive(),
                                    ..Default::default()
                                },
                                vmoid: vmo_id.id,
                                length: 1,
                                ..Default::default()
                            })
                            .await
                            .unwrap();

                        let mut response = BlockFifoResponse::default();
                        reader.read_entries(&mut response).await.unwrap();
                        assert_eq!(response.status, zx::sys::ZX_OK);
                    }

                    count += 1;
                }

                // Every non-zero `u16` id should have been handed out.
                assert_eq!(count, u16::MAX as u64);

                // Detach the first VMO...
                writer
                    .write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::CloseVmo.into_primitive(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                let mut response = BlockFifoResponse::default();
                reader.read_entries(&mut response).await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                // ...and check that the next attach reuses the id that was just freed.
                let new_vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_eq!(new_vmo_id.id, vmo_id.id);

                std::mem::drop(proxy);
            }
        );
    }
1136
    /// Checks that the server finishes once the volume proxy has been dropped and the session
    /// has been closed, even though the client still holds the session proxy.
    #[fuchsia::test]
    async fn test_close() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        let mut server = std::pin::pin!(async {
            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
            block_server.handle_requests(stream).await.unwrap();
        }
        .fuse());

        let mut client = std::pin::pin!(async {
            let (session_proxy, server) = fidl::endpoints::create_proxy();

            proxy.open_session(server).unwrap();

            // Drop the volume proxy so that only the session keeps the server running.
            std::mem::drop(proxy);

            session_proxy.close().await.unwrap().unwrap();

            // Never finish, keeping `session_proxy` alive: the server must terminate because of
            // the close request rather than because the session channel was dropped.
            let _: () = std::future::pending().await;
        }
        .fuse());

        futures::select!(
            _ = server => {}
            _ = client => unreachable!(),
        );
    }
1168
    /// Interface mock that either fails every I/O operation or checks it against an expectation.
    #[derive(Default)]
    struct IoMockInterface {
        /// When set (and `return_errors` is not), each operation must match `expected_op`.
        do_checks: bool,
        /// The next operation expected; taken (reset to `None`) by the operation that checks it.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        /// When set, every I/O operation fails with an operation-specific status.
        return_errors: bool,
    }
1175
    /// An operation that `IoMockInterface` expects to receive next.
    #[derive(Debug)]
    enum ExpectedOp {
        /// (device block offset, block count, VMO offset in blocks)
        Read(u64, u32, u64),
        /// (device block offset, block count, VMO offset in blocks)
        Write(u64, u32, u64),
        /// (device block offset, block count)
        Trim(u64, u32),
        Flush,
    }
1183
1184 impl super::async_interface::Interface for IoMockInterface {
1185 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1186 Ok(())
1187 }
1188
1189 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1190 Ok(Cow::Owned(test_device_info()))
1191 }
1192
1193 async fn read(
1194 &self,
1195 device_block_offset: u64,
1196 block_count: u32,
1197 _vmo: &Arc<zx::Vmo>,
1198 vmo_offset: u64,
1199 _trace_flow_id: Option<NonZero<u64>>,
1200 ) -> Result<(), zx::Status> {
1201 if self.return_errors {
1202 Err(zx::Status::INTERNAL)
1203 } else {
1204 if self.do_checks {
1205 assert_matches!(
1206 self.expected_op.lock().take(),
1207 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1208 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1209 "Read {device_block_offset} {block_count} {vmo_offset}"
1210 );
1211 }
1212 Ok(())
1213 }
1214 }
1215
1216 async fn write(
1217 &self,
1218 device_block_offset: u64,
1219 block_count: u32,
1220 _vmo: &Arc<zx::Vmo>,
1221 vmo_offset: u64,
1222 _opts: WriteOptions,
1223 _trace_flow_id: Option<NonZero<u64>>,
1224 ) -> Result<(), zx::Status> {
1225 if self.return_errors {
1226 Err(zx::Status::NOT_SUPPORTED)
1227 } else {
1228 if self.do_checks {
1229 assert_matches!(
1230 self.expected_op.lock().take(),
1231 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1232 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1233 "Write {device_block_offset} {block_count} {vmo_offset}"
1234 );
1235 }
1236 Ok(())
1237 }
1238 }
1239
1240 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1241 if self.return_errors {
1242 Err(zx::Status::NO_RESOURCES)
1243 } else {
1244 if self.do_checks {
1245 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1246 }
1247 Ok(())
1248 }
1249 }
1250
1251 async fn trim(
1252 &self,
1253 device_block_offset: u64,
1254 block_count: u32,
1255 _trace_flow_id: Option<NonZero<u64>>,
1256 ) -> Result<(), zx::Status> {
1257 if self.return_errors {
1258 Err(zx::Status::NO_MEMORY)
1259 } else {
1260 if self.do_checks {
1261 assert_matches!(
1262 self.expected_op.lock().take(),
1263 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1264 block_count == b,
1265 "Trim {device_block_offset} {block_count}"
1266 );
1267 }
1268 Ok(())
1269 }
1270 }
1271 }
1272
1273 #[fuchsia::test]
1274 async fn test_io() {
1275 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1276
1277 let expected_op = Arc::new(Mutex::new(None));
1278 let expected_op_clone = expected_op.clone();
1279
1280 let server = async {
1281 let block_server = BlockServer::new(
1282 BLOCK_SIZE,
1283 Arc::new(IoMockInterface {
1284 return_errors: false,
1285 do_checks: true,
1286 expected_op: expected_op_clone,
1287 }),
1288 );
1289 block_server.handle_requests(stream).await.unwrap();
1290 };
1291
1292 let client = async move {
1293 let (session_proxy, server) = fidl::endpoints::create_proxy();
1294
1295 proxy.open_session(server).unwrap();
1296
1297 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1298 let vmo_id = session_proxy
1299 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1300 .await
1301 .unwrap()
1302 .unwrap();
1303
1304 let mut fifo =
1305 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1306 let (mut reader, mut writer) = fifo.async_io();
1307
1308 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1310 writer
1311 .write_entries(&BlockFifoRequest {
1312 command: BlockFifoCommand {
1313 opcode: BlockOpcode::Read.into_primitive(),
1314 ..Default::default()
1315 },
1316 vmoid: vmo_id.id,
1317 dev_offset: 1,
1318 length: 2,
1319 vmo_offset: 3,
1320 ..Default::default()
1321 })
1322 .await
1323 .unwrap();
1324
1325 let mut response = BlockFifoResponse::default();
1326 reader.read_entries(&mut response).await.unwrap();
1327 assert_eq!(response.status, zx::sys::ZX_OK);
1328
1329 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1331 writer
1332 .write_entries(&BlockFifoRequest {
1333 command: BlockFifoCommand {
1334 opcode: BlockOpcode::Write.into_primitive(),
1335 ..Default::default()
1336 },
1337 vmoid: vmo_id.id,
1338 dev_offset: 4,
1339 length: 5,
1340 vmo_offset: 6,
1341 ..Default::default()
1342 })
1343 .await
1344 .unwrap();
1345
1346 let mut response = BlockFifoResponse::default();
1347 reader.read_entries(&mut response).await.unwrap();
1348 assert_eq!(response.status, zx::sys::ZX_OK);
1349
1350 *expected_op.lock() = Some(ExpectedOp::Flush);
1352 writer
1353 .write_entries(&BlockFifoRequest {
1354 command: BlockFifoCommand {
1355 opcode: BlockOpcode::Flush.into_primitive(),
1356 ..Default::default()
1357 },
1358 ..Default::default()
1359 })
1360 .await
1361 .unwrap();
1362
1363 reader.read_entries(&mut response).await.unwrap();
1364 assert_eq!(response.status, zx::sys::ZX_OK);
1365
1366 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1368 writer
1369 .write_entries(&BlockFifoRequest {
1370 command: BlockFifoCommand {
1371 opcode: BlockOpcode::Trim.into_primitive(),
1372 ..Default::default()
1373 },
1374 dev_offset: 7,
1375 length: 8,
1376 ..Default::default()
1377 })
1378 .await
1379 .unwrap();
1380
1381 reader.read_entries(&mut response).await.unwrap();
1382 assert_eq!(response.status, zx::sys::ZX_OK);
1383
1384 std::mem::drop(proxy);
1385 };
1386
1387 futures::join!(server, client);
1388 }
1389
1390 #[fuchsia::test]
1391 async fn test_io_errors() {
1392 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1393
1394 futures::join!(
1395 async {
1396 let block_server = BlockServer::new(
1397 BLOCK_SIZE,
1398 Arc::new(IoMockInterface {
1399 return_errors: true,
1400 do_checks: false,
1401 expected_op: Arc::new(Mutex::new(None)),
1402 }),
1403 );
1404 block_server.handle_requests(stream).await.unwrap();
1405 },
1406 async move {
1407 let (session_proxy, server) = fidl::endpoints::create_proxy();
1408
1409 proxy.open_session(server).unwrap();
1410
1411 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1412 let vmo_id = session_proxy
1413 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1414 .await
1415 .unwrap()
1416 .unwrap();
1417
1418 let mut fifo =
1419 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1420 let (mut reader, mut writer) = fifo.async_io();
1421
1422 writer
1424 .write_entries(&BlockFifoRequest {
1425 command: BlockFifoCommand {
1426 opcode: BlockOpcode::Read.into_primitive(),
1427 ..Default::default()
1428 },
1429 vmoid: vmo_id.id,
1430 length: 1,
1431 reqid: 1,
1432 ..Default::default()
1433 })
1434 .await
1435 .unwrap();
1436
1437 let mut response = BlockFifoResponse::default();
1438 reader.read_entries(&mut response).await.unwrap();
1439 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1440
1441 writer
1443 .write_entries(&BlockFifoRequest {
1444 command: BlockFifoCommand {
1445 opcode: BlockOpcode::Write.into_primitive(),
1446 ..Default::default()
1447 },
1448 vmoid: vmo_id.id,
1449 length: 1,
1450 reqid: 2,
1451 ..Default::default()
1452 })
1453 .await
1454 .unwrap();
1455
1456 reader.read_entries(&mut response).await.unwrap();
1457 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1458
1459 writer
1461 .write_entries(&BlockFifoRequest {
1462 command: BlockFifoCommand {
1463 opcode: BlockOpcode::Flush.into_primitive(),
1464 ..Default::default()
1465 },
1466 reqid: 3,
1467 ..Default::default()
1468 })
1469 .await
1470 .unwrap();
1471
1472 reader.read_entries(&mut response).await.unwrap();
1473 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1474
1475 writer
1477 .write_entries(&BlockFifoRequest {
1478 command: BlockFifoCommand {
1479 opcode: BlockOpcode::Trim.into_primitive(),
1480 ..Default::default()
1481 },
1482 reqid: 4,
1483 length: 1,
1484 ..Default::default()
1485 })
1486 .await
1487 .unwrap();
1488
1489 reader.read_entries(&mut response).await.unwrap();
1490 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1491
1492 std::mem::drop(proxy);
1493 }
1494 );
1495 }
1496
1497 #[fuchsia::test]
1498 async fn test_invalid_args() {
1499 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1500
1501 futures::join!(
1502 async {
1503 let block_server = BlockServer::new(
1504 BLOCK_SIZE,
1505 Arc::new(IoMockInterface {
1506 return_errors: false,
1507 do_checks: false,
1508 expected_op: Arc::new(Mutex::new(None)),
1509 }),
1510 );
1511 block_server.handle_requests(stream).await.unwrap();
1512 },
1513 async move {
1514 let (session_proxy, server) = fidl::endpoints::create_proxy();
1515
1516 proxy.open_session(server).unwrap();
1517
1518 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1519 let vmo_id = session_proxy
1520 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1521 .await
1522 .unwrap()
1523 .unwrap();
1524
1525 let mut fifo =
1526 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1527
1528 async fn test(
1529 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1530 request: BlockFifoRequest,
1531 ) -> Result<(), zx::Status> {
1532 let (mut reader, mut writer) = fifo.async_io();
1533 writer.write_entries(&request).await.unwrap();
1534 let mut response = BlockFifoResponse::default();
1535 reader.read_entries(&mut response).await.unwrap();
1536 zx::Status::ok(response.status)
1537 }
1538
1539 let good_read_request = || BlockFifoRequest {
1542 command: BlockFifoCommand {
1543 opcode: BlockOpcode::Read.into_primitive(),
1544 ..Default::default()
1545 },
1546 vmoid: vmo_id.id,
1547 ..Default::default()
1548 };
1549
1550 assert_eq!(
1551 test(
1552 &mut fifo,
1553 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1554 )
1555 .await,
1556 Err(zx::Status::INVALID_ARGS)
1557 );
1558
1559 assert_eq!(
1560 test(
1561 &mut fifo,
1562 BlockFifoRequest {
1563 vmo_offset: 0xffff_ffff_ffff_ffff,
1564 ..good_read_request()
1565 }
1566 )
1567 .await,
1568 Err(zx::Status::INVALID_ARGS)
1569 );
1570
1571 let good_write_request = || BlockFifoRequest {
1574 command: BlockFifoCommand {
1575 opcode: BlockOpcode::Write.into_primitive(),
1576 ..Default::default()
1577 },
1578 vmoid: vmo_id.id,
1579 ..Default::default()
1580 };
1581
1582 assert_eq!(
1583 test(
1584 &mut fifo,
1585 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1586 )
1587 .await,
1588 Err(zx::Status::INVALID_ARGS)
1589 );
1590
1591 assert_eq!(
1592 test(
1593 &mut fifo,
1594 BlockFifoRequest {
1595 vmo_offset: 0xffff_ffff_ffff_ffff,
1596 ..good_write_request()
1597 }
1598 )
1599 .await,
1600 Err(zx::Status::INVALID_ARGS)
1601 );
1602
1603 assert_eq!(
1606 test(
1607 &mut fifo,
1608 BlockFifoRequest {
1609 command: BlockFifoCommand {
1610 opcode: BlockOpcode::CloseVmo.into_primitive(),
1611 ..Default::default()
1612 },
1613 vmoid: vmo_id.id + 1,
1614 ..Default::default()
1615 }
1616 )
1617 .await,
1618 Err(zx::Status::IO)
1619 );
1620
1621 std::mem::drop(proxy);
1622 }
1623 );
1624 }
1625
1626 #[fuchsia::test]
1627 async fn test_concurrent_requests() {
1628 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1629
1630 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1631 let waiting_readers_clone = waiting_readers.clone();
1632
1633 futures::join!(
1634 async move {
1635 let block_server = BlockServer::new(
1636 BLOCK_SIZE,
1637 Arc::new(MockInterface {
1638 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1639 let (tx, rx) = oneshot::channel();
1640 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
1641 Box::pin(async move {
1642 let _ = rx.await;
1643 Ok(())
1644 })
1645 })),
1646 }),
1647 );
1648 block_server.handle_requests(stream).await.unwrap();
1649 },
1650 async move {
1651 let (session_proxy, server) = fidl::endpoints::create_proxy();
1652
1653 proxy.open_session(server).unwrap();
1654
1655 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1656 let vmo_id = session_proxy
1657 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1658 .await
1659 .unwrap()
1660 .unwrap();
1661
1662 let mut fifo =
1663 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1664 let (mut reader, mut writer) = fifo.async_io();
1665
1666 writer
1667 .write_entries(&BlockFifoRequest {
1668 command: BlockFifoCommand {
1669 opcode: BlockOpcode::Read.into_primitive(),
1670 ..Default::default()
1671 },
1672 reqid: 1,
1673 dev_offset: 1, vmoid: vmo_id.id,
1675 length: 1,
1676 ..Default::default()
1677 })
1678 .await
1679 .unwrap();
1680
1681 writer
1682 .write_entries(&BlockFifoRequest {
1683 command: BlockFifoCommand {
1684 opcode: BlockOpcode::Read.into_primitive(),
1685 ..Default::default()
1686 },
1687 reqid: 2,
1688 dev_offset: 2,
1689 vmoid: vmo_id.id,
1690 length: 1,
1691 ..Default::default()
1692 })
1693 .await
1694 .unwrap();
1695
1696 poll_fn(|cx: &mut Context<'_>| {
1698 if waiting_readers.lock().len() == 2 {
1699 Poll::Ready(())
1700 } else {
1701 cx.waker().wake_by_ref();
1703 Poll::Pending
1704 }
1705 })
1706 .await;
1707
1708 let mut response = BlockFifoResponse::default();
1709 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1710
1711 let (id, tx) = waiting_readers.lock().pop().unwrap();
1712 tx.send(()).unwrap();
1713
1714 reader.read_entries(&mut response).await.unwrap();
1715 assert_eq!(response.status, zx::sys::ZX_OK);
1716 assert_eq!(response.reqid, id);
1717
1718 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1719
1720 let (id, tx) = waiting_readers.lock().pop().unwrap();
1721 tx.send(()).unwrap();
1722
1723 reader.read_entries(&mut response).await.unwrap();
1724 assert_eq!(response.status, zx::sys::ZX_OK);
1725 assert_eq!(response.reqid, id);
1726 }
1727 );
1728 }
1729
1730 #[fuchsia::test]
1731 async fn test_groups() {
1732 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1733
1734 futures::join!(
1735 async move {
1736 let block_server = BlockServer::new(
1737 BLOCK_SIZE,
1738 Arc::new(MockInterface {
1739 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1740 }),
1741 );
1742 block_server.handle_requests(stream).await.unwrap();
1743 },
1744 async move {
1745 let (session_proxy, server) = fidl::endpoints::create_proxy();
1746
1747 proxy.open_session(server).unwrap();
1748
1749 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1750 let vmo_id = session_proxy
1751 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1752 .await
1753 .unwrap()
1754 .unwrap();
1755
1756 let mut fifo =
1757 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1758 let (mut reader, mut writer) = fifo.async_io();
1759
1760 writer
1761 .write_entries(&BlockFifoRequest {
1762 command: BlockFifoCommand {
1763 opcode: BlockOpcode::Read.into_primitive(),
1764 flags: BlockIoFlag::GROUP_ITEM.bits(),
1765 ..Default::default()
1766 },
1767 group: 1,
1768 vmoid: vmo_id.id,
1769 length: 1,
1770 ..Default::default()
1771 })
1772 .await
1773 .unwrap();
1774
1775 writer
1776 .write_entries(&BlockFifoRequest {
1777 command: BlockFifoCommand {
1778 opcode: BlockOpcode::Read.into_primitive(),
1779 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1780 ..Default::default()
1781 },
1782 reqid: 2,
1783 group: 1,
1784 vmoid: vmo_id.id,
1785 length: 1,
1786 ..Default::default()
1787 })
1788 .await
1789 .unwrap();
1790
1791 let mut response = BlockFifoResponse::default();
1792 reader.read_entries(&mut response).await.unwrap();
1793 assert_eq!(response.status, zx::sys::ZX_OK);
1794 assert_eq!(response.reqid, 2);
1795 assert_eq!(response.group, 1);
1796 }
1797 );
1798 }
1799
1800 #[fuchsia::test]
1801 async fn test_group_error() {
1802 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1803
1804 let counter = Arc::new(AtomicU64::new(0));
1805 let counter_clone = counter.clone();
1806
1807 futures::join!(
1808 async move {
1809 let block_server = BlockServer::new(
1810 BLOCK_SIZE,
1811 Arc::new(MockInterface {
1812 read_hook: Some(Box::new(move |_, _, _, _| {
1813 counter_clone.fetch_add(1, Ordering::Relaxed);
1814 Box::pin(async { Err(zx::Status::BAD_STATE) })
1815 })),
1816 }),
1817 );
1818 block_server.handle_requests(stream).await.unwrap();
1819 },
1820 async move {
1821 let (session_proxy, server) = fidl::endpoints::create_proxy();
1822
1823 proxy.open_session(server).unwrap();
1824
1825 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1826 let vmo_id = session_proxy
1827 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1828 .await
1829 .unwrap()
1830 .unwrap();
1831
1832 let mut fifo =
1833 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1834 let (mut reader, mut writer) = fifo.async_io();
1835
1836 writer
1837 .write_entries(&BlockFifoRequest {
1838 command: BlockFifoCommand {
1839 opcode: BlockOpcode::Read.into_primitive(),
1840 flags: BlockIoFlag::GROUP_ITEM.bits(),
1841 ..Default::default()
1842 },
1843 group: 1,
1844 vmoid: vmo_id.id,
1845 length: 1,
1846 ..Default::default()
1847 })
1848 .await
1849 .unwrap();
1850
1851 poll_fn(|cx: &mut Context<'_>| {
1853 if counter.load(Ordering::Relaxed) == 1 {
1854 Poll::Ready(())
1855 } else {
1856 cx.waker().wake_by_ref();
1858 Poll::Pending
1859 }
1860 })
1861 .await;
1862
1863 let mut response = BlockFifoResponse::default();
1864 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1865
1866 writer
1867 .write_entries(&BlockFifoRequest {
1868 command: BlockFifoCommand {
1869 opcode: BlockOpcode::Read.into_primitive(),
1870 flags: BlockIoFlag::GROUP_ITEM.bits(),
1871 ..Default::default()
1872 },
1873 group: 1,
1874 vmoid: vmo_id.id,
1875 length: 1,
1876 ..Default::default()
1877 })
1878 .await
1879 .unwrap();
1880
1881 writer
1882 .write_entries(&BlockFifoRequest {
1883 command: BlockFifoCommand {
1884 opcode: BlockOpcode::Read.into_primitive(),
1885 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1886 ..Default::default()
1887 },
1888 reqid: 2,
1889 group: 1,
1890 vmoid: vmo_id.id,
1891 length: 1,
1892 ..Default::default()
1893 })
1894 .await
1895 .unwrap();
1896
1897 reader.read_entries(&mut response).await.unwrap();
1898 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1899 assert_eq!(response.reqid, 2);
1900 assert_eq!(response.group, 1);
1901
1902 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1903
1904 assert_eq!(counter.load(Ordering::Relaxed), 1);
1906 }
1907 );
1908 }
1909
1910 #[fuchsia::test]
1911 async fn test_group_with_two_lasts() {
1912 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1913
1914 let (tx, rx) = oneshot::channel();
1915
1916 futures::join!(
1917 async move {
1918 let rx = Mutex::new(Some(rx));
1919 let block_server = BlockServer::new(
1920 BLOCK_SIZE,
1921 Arc::new(MockInterface {
1922 read_hook: Some(Box::new(move |_, _, _, _| {
1923 let rx = rx.lock().take().unwrap();
1924 Box::pin(async {
1925 let _ = rx.await;
1926 Ok(())
1927 })
1928 })),
1929 }),
1930 );
1931 block_server.handle_requests(stream).await.unwrap();
1932 },
1933 async move {
1934 let (session_proxy, server) = fidl::endpoints::create_proxy();
1935
1936 proxy.open_session(server).unwrap();
1937
1938 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1939 let vmo_id = session_proxy
1940 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1941 .await
1942 .unwrap()
1943 .unwrap();
1944
1945 let mut fifo =
1946 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1947 let (mut reader, mut writer) = fifo.async_io();
1948
1949 writer
1950 .write_entries(&BlockFifoRequest {
1951 command: BlockFifoCommand {
1952 opcode: BlockOpcode::Read.into_primitive(),
1953 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1954 ..Default::default()
1955 },
1956 reqid: 1,
1957 group: 1,
1958 vmoid: vmo_id.id,
1959 length: 1,
1960 ..Default::default()
1961 })
1962 .await
1963 .unwrap();
1964
1965 writer
1966 .write_entries(&BlockFifoRequest {
1967 command: BlockFifoCommand {
1968 opcode: BlockOpcode::Read.into_primitive(),
1969 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1970 ..Default::default()
1971 },
1972 reqid: 2,
1973 group: 1,
1974 vmoid: vmo_id.id,
1975 length: 1,
1976 ..Default::default()
1977 })
1978 .await
1979 .unwrap();
1980
1981 writer
1983 .write_entries(&BlockFifoRequest {
1984 command: BlockFifoCommand {
1985 opcode: BlockOpcode::CloseVmo.into_primitive(),
1986 ..Default::default()
1987 },
1988 reqid: 3,
1989 vmoid: vmo_id.id,
1990 ..Default::default()
1991 })
1992 .await
1993 .unwrap();
1994
1995 let mut response = BlockFifoResponse::default();
1997 reader.read_entries(&mut response).await.unwrap();
1998 assert_eq!(response.status, zx::sys::ZX_OK);
1999 assert_eq!(response.reqid, 3);
2000
2001 tx.send(()).unwrap();
2003
2004 let mut response = BlockFifoResponse::default();
2007 reader.read_entries(&mut response).await.unwrap();
2008 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2009 assert_eq!(response.reqid, 1);
2010 assert_eq!(response.group, 1);
2011 }
2012 );
2013 }
2014
2015 #[fuchsia::test(allow_stalls = false)]
2016 async fn test_requests_dont_block_sessions() {
2017 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2018
2019 let (tx, rx) = oneshot::channel();
2020
2021 fasync::Task::local(async move {
2022 let rx = Mutex::new(Some(rx));
2023 let block_server = BlockServer::new(
2024 BLOCK_SIZE,
2025 Arc::new(MockInterface {
2026 read_hook: Some(Box::new(move |_, _, _, _| {
2027 let rx = rx.lock().take().unwrap();
2028 Box::pin(async {
2029 let _ = rx.await;
2030 Ok(())
2031 })
2032 })),
2033 }),
2034 );
2035 block_server.handle_requests(stream).await.unwrap();
2036 })
2037 .detach();
2038
2039 let mut fut = pin!(async {
2040 let (session_proxy, server) = fidl::endpoints::create_proxy();
2041
2042 proxy.open_session(server).unwrap();
2043
2044 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2045 let vmo_id = session_proxy
2046 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2047 .await
2048 .unwrap()
2049 .unwrap();
2050
2051 let mut fifo =
2052 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2053 let (mut reader, mut writer) = fifo.async_io();
2054
2055 writer
2056 .write_entries(&BlockFifoRequest {
2057 command: BlockFifoCommand {
2058 opcode: BlockOpcode::Read.into_primitive(),
2059 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2060 ..Default::default()
2061 },
2062 reqid: 1,
2063 group: 1,
2064 vmoid: vmo_id.id,
2065 length: 1,
2066 ..Default::default()
2067 })
2068 .await
2069 .unwrap();
2070
2071 let mut response = BlockFifoResponse::default();
2072 reader.read_entries(&mut response).await.unwrap();
2073 assert_eq!(response.status, zx::sys::ZX_OK);
2074 });
2075
2076 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2078
2079 let mut fut2 = pin!(proxy.get_volume_info());
2080
2081 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2083
2084 let _ = tx.send(());
2087
2088 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2089 }
2090
2091 #[fuchsia::test]
2092 async fn test_request_flow_control() {
2093 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2094
2095 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2098 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2099 let event_clone = event.clone();
2100 futures::join!(
2101 async move {
2102 let block_server = BlockServer::new(
2103 BLOCK_SIZE,
2104 Arc::new(MockInterface {
2105 read_hook: Some(Box::new(move |_, _, _, _| {
2106 let event_clone = event_clone.clone();
2107 Box::pin(async move {
2108 if !event_clone.1.load(Ordering::SeqCst) {
2109 event_clone.0.listen().await;
2110 }
2111 Ok(())
2112 })
2113 })),
2114 }),
2115 );
2116 block_server.handle_requests(stream).await.unwrap();
2117 },
2118 async move {
2119 let (session_proxy, server) = fidl::endpoints::create_proxy();
2120
2121 proxy.open_session(server).unwrap();
2122
2123 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2124 let vmo_id = session_proxy
2125 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2126 .await
2127 .unwrap()
2128 .unwrap();
2129
2130 let mut fifo =
2131 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2132 let (mut reader, mut writer) = fifo.async_io();
2133
2134 for i in 0..MAX_REQUESTS {
2135 writer
2136 .write_entries(&BlockFifoRequest {
2137 command: BlockFifoCommand {
2138 opcode: BlockOpcode::Read.into_primitive(),
2139 ..Default::default()
2140 },
2141 reqid: (i + 1) as u32,
2142 dev_offset: i,
2143 vmoid: vmo_id.id,
2144 length: 1,
2145 ..Default::default()
2146 })
2147 .await
2148 .unwrap();
2149 }
2150 assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2151 command: BlockFifoCommand {
2152 opcode: BlockOpcode::Read.into_primitive(),
2153 ..Default::default()
2154 },
2155 reqid: u32::MAX,
2156 dev_offset: MAX_REQUESTS,
2157 vmoid: vmo_id.id,
2158 length: 1,
2159 ..Default::default()
2160 })))
2161 .is_pending());
2162 event.1.store(true, Ordering::SeqCst);
2164 event.0.notify(usize::MAX);
2165 let mut finished_reqids = vec![];
2167 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2168 let mut response = BlockFifoResponse::default();
2169 reader.read_entries(&mut response).await.unwrap();
2170 assert_eq!(response.status, zx::sys::ZX_OK);
2171 finished_reqids.push(response.reqid);
2172 writer
2173 .write_entries(&BlockFifoRequest {
2174 command: BlockFifoCommand {
2175 opcode: BlockOpcode::Read.into_primitive(),
2176 ..Default::default()
2177 },
2178 reqid: (i + 1) as u32,
2179 dev_offset: i,
2180 vmoid: vmo_id.id,
2181 length: 1,
2182 ..Default::default()
2183 })
2184 .await
2185 .unwrap();
2186 }
2187 let mut response = BlockFifoResponse::default();
2188 for _ in 0..MAX_REQUESTS {
2189 reader.read_entries(&mut response).await.unwrap();
2190 assert_eq!(response.status, zx::sys::ZX_OK);
2191 finished_reqids.push(response.reqid);
2192 }
2193 finished_reqids.sort();
2196 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2197 let mut i = 1;
2198 for reqid in finished_reqids {
2199 assert_eq!(reqid, i);
2200 i += 1;
2201 }
2202 }
2203 );
2204 }
2205
2206 #[fuchsia::test]
2207 async fn test_passthrough_io_with_fixed_map() {
2208 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2209
2210 let expected_op = Arc::new(Mutex::new(None));
2211 let expected_op_clone = expected_op.clone();
2212 futures::join!(
2213 async {
2214 let block_server = BlockServer::new(
2215 BLOCK_SIZE,
2216 Arc::new(IoMockInterface {
2217 return_errors: false,
2218 do_checks: true,
2219 expected_op: expected_op_clone,
2220 }),
2221 );
2222 block_server.handle_requests(stream).await.unwrap();
2223 },
2224 async move {
2225 let (session_proxy, server) = fidl::endpoints::create_proxy();
2226
2227 let mappings = [fblock::BlockOffsetMapping {
2228 source_block_offset: 0,
2229 target_block_offset: 10,
2230 length: 20,
2231 }];
2232 proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2233
2234 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2235 let vmo_id = session_proxy
2236 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2237 .await
2238 .unwrap()
2239 .unwrap();
2240
2241 let mut fifo =
2242 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2243 let (mut reader, mut writer) = fifo.async_io();
2244
2245 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2247 writer
2248 .write_entries(&BlockFifoRequest {
2249 command: BlockFifoCommand {
2250 opcode: BlockOpcode::Read.into_primitive(),
2251 ..Default::default()
2252 },
2253 vmoid: vmo_id.id,
2254 dev_offset: 1,
2255 length: 2,
2256 vmo_offset: 3,
2257 ..Default::default()
2258 })
2259 .await
2260 .unwrap();
2261
2262 let mut response = BlockFifoResponse::default();
2263 reader.read_entries(&mut response).await.unwrap();
2264 assert_eq!(response.status, zx::sys::ZX_OK);
2265
2266 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2268 writer
2269 .write_entries(&BlockFifoRequest {
2270 command: BlockFifoCommand {
2271 opcode: BlockOpcode::Write.into_primitive(),
2272 ..Default::default()
2273 },
2274 vmoid: vmo_id.id,
2275 dev_offset: 4,
2276 length: 5,
2277 vmo_offset: 6,
2278 ..Default::default()
2279 })
2280 .await
2281 .unwrap();
2282
2283 reader.read_entries(&mut response).await.unwrap();
2284 assert_eq!(response.status, zx::sys::ZX_OK);
2285
2286 *expected_op.lock() = Some(ExpectedOp::Flush);
2288 writer
2289 .write_entries(&BlockFifoRequest {
2290 command: BlockFifoCommand {
2291 opcode: BlockOpcode::Flush.into_primitive(),
2292 ..Default::default()
2293 },
2294 ..Default::default()
2295 })
2296 .await
2297 .unwrap();
2298
2299 reader.read_entries(&mut response).await.unwrap();
2300 assert_eq!(response.status, zx::sys::ZX_OK);
2301
2302 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2304 writer
2305 .write_entries(&BlockFifoRequest {
2306 command: BlockFifoCommand {
2307 opcode: BlockOpcode::Trim.into_primitive(),
2308 ..Default::default()
2309 },
2310 dev_offset: 7,
2311 length: 3,
2312 ..Default::default()
2313 })
2314 .await
2315 .unwrap();
2316
2317 reader.read_entries(&mut response).await.unwrap();
2318 assert_eq!(response.status, zx::sys::ZX_OK);
2319
2320 *expected_op.lock() = None;
2322 writer
2323 .write_entries(&BlockFifoRequest {
2324 command: BlockFifoCommand {
2325 opcode: BlockOpcode::Read.into_primitive(),
2326 ..Default::default()
2327 },
2328 vmoid: vmo_id.id,
2329 dev_offset: 19,
2330 length: 2,
2331 vmo_offset: 3,
2332 ..Default::default()
2333 })
2334 .await
2335 .unwrap();
2336
2337 reader.read_entries(&mut response).await.unwrap();
2338 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2339
2340 std::mem::drop(proxy);
2341 }
2342 );
2343 }
2344}