1use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_sync::Mutex;
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use std::borrow::Cow;
11use std::collections::btree_map::Entry;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::atomic::AtomicU64;
16use std::sync::Arc;
17use zx::HandleBased;
18use {
19 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
20 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
21};
22
23pub mod async_interface;
24pub mod c_interface;
25
26pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
27
28#[derive(Clone)]
29pub enum DeviceInfo {
30 Block(BlockInfo),
31 Partition(PartitionInfo),
32}
33
34impl DeviceInfo {
35 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
36 match self {
37 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
38 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
39 max_transfer_blocks.clone()
40 }
41 }
42 }
43
44 fn max_transfer_size(&self, block_size: u32) -> u32 {
45 if let Some(max_blocks) = self.max_transfer_blocks() {
46 max_blocks.get() * block_size
47 } else {
48 MAX_TRANSFER_UNBOUNDED
49 }
50 }
51}
52
53#[derive(Clone)]
55pub struct BlockInfo {
56 pub device_flags: fblock::Flag,
57 pub block_count: u64,
58 pub max_transfer_blocks: Option<NonZero<u32>>,
59}
60
61#[derive(Clone)]
63pub struct PartitionInfo {
64 pub device_flags: fblock::Flag,
66 pub max_transfer_blocks: Option<NonZero<u32>>,
67 pub block_range: Option<Range<u64>>,
71 pub type_guid: [u8; 16],
72 pub instance_guid: [u8; 16],
73 pub name: String,
74 pub flags: u64,
75}
76
77struct FifoMessageGroup {
90 status: zx::Status,
91 count: u32,
92 req_id: Option<u32>,
93}
94
95impl FifoMessageGroup {
96 fn new() -> Self {
97 FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
98 }
99}
100
101#[derive(Default)]
103struct FifoMessageGroups(Mutex<BTreeMap<GroupOrRequest, FifoMessageGroup>>);
104
105impl FifoMessageGroups {
106 fn complete(&self, group_id: GroupOrRequest, status: zx::Status) -> Option<BlockFifoResponse> {
109 let mut map = self.0.lock();
110 let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
111 let group = o.get_mut();
112 if group.count == 1 {
113 if let Some(reqid) = group.req_id {
114 let status =
115 if group.status != zx::Status::OK { group.status } else { status }.into_raw();
116
117 o.remove();
118
119 return Some(BlockFifoResponse {
120 status,
121 reqid,
122 group: group_id.group_id(),
123 ..Default::default()
124 });
125 }
126 }
127
128 group.count = group.count.checked_sub(1).unwrap();
129 if status != zx::Status::OK && group.status == zx::Status::OK {
130 group.status = status
131 }
132 None
133 }
134}
135
136pub struct BlockServer<SM> {
139 block_size: u32,
140 session_manager: Arc<SM>,
141}
142
143pub struct BlockOffsetMapping {
145 source_block_offset: u64,
146 target_block_offset: u64,
147 length: u64,
148}
149
150impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
151 type Error = zx::Status;
152
153 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
154 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
155 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
156 Ok(Self {
157 source_block_offset: wire.source_block_offset,
158 target_block_offset: wire.target_block_offset,
159 length: wire.length,
160 })
161 }
162}
163
164pub struct OffsetMap {
170 mapping: Option<BlockOffsetMapping>,
171 max_transfer_blocks: Option<NonZero<u32>>,
172}
173
174impl OffsetMap {
175 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
177 Self { mapping: Some(mapping), max_transfer_blocks }
178 }
179
180 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
182 Self { mapping: None, max_transfer_blocks }
183 }
184
185 pub fn is_empty(&self) -> bool {
186 self.mapping.is_some()
187 }
188
189 pub fn adjust_request(&self, dev_offset: u64, length: u32, is_rw: bool) -> Option<(u64, u32)> {
192 let length = if is_rw {
194 if let Some(max) = self.max_transfer_blocks {
195 std::cmp::min(max.get(), length)
196 } else {
197 length
198 }
199 } else {
200 length
201 };
202 let end = dev_offset.checked_add(length as u64)?;
203 if let Some(mapping) = &self.mapping {
204 if mapping.source_block_offset > dev_offset
205 || end > mapping.source_block_offset + mapping.length
206 {
207 return None;
208 }
209 let delta = dev_offset - mapping.source_block_offset;
210 Some((mapping.target_block_offset + delta, length))
211 } else {
212 Some((dev_offset, length))
213 }
214 }
215}
216
217pub trait SessionManager: 'static {
220 fn on_attach_vmo(
221 self: Arc<Self>,
222 vmo: &Arc<zx::Vmo>,
223 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
224
225 fn open_session(
230 self: Arc<Self>,
231 stream: fblock::SessionRequestStream,
232 offset_map: OffsetMap,
233 block_size: u32,
234 ) -> impl Future<Output = Result<(), Error>> + Send;
235
236 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
238
239 fn get_volume_info(
241 &self,
242 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
243 {
244 async { Err(zx::Status::NOT_SUPPORTED) }
245 }
246
247 fn query_slices(
249 &self,
250 _start_slices: &[u64],
251 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
252 async { Err(zx::Status::NOT_SUPPORTED) }
253 }
254
255 fn extend(
257 &self,
258 _start_slice: u64,
259 _slice_count: u64,
260 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
261 async { Err(zx::Status::NOT_SUPPORTED) }
262 }
263
264 fn shrink(
266 &self,
267 _start_slice: u64,
268 _slice_count: u64,
269 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
270 async { Err(zx::Status::NOT_SUPPORTED) }
271 }
272}
273
274pub trait IntoSessionManager {
275 type SM: SessionManager;
276
277 fn into_session_manager(self) -> Arc<Self::SM>;
278}
279
280impl<SM: SessionManager> BlockServer<SM> {
281 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
282 Self { block_size, session_manager: session_manager.into_session_manager() }
283 }
284
285 pub async fn handle_requests(
287 &self,
288 mut requests: fvolume::VolumeRequestStream,
289 ) -> Result<(), Error> {
290 let scope = fasync::Scope::new();
291 while let Some(request) = requests.try_next().await.unwrap() {
292 if let Some(session) = self.handle_request(request).await? {
293 scope.spawn(session.map(|_| ()));
294 }
295 }
296 scope.await;
297 Ok(())
298 }
299
300 async fn handle_request(
303 &self,
304 request: fvolume::VolumeRequest,
305 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
306 match request {
307 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
308 Ok(info) => {
309 let max_transfer_size = info.max_transfer_size(self.block_size);
310 let (block_count, flags) = match info.as_ref() {
311 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
312 (*block_count, *device_flags)
313 }
314 DeviceInfo::Partition(partition_info) => {
315 let block_count = if let Some(range) =
316 partition_info.block_range.as_ref()
317 {
318 range.end - range.start
319 } else {
320 let volume_info = self.session_manager.get_volume_info().await?;
321 volume_info.0.slice_size * volume_info.1.partition_slice_count
322 / self.block_size as u64
323 };
324 (block_count, partition_info.device_flags)
325 }
326 };
327 responder.send(Ok(&fblock::BlockInfo {
328 block_count,
329 block_size: self.block_size,
330 max_transfer_size,
331 flags,
332 }))?;
333 }
334 Err(status) => responder.send(Err(status.into_raw()))?,
335 },
336 fvolume::VolumeRequest::GetStats { clear: _, responder } => {
337 responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
339 }
340 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
341 match self.device_info().await {
342 Ok(info) => {
343 return Ok(Some(self.session_manager.clone().open_session(
344 session.into_stream(),
345 OffsetMap::empty(info.max_transfer_blocks()),
346 self.block_size,
347 )));
348 }
349 Err(status) => session.close_with_epitaph(status)?,
350 }
351 }
352 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
353 session,
354 offset_map,
355 initial_mappings,
356 control_handle: _,
357 } => match self.device_info().await {
358 Ok(info) => {
359 if offset_map.is_some()
360 || initial_mappings.as_ref().is_none_or(|m| m.len() != 1)
361 {
362 session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
366 return Ok(None);
367 }
368 let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into()
369 {
370 Ok(m) => m,
371 Err(status) => {
372 session.close_with_epitaph(status)?;
373 return Ok(None);
374 }
375 };
376 return Ok(Some(self.session_manager.clone().open_session(
377 session.into_stream(),
378 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
379 self.block_size,
380 )));
381 }
382 Err(status) => session.close_with_epitaph(status)?,
383 },
384 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
385 Ok(info) => {
386 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
387 let mut guid =
388 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
389 guid.value.copy_from_slice(&partition_info.type_guid);
390 responder.send(zx::sys::ZX_OK, Some(&guid))?;
391 } else {
392 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
393 }
394 }
395 Err(status) => {
396 responder.send(status.into_raw(), None)?;
397 }
398 },
399 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
400 match self.device_info().await {
401 Ok(info) => {
402 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
403 let mut guid =
404 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
405 guid.value.copy_from_slice(&partition_info.instance_guid);
406 responder.send(zx::sys::ZX_OK, Some(&guid))?;
407 } else {
408 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
409 }
410 }
411 Err(status) => {
412 responder.send(status.into_raw(), None)?;
413 }
414 }
415 }
416 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
417 Ok(info) => {
418 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
419 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
420 } else {
421 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
422 }
423 }
424 Err(status) => {
425 responder.send(status.into_raw(), None)?;
426 }
427 },
428 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
429 Ok(info) => {
430 if let DeviceInfo::Partition(info) = info.as_ref() {
431 let mut type_guid =
432 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
433 type_guid.value.copy_from_slice(&info.type_guid);
434 let mut instance_guid =
435 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
436 instance_guid.value.copy_from_slice(&info.instance_guid);
437 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
438 name: Some(info.name.clone()),
439 type_guid: Some(type_guid),
440 instance_guid: Some(instance_guid),
441 start_block_offset: info.block_range.as_ref().map(|range| range.start),
442 num_blocks: info
443 .block_range
444 .as_ref()
445 .map(|range| range.end - range.start),
446 flags: Some(info.flags),
447 ..Default::default()
448 }))?;
449 }
450 }
451 Err(status) => responder.send(Err(status.into_raw()))?,
452 },
453 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
454 match self.session_manager.query_slices(&start_slices).await {
455 Ok(mut results) => {
456 let results_len = results.len();
457 assert!(results_len <= 16);
458 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
459 responder.send(
460 zx::sys::ZX_OK,
461 &results.try_into().unwrap(),
462 results_len as u64,
463 )?;
464 }
465 Err(s) => {
466 responder.send(
467 s.into_raw(),
468 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
469 0,
470 )?;
471 }
472 }
473 }
474 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
475 match self.session_manager.get_volume_info().await {
476 Ok((manager_info, volume_info)) => {
477 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
478 }
479 Err(s) => responder.send(s.into_raw(), None, None)?,
480 }
481 }
482 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
483 responder.send(
484 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
485 .into_raw(),
486 )?;
487 }
488 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
489 responder.send(
490 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
491 .into_raw(),
492 )?;
493 }
494 fvolume::VolumeRequest::Destroy { responder, .. } => {
495 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
496 }
497 }
498 Ok(None)
499 }
500
501 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
502 self.session_manager.get_info().await
503 }
504}
505
506struct SessionHelper<SM: SessionManager> {
507 session_manager: Arc<SM>,
508 offset_map: OffsetMap,
509 block_size: u32,
510 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
511 vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
512 message_groups: FifoMessageGroups,
513}
514
515impl<SM: SessionManager> SessionHelper<SM> {
516 fn new(
517 session_manager: Arc<SM>,
518 offset_map: OffsetMap,
519 block_size: u32,
520 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
521 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
522 Ok((
523 Self {
524 session_manager,
525 offset_map,
526 block_size,
527 peer_fifo,
528 vmos: Mutex::default(),
529 message_groups: FifoMessageGroups::default(),
530 },
531 fifo,
532 ))
533 }
534
535 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
536 match request {
537 fblock::SessionRequest::GetFifo { responder } => {
538 let rights = zx::Rights::TRANSFER
539 | zx::Rights::READ
540 | zx::Rights::WRITE
541 | zx::Rights::SIGNAL
542 | zx::Rights::WAIT;
543 match self.peer_fifo.duplicate_handle(rights) {
544 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
545 Err(s) => responder.send(Err(s.into_raw()))?,
546 }
547 Ok(())
548 }
549 fblock::SessionRequest::AttachVmo { vmo, responder } => {
550 let vmo = Arc::new(vmo);
551 let vmo_id = {
552 let mut vmos = self.vmos.lock();
553 if vmos.len() == u16::MAX as usize {
554 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
555 return Ok(());
556 } else {
557 let vmo_id = match vmos.last_entry() {
558 None => 1,
559 Some(o) => {
560 o.key().checked_add(1).unwrap_or_else(|| {
561 let mut vmo_id = 1;
562 for (&id, _) in &*vmos {
564 if id > vmo_id {
565 break;
566 }
567 vmo_id = id + 1;
568 }
569 vmo_id
570 })
571 }
572 };
573 vmos.insert(vmo_id, vmo.clone());
574 vmo_id
575 }
576 };
577 self.session_manager.clone().on_attach_vmo(&vmo).await?;
578 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
579 Ok(())
580 }
581 fblock::SessionRequest::Close { responder } => {
582 responder.send(Ok(()))?;
583 Err(anyhow!("Closed"))
584 }
585 }
586 }
587
588 fn finish_fifo_request(
589 &self,
590 request: RequestTracking,
591 status: zx::Status,
592 ) -> Option<BlockFifoResponse> {
593 match request.group_or_request {
594 group @ GroupOrRequest::Group(_) => {
595 let response = self.message_groups.complete(group, status);
596 fuchsia_trace::duration!(
597 c"storage",
598 c"block_server::finish_transaction_in_group",
599 "group" => group.id(),
600 "group_completed" => response.is_some(),
601 "status" => status.into_raw());
602 if let Some(trace_flow_id) = request.trace_flow_id {
603 fuchsia_trace::flow_step!(
604 c"storage",
605 c"block_server::finish_transaction",
606 trace_flow_id.get().into()
607 );
608 }
609 response
610 }
611 group @ GroupOrRequest::OneShotGroup(_) => {
612 let response = self.message_groups.complete(group, status);
613 fuchsia_trace::duration!(
614 c"storage",
615 c"block_server::finish_transaction_in_oneshot_group",
616 "group" => group.id(),
617 "group_completed" => response.is_some(),
618 "status" => status.into_raw());
619 if let Some(trace_flow_id) = request.trace_flow_id {
620 fuchsia_trace::flow_step!(
621 c"storage",
622 c"block_server::finish_transaction",
623 trace_flow_id.get().into()
624 );
625 }
626 response
627 }
628 GroupOrRequest::Request(reqid) => {
629 fuchsia_trace::duration!(
630 c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
631 if let Some(trace_flow_id) = request.trace_flow_id {
632 fuchsia_trace::flow_step!(
633 c"storage",
634 c"block_server::finish_transaction",
635 trace_flow_id.get().into()
636 );
637 }
638 Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
639 }
640 }
641 }
642
643 fn decode_fifo_request(
646 &self,
647 request: &mut BlockFifoRequest,
648 mut is_split: bool,
649 ) -> DecodeResult {
650 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
651 let old_length = request.length;
652 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
653 .ok_or(zx::Status::INVALID_ARGS)
654 .and_then(|code| {
655 let op = match code {
656 BlockOpcode::Read => Operation::Read {
657 device_block_offset: request.dev_offset,
658 block_count: request.length,
659 _unused: 0,
660 vmo_offset: request
661 .vmo_offset
662 .checked_mul(self.block_size as u64)
663 .ok_or(zx::Status::OUT_OF_RANGE)?,
664 },
665 BlockOpcode::Write => Operation::Write {
666 device_block_offset: request.dev_offset,
667 block_count: request.length,
668 options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
669 WriteOptions::FORCE_ACCESS
670 } else {
671 WriteOptions::empty()
672 },
673 vmo_offset: request
674 .vmo_offset
675 .checked_mul(self.block_size as u64)
676 .ok_or(zx::Status::OUT_OF_RANGE)?,
677 },
678 BlockOpcode::Flush => Operation::Flush,
679 BlockOpcode::Trim => Operation::Trim {
680 device_block_offset: request.dev_offset,
681 block_count: request.length,
682 },
683 BlockOpcode::CloseVmo => Operation::CloseVmo,
684 };
685 op.validate_and_adjust_range(&self.offset_map)
686 });
687
688 let sub_request_length =
689 if let Ok(Operation::Read { block_count, .. } | Operation::Write { block_count, .. }) =
690 &operation
691 {
692 if *block_count < old_length {
693 Some(*block_count)
694 } else {
695 None
696 }
697 } else {
698 None
699 };
700 is_split |= sub_request_length.is_some();
701
702 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
703 GroupOrRequest::Group(request.group)
704 } else if is_split {
705 GroupOrRequest::OneShotGroup(request.reqid)
706 } else {
707 GroupOrRequest::Request(request.reqid)
708 };
709
710 if group_or_request.is_group() {
711 let finishes_group = (!flags.contains(BlockIoFlag::GROUP_ITEM)
712 || flags.contains(BlockIoFlag::GROUP_LAST))
713 && (!is_split || sub_request_length.is_none());
714 let mut groups = self.message_groups.0.lock();
715 let group = groups.entry(group_or_request).or_insert_with(|| FifoMessageGroup::new());
716 if group.req_id.is_some() {
717 if group.status == zx::Status::OK {
719 group.status = zx::Status::INVALID_ARGS;
720 }
721 return DecodeResult::IgnoreRequest;
722 }
723 if finishes_group {
724 group.req_id = Some(request.reqid);
725 if group.status != zx::Status::OK {
727 operation = Err(group.status);
728 }
729 } else if group.status != zx::Status::OK {
730 return DecodeResult::IgnoreRequest;
733 }
734 group.count += 1;
735 group_or_request
736 } else {
737 GroupOrRequest::Request(request.reqid)
738 };
739 let request_tracking = RequestTracking {
740 group_or_request,
741 trace_flow_id: NonZero::new(request.trace_flow_id),
742 };
743
744 let vmo = match &operation {
745 Ok(Operation::Read { .. } | Operation::Write { .. }) => self
746 .vmos
747 .lock()
748 .get(&request.vmoid)
749 .cloned()
750 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
751 Ok(Operation::CloseVmo) => self
752 .vmos
753 .lock()
754 .remove(&request.vmoid)
755 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
756 _ => Ok(None),
757 }
758 .unwrap_or_else(|e| {
759 operation = Err(e);
760 None
761 });
762
763 let operation = match operation {
764 Ok(operation) => operation,
765 Err(status) => return DecodeResult::InvalidRequest(request_tracking, status),
766 };
767
768 {
769 use fuchsia_trace::ArgValue;
770 static CACHE: AtomicU64 = AtomicU64::new(0);
771 if let Some(context) =
772 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
773 {
774 let trace_args_with_group = [
775 ArgValue::of("group", u32::from(group_or_request.id())),
776 ArgValue::of("oneshot", group_or_request.is_oneshot()),
777 ArgValue::of("opcode", operation.trace_label()),
778 ];
779 let trace_args = [ArgValue::of("opcode", operation.trace_label())];
780 let _scope = if group_or_request.is_group() {
781 fuchsia_trace::duration(
782 c"storage",
783 c"block_server::start_transaction",
784 &trace_args_with_group,
785 )
786 } else {
787 fuchsia_trace::duration(
788 c"storage",
789 c"block_server::start_transaction",
790 &trace_args,
791 )
792 };
793 let trace_flow_id = NonZero::new(request.trace_flow_id);
794 if let Some(trace_flow_id) = trace_flow_id.clone() {
795 fuchsia_trace::flow_step(
796 &context,
797 c"block_server::start_transaction",
798 trace_flow_id.get().into(),
799 &[],
800 );
801 }
802 }
803 }
804
805 if let Some(sub_request_length) = sub_request_length {
806 request.dev_offset += sub_request_length as u64;
807 request.vmo_offset += sub_request_length as u64;
808 request.length -= sub_request_length;
809 DecodeResult::Split(DecodedRequest { request_tracking, operation, vmo })
810 } else {
811 DecodeResult::Ok(DecodedRequest { request_tracking, operation, vmo })
812 }
813 }
814
815 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
816 std::mem::take(&mut *self.vmos.lock())
817 }
818}
819
820#[derive(Debug)]
821enum DecodeResult {
822 Ok(DecodedRequest),
824 Split(DecodedRequest),
828 InvalidRequest(RequestTracking, zx::Status),
830 IgnoreRequest,
835}
836
837#[derive(Debug)]
838struct DecodedRequest {
839 request_tracking: RequestTracking,
840 operation: Operation,
842 vmo: Option<Arc<zx::Vmo>>,
843}
844
845pub type WriteOptions = block_protocol::WriteOptions;
847
848#[repr(C)]
849#[derive(Debug)]
850pub enum Operation {
851 Read {
856 device_block_offset: u64,
857 block_count: u32,
858 _unused: u32,
859 vmo_offset: u64,
860 },
861 Write {
862 device_block_offset: u64,
863 block_count: u32,
864 options: WriteOptions,
865 vmo_offset: u64,
866 },
867 Flush,
868 Trim {
869 device_block_offset: u64,
870 block_count: u32,
871 },
872 CloseVmo,
874}
875
876impl Operation {
877 fn validate_and_adjust_range(
882 mut self,
883 offset_map: &OffsetMap,
884 ) -> Result<Operation, zx::Status> {
885 let adjust_fn = |dev_offset, length, is_rw| {
886 if length == 0 {
888 return Err(zx::Status::INVALID_ARGS);
889 }
890 offset_map.adjust_request(dev_offset, length, is_rw).ok_or(zx::Status::OUT_OF_RANGE)
891 };
892 match &mut self {
893 Operation::Read { device_block_offset, block_count, .. } => {
894 (*device_block_offset, *block_count) =
895 adjust_fn(*device_block_offset, *block_count, true)?;
896 }
897 Operation::Write { device_block_offset, block_count, .. } => {
898 (*device_block_offset, *block_count) =
899 adjust_fn(*device_block_offset, *block_count, true)?;
900 }
901 Operation::Trim { device_block_offset, block_count, .. } => {
902 (*device_block_offset, *block_count) =
903 adjust_fn(*device_block_offset, *block_count, false)?;
904 }
905 _ => {}
906 }
907 Ok(self)
908 }
909 fn trace_label(&self) -> &'static str {
910 match self {
911 Operation::Read { .. } => "read",
912 Operation::Write { .. } => "write",
913 Operation::Flush { .. } => "flush",
914 Operation::Trim { .. } => "trim",
915 Operation::CloseVmo { .. } => "close_vmo",
916 }
917 }
918}
919
920#[derive(Clone, Copy, Debug)]
921pub struct RequestTracking {
922 group_or_request: GroupOrRequest,
923 trace_flow_id: Option<NonZero<u64>>,
924}
925
926#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
927pub enum GroupOrRequest {
928 Group(u16),
929 OneShotGroup(u32),
932 Request(u32),
933}
934
935impl GroupOrRequest {
936 fn is_group(&self) -> bool {
937 matches!(self, Self::Group(_) | Self::OneShotGroup(_))
938 }
939
940 fn is_oneshot(&self) -> bool {
941 matches!(self, Self::OneShotGroup(_))
942 }
943
944 fn group_id(&self) -> u16 {
946 match self {
947 Self::Group(id) => *id,
948 Self::Request(_) | Self::OneShotGroup(_) => 0,
949 }
950 }
951
952 fn id(&self) -> u32 {
953 match self {
954 Self::Group(id) => *id as u32,
955 Self::Request(id) => *id,
956 Self::OneShotGroup(id) => *id,
957 }
958 }
959}
960
961#[cfg(test)]
962mod tests {
963 use super::{BlockServer, DeviceInfo, PartitionInfo, FIFO_MAX_REQUESTS};
964 use assert_matches::assert_matches;
965 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
966 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
967 use fuchsia_sync::Mutex;
968 use futures::channel::oneshot;
969 use futures::future::BoxFuture;
970 use futures::FutureExt as _;
971 use std::borrow::Cow;
972 use std::future::poll_fn;
973 use std::num::NonZero;
974 use std::pin::pin;
975 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
976 use std::sync::Arc;
977 use std::task::{Context, Poll};
978 use zx::{AsHandleRef as _, HandleBased as _};
979 use {
980 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
981 fuchsia_async as fasync,
982 };
983
984 #[derive(Default)]
985 struct MockInterface {
986 read_hook: Option<
987 Box<
988 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
989 + Send
990 + Sync,
991 >,
992 >,
993 }
994
995 impl super::async_interface::Interface for MockInterface {
996 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
997 Ok(())
998 }
999
1000 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1001 Ok(Cow::Owned(test_device_info()))
1002 }
1003
1004 async fn read(
1005 &self,
1006 device_block_offset: u64,
1007 block_count: u32,
1008 vmo: &Arc<zx::Vmo>,
1009 vmo_offset: u64,
1010 _trace_flow_id: Option<NonZero<u64>>,
1011 ) -> Result<(), zx::Status> {
1012 if let Some(read_hook) = &self.read_hook {
1013 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1014 } else {
1015 unimplemented!();
1016 }
1017 }
1018
1019 async fn write(
1020 &self,
1021 _device_block_offset: u64,
1022 _block_count: u32,
1023 _vmo: &Arc<zx::Vmo>,
1024 _vmo_offset: u64,
1025 _opts: WriteOptions,
1026 _trace_flow_id: Option<NonZero<u64>>,
1027 ) -> Result<(), zx::Status> {
1028 unreachable!();
1029 }
1030
1031 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1032 unreachable!();
1033 }
1034
1035 async fn trim(
1036 &self,
1037 _device_block_offset: u64,
1038 _block_count: u32,
1039 _trace_flow_id: Option<NonZero<u64>>,
1040 ) -> Result<(), zx::Status> {
1041 unreachable!();
1042 }
1043
1044 async fn get_volume_info(
1045 &self,
1046 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1047 let () = std::future::pending().await;
1049 unreachable!();
1050 }
1051 }
1052
1053 const BLOCK_SIZE: u32 = 512;
1054
1055 fn test_device_info() -> DeviceInfo {
1056 DeviceInfo::Partition(PartitionInfo {
1057 device_flags: fblock::Flag::READONLY,
1058 max_transfer_blocks: None,
1059 block_range: Some(12..34),
1060 type_guid: [1; 16],
1061 instance_guid: [2; 16],
1062 name: "foo".to_string(),
1063 flags: 0xabcd,
1064 })
1065 }
1066
1067 #[fuchsia::test]
1068 async fn test_info() {
1069 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1070
1071 futures::join!(
1072 async {
1073 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1074 block_server.handle_requests(stream).await.unwrap();
1075 },
1076 async {
1077 let expected_info = test_device_info();
1078 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1079 info
1080 } else {
1081 unreachable!()
1082 };
1083
1084 let block_info = proxy.get_info().await.unwrap().unwrap();
1085 assert_eq!(
1086 block_info.block_count,
1087 partition_info.block_range.as_ref().unwrap().end
1088 - partition_info.block_range.as_ref().unwrap().start
1089 );
1090 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1091
1092 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1095 assert_eq!(status, zx::sys::ZX_OK);
1096 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1097
1098 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1099 assert_eq!(status, zx::sys::ZX_OK);
1100 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1101
1102 let (status, name) = proxy.get_name().await.unwrap();
1103 assert_eq!(status, zx::sys::ZX_OK);
1104 assert_eq!(name.as_ref(), Some(&partition_info.name));
1105
1106 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1107 assert_eq!(metadata.name, name);
1108 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1109 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1110 assert_eq!(
1111 metadata.start_block_offset,
1112 Some(partition_info.block_range.as_ref().unwrap().start)
1113 );
1114 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1115 assert_eq!(metadata.flags, Some(partition_info.flags));
1116
1117 std::mem::drop(proxy);
1118 }
1119 );
1120 }
1121
1122 #[fuchsia::test]
1123 async fn test_attach_vmo() {
1124 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1125
1126 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1127 let koid = vmo.get_koid().unwrap();
1128
1129 futures::join!(
1130 async {
1131 let block_server = BlockServer::new(
1132 BLOCK_SIZE,
1133 Arc::new(MockInterface {
1134 read_hook: Some(Box::new(move |_, _, vmo, _| {
1135 assert_eq!(vmo.get_koid().unwrap(), koid);
1136 Box::pin(async { Ok(()) })
1137 })),
1138 ..MockInterface::default()
1139 }),
1140 );
1141 block_server.handle_requests(stream).await.unwrap();
1142 },
1143 async move {
1144 let (session_proxy, server) = fidl::endpoints::create_proxy();
1145
1146 proxy.open_session(server).unwrap();
1147
1148 let vmo_id = session_proxy
1149 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1150 .await
1151 .unwrap()
1152 .unwrap();
1153 assert_ne!(vmo_id.id, 0);
1154
1155 let mut fifo =
1156 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1157 let (mut reader, mut writer) = fifo.async_io();
1158
1159 let mut count = 1;
1161 loop {
1162 match session_proxy
1163 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1164 .await
1165 .unwrap()
1166 {
1167 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1168 Err(e) => {
1169 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1170 break;
1171 }
1172 }
1173
1174 if count % 10 == 0 {
1176 writer
1177 .write_entries(&BlockFifoRequest {
1178 command: BlockFifoCommand {
1179 opcode: BlockOpcode::Read.into_primitive(),
1180 ..Default::default()
1181 },
1182 vmoid: vmo_id.id,
1183 length: 1,
1184 ..Default::default()
1185 })
1186 .await
1187 .unwrap();
1188
1189 let mut response = BlockFifoResponse::default();
1190 reader.read_entries(&mut response).await.unwrap();
1191 assert_eq!(response.status, zx::sys::ZX_OK);
1192 }
1193
1194 count += 1;
1195 }
1196
1197 assert_eq!(count, u16::MAX as u64);
1198
1199 writer
1201 .write_entries(&BlockFifoRequest {
1202 command: BlockFifoCommand {
1203 opcode: BlockOpcode::CloseVmo.into_primitive(),
1204 ..Default::default()
1205 },
1206 vmoid: vmo_id.id,
1207 ..Default::default()
1208 })
1209 .await
1210 .unwrap();
1211
1212 let mut response = BlockFifoResponse::default();
1213 reader.read_entries(&mut response).await.unwrap();
1214 assert_eq!(response.status, zx::sys::ZX_OK);
1215
1216 let new_vmo_id = session_proxy
1217 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1218 .await
1219 .unwrap()
1220 .unwrap();
1221 assert_eq!(new_vmo_id.id, vmo_id.id);
1223
1224 std::mem::drop(proxy);
1225 }
1226 );
1227 }
1228
1229 #[fuchsia::test]
1230 async fn test_close() {
1231 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1232
1233 let mut server = std::pin::pin!(async {
1234 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1235 block_server.handle_requests(stream).await.unwrap();
1236 }
1237 .fuse());
1238
1239 let mut client = std::pin::pin!(async {
1240 let (session_proxy, server) = fidl::endpoints::create_proxy();
1241
1242 proxy.open_session(server).unwrap();
1243
1244 std::mem::drop(proxy);
1247
1248 session_proxy.close().await.unwrap().unwrap();
1249
1250 let _: () = std::future::pending().await;
1252 }
1253 .fuse());
1254
1255 futures::select!(
1256 _ = server => {}
1257 _ = client => unreachable!(),
1258 );
1259 }
1260
1261 #[derive(Default)]
1262 struct IoMockInterface {
1263 do_checks: bool,
1264 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1265 return_errors: bool,
1266 }
1267
1268 #[derive(Debug)]
1269 enum ExpectedOp {
1270 Read(u64, u32, u64),
1271 Write(u64, u32, u64),
1272 Trim(u64, u32),
1273 Flush,
1274 }
1275
1276 impl super::async_interface::Interface for IoMockInterface {
1277 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1278 Ok(())
1279 }
1280
1281 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1282 Ok(Cow::Owned(test_device_info()))
1283 }
1284
1285 async fn read(
1286 &self,
1287 device_block_offset: u64,
1288 block_count: u32,
1289 _vmo: &Arc<zx::Vmo>,
1290 vmo_offset: u64,
1291 _trace_flow_id: Option<NonZero<u64>>,
1292 ) -> Result<(), zx::Status> {
1293 if self.return_errors {
1294 Err(zx::Status::INTERNAL)
1295 } else {
1296 if self.do_checks {
1297 assert_matches!(
1298 self.expected_op.lock().take(),
1299 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1300 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1301 "Read {device_block_offset} {block_count} {vmo_offset}"
1302 );
1303 }
1304 Ok(())
1305 }
1306 }
1307
1308 async fn write(
1309 &self,
1310 device_block_offset: u64,
1311 block_count: u32,
1312 _vmo: &Arc<zx::Vmo>,
1313 vmo_offset: u64,
1314 _opts: WriteOptions,
1315 _trace_flow_id: Option<NonZero<u64>>,
1316 ) -> Result<(), zx::Status> {
1317 if self.return_errors {
1318 Err(zx::Status::NOT_SUPPORTED)
1319 } else {
1320 if self.do_checks {
1321 assert_matches!(
1322 self.expected_op.lock().take(),
1323 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1324 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1325 "Write {device_block_offset} {block_count} {vmo_offset}"
1326 );
1327 }
1328 Ok(())
1329 }
1330 }
1331
1332 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1333 if self.return_errors {
1334 Err(zx::Status::NO_RESOURCES)
1335 } else {
1336 if self.do_checks {
1337 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1338 }
1339 Ok(())
1340 }
1341 }
1342
1343 async fn trim(
1344 &self,
1345 device_block_offset: u64,
1346 block_count: u32,
1347 _trace_flow_id: Option<NonZero<u64>>,
1348 ) -> Result<(), zx::Status> {
1349 if self.return_errors {
1350 Err(zx::Status::NO_MEMORY)
1351 } else {
1352 if self.do_checks {
1353 assert_matches!(
1354 self.expected_op.lock().take(),
1355 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1356 block_count == b,
1357 "Trim {device_block_offset} {block_count}"
1358 );
1359 }
1360 Ok(())
1361 }
1362 }
1363 }
1364
1365 #[fuchsia::test]
1366 async fn test_io() {
1367 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1368
1369 let expected_op = Arc::new(Mutex::new(None));
1370 let expected_op_clone = expected_op.clone();
1371
1372 let server = async {
1373 let block_server = BlockServer::new(
1374 BLOCK_SIZE,
1375 Arc::new(IoMockInterface {
1376 return_errors: false,
1377 do_checks: true,
1378 expected_op: expected_op_clone,
1379 }),
1380 );
1381 block_server.handle_requests(stream).await.unwrap();
1382 };
1383
1384 let client = async move {
1385 let (session_proxy, server) = fidl::endpoints::create_proxy();
1386
1387 proxy.open_session(server).unwrap();
1388
1389 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1390 let vmo_id = session_proxy
1391 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1392 .await
1393 .unwrap()
1394 .unwrap();
1395
1396 let mut fifo =
1397 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1398 let (mut reader, mut writer) = fifo.async_io();
1399
1400 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1402 writer
1403 .write_entries(&BlockFifoRequest {
1404 command: BlockFifoCommand {
1405 opcode: BlockOpcode::Read.into_primitive(),
1406 ..Default::default()
1407 },
1408 vmoid: vmo_id.id,
1409 dev_offset: 1,
1410 length: 2,
1411 vmo_offset: 3,
1412 ..Default::default()
1413 })
1414 .await
1415 .unwrap();
1416
1417 let mut response = BlockFifoResponse::default();
1418 reader.read_entries(&mut response).await.unwrap();
1419 assert_eq!(response.status, zx::sys::ZX_OK);
1420
1421 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1423 writer
1424 .write_entries(&BlockFifoRequest {
1425 command: BlockFifoCommand {
1426 opcode: BlockOpcode::Write.into_primitive(),
1427 ..Default::default()
1428 },
1429 vmoid: vmo_id.id,
1430 dev_offset: 4,
1431 length: 5,
1432 vmo_offset: 6,
1433 ..Default::default()
1434 })
1435 .await
1436 .unwrap();
1437
1438 let mut response = BlockFifoResponse::default();
1439 reader.read_entries(&mut response).await.unwrap();
1440 assert_eq!(response.status, zx::sys::ZX_OK);
1441
1442 *expected_op.lock() = Some(ExpectedOp::Flush);
1444 writer
1445 .write_entries(&BlockFifoRequest {
1446 command: BlockFifoCommand {
1447 opcode: BlockOpcode::Flush.into_primitive(),
1448 ..Default::default()
1449 },
1450 ..Default::default()
1451 })
1452 .await
1453 .unwrap();
1454
1455 reader.read_entries(&mut response).await.unwrap();
1456 assert_eq!(response.status, zx::sys::ZX_OK);
1457
1458 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1460 writer
1461 .write_entries(&BlockFifoRequest {
1462 command: BlockFifoCommand {
1463 opcode: BlockOpcode::Trim.into_primitive(),
1464 ..Default::default()
1465 },
1466 dev_offset: 7,
1467 length: 8,
1468 ..Default::default()
1469 })
1470 .await
1471 .unwrap();
1472
1473 reader.read_entries(&mut response).await.unwrap();
1474 assert_eq!(response.status, zx::sys::ZX_OK);
1475
1476 std::mem::drop(proxy);
1477 };
1478
1479 futures::join!(server, client);
1480 }
1481
1482 #[fuchsia::test]
1483 async fn test_io_errors() {
1484 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1485
1486 futures::join!(
1487 async {
1488 let block_server = BlockServer::new(
1489 BLOCK_SIZE,
1490 Arc::new(IoMockInterface {
1491 return_errors: true,
1492 do_checks: false,
1493 expected_op: Arc::new(Mutex::new(None)),
1494 }),
1495 );
1496 block_server.handle_requests(stream).await.unwrap();
1497 },
1498 async move {
1499 let (session_proxy, server) = fidl::endpoints::create_proxy();
1500
1501 proxy.open_session(server).unwrap();
1502
1503 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1504 let vmo_id = session_proxy
1505 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1506 .await
1507 .unwrap()
1508 .unwrap();
1509
1510 let mut fifo =
1511 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1512 let (mut reader, mut writer) = fifo.async_io();
1513
1514 writer
1516 .write_entries(&BlockFifoRequest {
1517 command: BlockFifoCommand {
1518 opcode: BlockOpcode::Read.into_primitive(),
1519 ..Default::default()
1520 },
1521 vmoid: vmo_id.id,
1522 length: 1,
1523 reqid: 1,
1524 ..Default::default()
1525 })
1526 .await
1527 .unwrap();
1528
1529 let mut response = BlockFifoResponse::default();
1530 reader.read_entries(&mut response).await.unwrap();
1531 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1532
1533 writer
1535 .write_entries(&BlockFifoRequest {
1536 command: BlockFifoCommand {
1537 opcode: BlockOpcode::Write.into_primitive(),
1538 ..Default::default()
1539 },
1540 vmoid: vmo_id.id,
1541 length: 1,
1542 reqid: 2,
1543 ..Default::default()
1544 })
1545 .await
1546 .unwrap();
1547
1548 reader.read_entries(&mut response).await.unwrap();
1549 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1550
1551 writer
1553 .write_entries(&BlockFifoRequest {
1554 command: BlockFifoCommand {
1555 opcode: BlockOpcode::Flush.into_primitive(),
1556 ..Default::default()
1557 },
1558 reqid: 3,
1559 ..Default::default()
1560 })
1561 .await
1562 .unwrap();
1563
1564 reader.read_entries(&mut response).await.unwrap();
1565 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1566
1567 writer
1569 .write_entries(&BlockFifoRequest {
1570 command: BlockFifoCommand {
1571 opcode: BlockOpcode::Trim.into_primitive(),
1572 ..Default::default()
1573 },
1574 reqid: 4,
1575 length: 1,
1576 ..Default::default()
1577 })
1578 .await
1579 .unwrap();
1580
1581 reader.read_entries(&mut response).await.unwrap();
1582 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1583
1584 std::mem::drop(proxy);
1585 }
1586 );
1587 }
1588
1589 #[fuchsia::test]
1590 async fn test_invalid_args() {
1591 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1592
1593 futures::join!(
1594 async {
1595 let block_server = BlockServer::new(
1596 BLOCK_SIZE,
1597 Arc::new(IoMockInterface {
1598 return_errors: false,
1599 do_checks: false,
1600 expected_op: Arc::new(Mutex::new(None)),
1601 }),
1602 );
1603 block_server.handle_requests(stream).await.unwrap();
1604 },
1605 async move {
1606 let (session_proxy, server) = fidl::endpoints::create_proxy();
1607
1608 proxy.open_session(server).unwrap();
1609
1610 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1611 let vmo_id = session_proxy
1612 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1613 .await
1614 .unwrap()
1615 .unwrap();
1616
1617 let mut fifo =
1618 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1619
1620 async fn test(
1621 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1622 request: BlockFifoRequest,
1623 ) -> Result<(), zx::Status> {
1624 let (mut reader, mut writer) = fifo.async_io();
1625 writer.write_entries(&request).await.unwrap();
1626 let mut response = BlockFifoResponse::default();
1627 reader.read_entries(&mut response).await.unwrap();
1628 zx::Status::ok(response.status)
1629 }
1630
1631 let good_read_request = || BlockFifoRequest {
1634 command: BlockFifoCommand {
1635 opcode: BlockOpcode::Read.into_primitive(),
1636 ..Default::default()
1637 },
1638 vmoid: vmo_id.id,
1639 ..Default::default()
1640 };
1641
1642 assert_eq!(
1643 test(
1644 &mut fifo,
1645 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1646 )
1647 .await,
1648 Err(zx::Status::INVALID_ARGS)
1649 );
1650
1651 assert_eq!(
1652 test(
1653 &mut fifo,
1654 BlockFifoRequest {
1655 vmo_offset: 0xffff_ffff_ffff_ffff,
1656 ..good_read_request()
1657 }
1658 )
1659 .await,
1660 Err(zx::Status::OUT_OF_RANGE)
1661 );
1662
1663 let good_write_request = || BlockFifoRequest {
1666 command: BlockFifoCommand {
1667 opcode: BlockOpcode::Write.into_primitive(),
1668 ..Default::default()
1669 },
1670 vmoid: vmo_id.id,
1671 ..Default::default()
1672 };
1673
1674 assert_eq!(
1675 test(
1676 &mut fifo,
1677 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1678 )
1679 .await,
1680 Err(zx::Status::INVALID_ARGS)
1681 );
1682
1683 assert_eq!(
1684 test(
1685 &mut fifo,
1686 BlockFifoRequest {
1687 vmo_offset: 0xffff_ffff_ffff_ffff,
1688 ..good_write_request()
1689 }
1690 )
1691 .await,
1692 Err(zx::Status::OUT_OF_RANGE)
1693 );
1694
1695 assert_eq!(
1698 test(
1699 &mut fifo,
1700 BlockFifoRequest {
1701 command: BlockFifoCommand {
1702 opcode: BlockOpcode::CloseVmo.into_primitive(),
1703 ..Default::default()
1704 },
1705 vmoid: vmo_id.id + 1,
1706 ..Default::default()
1707 }
1708 )
1709 .await,
1710 Err(zx::Status::IO)
1711 );
1712
1713 std::mem::drop(proxy);
1714 }
1715 );
1716 }
1717
1718 #[fuchsia::test]
1719 async fn test_concurrent_requests() {
1720 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1721
1722 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1723 let waiting_readers_clone = waiting_readers.clone();
1724
1725 futures::join!(
1726 async move {
1727 let block_server = BlockServer::new(
1728 BLOCK_SIZE,
1729 Arc::new(MockInterface {
1730 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1731 let (tx, rx) = oneshot::channel();
1732 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
1733 Box::pin(async move {
1734 let _ = rx.await;
1735 Ok(())
1736 })
1737 })),
1738 }),
1739 );
1740 block_server.handle_requests(stream).await.unwrap();
1741 },
1742 async move {
1743 let (session_proxy, server) = fidl::endpoints::create_proxy();
1744
1745 proxy.open_session(server).unwrap();
1746
1747 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1748 let vmo_id = session_proxy
1749 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1750 .await
1751 .unwrap()
1752 .unwrap();
1753
1754 let mut fifo =
1755 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1756 let (mut reader, mut writer) = fifo.async_io();
1757
1758 writer
1759 .write_entries(&BlockFifoRequest {
1760 command: BlockFifoCommand {
1761 opcode: BlockOpcode::Read.into_primitive(),
1762 ..Default::default()
1763 },
1764 reqid: 1,
1765 dev_offset: 1, vmoid: vmo_id.id,
1767 length: 1,
1768 ..Default::default()
1769 })
1770 .await
1771 .unwrap();
1772
1773 writer
1774 .write_entries(&BlockFifoRequest {
1775 command: BlockFifoCommand {
1776 opcode: BlockOpcode::Read.into_primitive(),
1777 ..Default::default()
1778 },
1779 reqid: 2,
1780 dev_offset: 2,
1781 vmoid: vmo_id.id,
1782 length: 1,
1783 ..Default::default()
1784 })
1785 .await
1786 .unwrap();
1787
1788 poll_fn(|cx: &mut Context<'_>| {
1790 if waiting_readers.lock().len() == 2 {
1791 Poll::Ready(())
1792 } else {
1793 cx.waker().wake_by_ref();
1795 Poll::Pending
1796 }
1797 })
1798 .await;
1799
1800 let mut response = BlockFifoResponse::default();
1801 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1802
1803 let (id, tx) = waiting_readers.lock().pop().unwrap();
1804 tx.send(()).unwrap();
1805
1806 reader.read_entries(&mut response).await.unwrap();
1807 assert_eq!(response.status, zx::sys::ZX_OK);
1808 assert_eq!(response.reqid, id);
1809
1810 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1811
1812 let (id, tx) = waiting_readers.lock().pop().unwrap();
1813 tx.send(()).unwrap();
1814
1815 reader.read_entries(&mut response).await.unwrap();
1816 assert_eq!(response.status, zx::sys::ZX_OK);
1817 assert_eq!(response.reqid, id);
1818 }
1819 );
1820 }
1821
1822 #[fuchsia::test]
1823 async fn test_groups() {
1824 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1825
1826 futures::join!(
1827 async move {
1828 let block_server = BlockServer::new(
1829 BLOCK_SIZE,
1830 Arc::new(MockInterface {
1831 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1832 }),
1833 );
1834 block_server.handle_requests(stream).await.unwrap();
1835 },
1836 async move {
1837 let (session_proxy, server) = fidl::endpoints::create_proxy();
1838
1839 proxy.open_session(server).unwrap();
1840
1841 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1842 let vmo_id = session_proxy
1843 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1844 .await
1845 .unwrap()
1846 .unwrap();
1847
1848 let mut fifo =
1849 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1850 let (mut reader, mut writer) = fifo.async_io();
1851
1852 writer
1853 .write_entries(&BlockFifoRequest {
1854 command: BlockFifoCommand {
1855 opcode: BlockOpcode::Read.into_primitive(),
1856 flags: BlockIoFlag::GROUP_ITEM.bits(),
1857 ..Default::default()
1858 },
1859 group: 1,
1860 vmoid: vmo_id.id,
1861 length: 1,
1862 ..Default::default()
1863 })
1864 .await
1865 .unwrap();
1866
1867 writer
1868 .write_entries(&BlockFifoRequest {
1869 command: BlockFifoCommand {
1870 opcode: BlockOpcode::Read.into_primitive(),
1871 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1872 ..Default::default()
1873 },
1874 reqid: 2,
1875 group: 1,
1876 vmoid: vmo_id.id,
1877 length: 1,
1878 ..Default::default()
1879 })
1880 .await
1881 .unwrap();
1882
1883 let mut response = BlockFifoResponse::default();
1884 reader.read_entries(&mut response).await.unwrap();
1885 assert_eq!(response.status, zx::sys::ZX_OK);
1886 assert_eq!(response.reqid, 2);
1887 assert_eq!(response.group, 1);
1888 }
1889 );
1890 }
1891
1892 #[fuchsia::test]
1893 async fn test_group_error() {
1894 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1895
1896 let counter = Arc::new(AtomicU64::new(0));
1897 let counter_clone = counter.clone();
1898
1899 futures::join!(
1900 async move {
1901 let block_server = BlockServer::new(
1902 BLOCK_SIZE,
1903 Arc::new(MockInterface {
1904 read_hook: Some(Box::new(move |_, _, _, _| {
1905 counter_clone.fetch_add(1, Ordering::Relaxed);
1906 Box::pin(async { Err(zx::Status::BAD_STATE) })
1907 })),
1908 }),
1909 );
1910 block_server.handle_requests(stream).await.unwrap();
1911 },
1912 async move {
1913 let (session_proxy, server) = fidl::endpoints::create_proxy();
1914
1915 proxy.open_session(server).unwrap();
1916
1917 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1918 let vmo_id = session_proxy
1919 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1920 .await
1921 .unwrap()
1922 .unwrap();
1923
1924 let mut fifo =
1925 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1926 let (mut reader, mut writer) = fifo.async_io();
1927
1928 writer
1929 .write_entries(&BlockFifoRequest {
1930 command: BlockFifoCommand {
1931 opcode: BlockOpcode::Read.into_primitive(),
1932 flags: BlockIoFlag::GROUP_ITEM.bits(),
1933 ..Default::default()
1934 },
1935 group: 1,
1936 vmoid: vmo_id.id,
1937 length: 1,
1938 ..Default::default()
1939 })
1940 .await
1941 .unwrap();
1942
1943 poll_fn(|cx: &mut Context<'_>| {
1945 if counter.load(Ordering::Relaxed) == 1 {
1946 Poll::Ready(())
1947 } else {
1948 cx.waker().wake_by_ref();
1950 Poll::Pending
1951 }
1952 })
1953 .await;
1954
1955 let mut response = BlockFifoResponse::default();
1956 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1957
1958 writer
1959 .write_entries(&BlockFifoRequest {
1960 command: BlockFifoCommand {
1961 opcode: BlockOpcode::Read.into_primitive(),
1962 flags: BlockIoFlag::GROUP_ITEM.bits(),
1963 ..Default::default()
1964 },
1965 group: 1,
1966 vmoid: vmo_id.id,
1967 length: 1,
1968 ..Default::default()
1969 })
1970 .await
1971 .unwrap();
1972
1973 writer
1974 .write_entries(&BlockFifoRequest {
1975 command: BlockFifoCommand {
1976 opcode: BlockOpcode::Read.into_primitive(),
1977 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1978 ..Default::default()
1979 },
1980 reqid: 2,
1981 group: 1,
1982 vmoid: vmo_id.id,
1983 length: 1,
1984 ..Default::default()
1985 })
1986 .await
1987 .unwrap();
1988
1989 reader.read_entries(&mut response).await.unwrap();
1990 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1991 assert_eq!(response.reqid, 2);
1992 assert_eq!(response.group, 1);
1993
1994 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1995
1996 assert_eq!(counter.load(Ordering::Relaxed), 1);
1998 }
1999 );
2000 }
2001
2002 #[fuchsia::test]
2003 async fn test_group_with_two_lasts() {
2004 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2005
2006 let (tx, rx) = oneshot::channel();
2007
2008 futures::join!(
2009 async move {
2010 let rx = Mutex::new(Some(rx));
2011 let block_server = BlockServer::new(
2012 BLOCK_SIZE,
2013 Arc::new(MockInterface {
2014 read_hook: Some(Box::new(move |_, _, _, _| {
2015 let rx = rx.lock().take().unwrap();
2016 Box::pin(async {
2017 let _ = rx.await;
2018 Ok(())
2019 })
2020 })),
2021 }),
2022 );
2023 block_server.handle_requests(stream).await.unwrap();
2024 },
2025 async move {
2026 let (session_proxy, server) = fidl::endpoints::create_proxy();
2027
2028 proxy.open_session(server).unwrap();
2029
2030 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2031 let vmo_id = session_proxy
2032 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2033 .await
2034 .unwrap()
2035 .unwrap();
2036
2037 let mut fifo =
2038 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2039 let (mut reader, mut writer) = fifo.async_io();
2040
2041 writer
2042 .write_entries(&BlockFifoRequest {
2043 command: BlockFifoCommand {
2044 opcode: BlockOpcode::Read.into_primitive(),
2045 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2046 ..Default::default()
2047 },
2048 reqid: 1,
2049 group: 1,
2050 vmoid: vmo_id.id,
2051 length: 1,
2052 ..Default::default()
2053 })
2054 .await
2055 .unwrap();
2056
2057 writer
2058 .write_entries(&BlockFifoRequest {
2059 command: BlockFifoCommand {
2060 opcode: BlockOpcode::Read.into_primitive(),
2061 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2062 ..Default::default()
2063 },
2064 reqid: 2,
2065 group: 1,
2066 vmoid: vmo_id.id,
2067 length: 1,
2068 ..Default::default()
2069 })
2070 .await
2071 .unwrap();
2072
2073 writer
2075 .write_entries(&BlockFifoRequest {
2076 command: BlockFifoCommand {
2077 opcode: BlockOpcode::CloseVmo.into_primitive(),
2078 ..Default::default()
2079 },
2080 reqid: 3,
2081 vmoid: vmo_id.id,
2082 ..Default::default()
2083 })
2084 .await
2085 .unwrap();
2086
2087 let mut response = BlockFifoResponse::default();
2089 reader.read_entries(&mut response).await.unwrap();
2090 assert_eq!(response.status, zx::sys::ZX_OK);
2091 assert_eq!(response.reqid, 3);
2092
2093 tx.send(()).unwrap();
2095
2096 let mut response = BlockFifoResponse::default();
2099 reader.read_entries(&mut response).await.unwrap();
2100 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2101 assert_eq!(response.reqid, 1);
2102 assert_eq!(response.group, 1);
2103 }
2104 );
2105 }
2106
2107 #[fuchsia::test(allow_stalls = false)]
2108 async fn test_requests_dont_block_sessions() {
2109 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2110
2111 let (tx, rx) = oneshot::channel();
2112
2113 fasync::Task::local(async move {
2114 let rx = Mutex::new(Some(rx));
2115 let block_server = BlockServer::new(
2116 BLOCK_SIZE,
2117 Arc::new(MockInterface {
2118 read_hook: Some(Box::new(move |_, _, _, _| {
2119 let rx = rx.lock().take().unwrap();
2120 Box::pin(async {
2121 let _ = rx.await;
2122 Ok(())
2123 })
2124 })),
2125 }),
2126 );
2127 block_server.handle_requests(stream).await.unwrap();
2128 })
2129 .detach();
2130
2131 let mut fut = pin!(async {
2132 let (session_proxy, server) = fidl::endpoints::create_proxy();
2133
2134 proxy.open_session(server).unwrap();
2135
2136 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2137 let vmo_id = session_proxy
2138 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2139 .await
2140 .unwrap()
2141 .unwrap();
2142
2143 let mut fifo =
2144 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2145 let (mut reader, mut writer) = fifo.async_io();
2146
2147 writer
2148 .write_entries(&BlockFifoRequest {
2149 command: BlockFifoCommand {
2150 opcode: BlockOpcode::Read.into_primitive(),
2151 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2152 ..Default::default()
2153 },
2154 reqid: 1,
2155 group: 1,
2156 vmoid: vmo_id.id,
2157 length: 1,
2158 ..Default::default()
2159 })
2160 .await
2161 .unwrap();
2162
2163 let mut response = BlockFifoResponse::default();
2164 reader.read_entries(&mut response).await.unwrap();
2165 assert_eq!(response.status, zx::sys::ZX_OK);
2166 });
2167
2168 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2170
2171 let mut fut2 = pin!(proxy.get_volume_info());
2172
2173 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2175
2176 let _ = tx.send(());
2179
2180 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2181 }
2182
2183 #[fuchsia::test]
2184 async fn test_request_flow_control() {
2185 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2186
2187 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2190 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2191 let event_clone = event.clone();
2192 futures::join!(
2193 async move {
2194 let block_server = BlockServer::new(
2195 BLOCK_SIZE,
2196 Arc::new(MockInterface {
2197 read_hook: Some(Box::new(move |_, _, _, _| {
2198 let event_clone = event_clone.clone();
2199 Box::pin(async move {
2200 if !event_clone.1.load(Ordering::SeqCst) {
2201 event_clone.0.listen().await;
2202 }
2203 Ok(())
2204 })
2205 })),
2206 }),
2207 );
2208 block_server.handle_requests(stream).await.unwrap();
2209 },
2210 async move {
2211 let (session_proxy, server) = fidl::endpoints::create_proxy();
2212
2213 proxy.open_session(server).unwrap();
2214
2215 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2216 let vmo_id = session_proxy
2217 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2218 .await
2219 .unwrap()
2220 .unwrap();
2221
2222 let mut fifo =
2223 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2224 let (mut reader, mut writer) = fifo.async_io();
2225
2226 for i in 0..MAX_REQUESTS {
2227 writer
2228 .write_entries(&BlockFifoRequest {
2229 command: BlockFifoCommand {
2230 opcode: BlockOpcode::Read.into_primitive(),
2231 ..Default::default()
2232 },
2233 reqid: (i + 1) as u32,
2234 dev_offset: i,
2235 vmoid: vmo_id.id,
2236 length: 1,
2237 ..Default::default()
2238 })
2239 .await
2240 .unwrap();
2241 }
2242 assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2243 command: BlockFifoCommand {
2244 opcode: BlockOpcode::Read.into_primitive(),
2245 ..Default::default()
2246 },
2247 reqid: u32::MAX,
2248 dev_offset: MAX_REQUESTS,
2249 vmoid: vmo_id.id,
2250 length: 1,
2251 ..Default::default()
2252 })))
2253 .is_pending());
2254 event.1.store(true, Ordering::SeqCst);
2256 event.0.notify(usize::MAX);
2257 let mut finished_reqids = vec![];
2259 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2260 let mut response = BlockFifoResponse::default();
2261 reader.read_entries(&mut response).await.unwrap();
2262 assert_eq!(response.status, zx::sys::ZX_OK);
2263 finished_reqids.push(response.reqid);
2264 writer
2265 .write_entries(&BlockFifoRequest {
2266 command: BlockFifoCommand {
2267 opcode: BlockOpcode::Read.into_primitive(),
2268 ..Default::default()
2269 },
2270 reqid: (i + 1) as u32,
2271 dev_offset: i,
2272 vmoid: vmo_id.id,
2273 length: 1,
2274 ..Default::default()
2275 })
2276 .await
2277 .unwrap();
2278 }
2279 let mut response = BlockFifoResponse::default();
2280 for _ in 0..MAX_REQUESTS {
2281 reader.read_entries(&mut response).await.unwrap();
2282 assert_eq!(response.status, zx::sys::ZX_OK);
2283 finished_reqids.push(response.reqid);
2284 }
2285 finished_reqids.sort();
2288 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2289 let mut i = 1;
2290 for reqid in finished_reqids {
2291 assert_eq!(reqid, i);
2292 i += 1;
2293 }
2294 }
2295 );
2296 }
2297
2298 #[fuchsia::test]
2299 async fn test_passthrough_io_with_fixed_map() {
2300 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2301
2302 let expected_op = Arc::new(Mutex::new(None));
2303 let expected_op_clone = expected_op.clone();
2304 futures::join!(
2305 async {
2306 let block_server = BlockServer::new(
2307 BLOCK_SIZE,
2308 Arc::new(IoMockInterface {
2309 return_errors: false,
2310 do_checks: true,
2311 expected_op: expected_op_clone,
2312 }),
2313 );
2314 block_server.handle_requests(stream).await.unwrap();
2315 },
2316 async move {
2317 let (session_proxy, server) = fidl::endpoints::create_proxy();
2318
2319 let mappings = [fblock::BlockOffsetMapping {
2320 source_block_offset: 0,
2321 target_block_offset: 10,
2322 length: 20,
2323 }];
2324 proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2325
2326 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2327 let vmo_id = session_proxy
2328 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2329 .await
2330 .unwrap()
2331 .unwrap();
2332
2333 let mut fifo =
2334 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2335 let (mut reader, mut writer) = fifo.async_io();
2336
2337 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2339 writer
2340 .write_entries(&BlockFifoRequest {
2341 command: BlockFifoCommand {
2342 opcode: BlockOpcode::Read.into_primitive(),
2343 ..Default::default()
2344 },
2345 vmoid: vmo_id.id,
2346 dev_offset: 1,
2347 length: 2,
2348 vmo_offset: 3,
2349 ..Default::default()
2350 })
2351 .await
2352 .unwrap();
2353
2354 let mut response = BlockFifoResponse::default();
2355 reader.read_entries(&mut response).await.unwrap();
2356 assert_eq!(response.status, zx::sys::ZX_OK);
2357
2358 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2360 writer
2361 .write_entries(&BlockFifoRequest {
2362 command: BlockFifoCommand {
2363 opcode: BlockOpcode::Write.into_primitive(),
2364 ..Default::default()
2365 },
2366 vmoid: vmo_id.id,
2367 dev_offset: 4,
2368 length: 5,
2369 vmo_offset: 6,
2370 ..Default::default()
2371 })
2372 .await
2373 .unwrap();
2374
2375 reader.read_entries(&mut response).await.unwrap();
2376 assert_eq!(response.status, zx::sys::ZX_OK);
2377
2378 *expected_op.lock() = Some(ExpectedOp::Flush);
2380 writer
2381 .write_entries(&BlockFifoRequest {
2382 command: BlockFifoCommand {
2383 opcode: BlockOpcode::Flush.into_primitive(),
2384 ..Default::default()
2385 },
2386 ..Default::default()
2387 })
2388 .await
2389 .unwrap();
2390
2391 reader.read_entries(&mut response).await.unwrap();
2392 assert_eq!(response.status, zx::sys::ZX_OK);
2393
2394 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2396 writer
2397 .write_entries(&BlockFifoRequest {
2398 command: BlockFifoCommand {
2399 opcode: BlockOpcode::Trim.into_primitive(),
2400 ..Default::default()
2401 },
2402 dev_offset: 7,
2403 length: 3,
2404 ..Default::default()
2405 })
2406 .await
2407 .unwrap();
2408
2409 reader.read_entries(&mut response).await.unwrap();
2410 assert_eq!(response.status, zx::sys::ZX_OK);
2411
2412 *expected_op.lock() = None;
2414 writer
2415 .write_entries(&BlockFifoRequest {
2416 command: BlockFifoCommand {
2417 opcode: BlockOpcode::Read.into_primitive(),
2418 ..Default::default()
2419 },
2420 vmoid: vmo_id.id,
2421 dev_offset: 19,
2422 length: 2,
2423 vmo_offset: 3,
2424 ..Default::default()
2425 })
2426 .await
2427 .unwrap();
2428
2429 reader.read_entries(&mut response).await.unwrap();
2430 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2431
2432 std::mem::drop(proxy);
2433 }
2434 );
2435 }
2436}