1use crate::bin::Bin;
5use anyhow::{anyhow, Error};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use zx::HandleBased;
19use {
20 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34 Block(BlockInfo),
35 Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39 pub fn block_count(&self) -> Option<u64> {
40 match self {
41 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42 Self::Partition(PartitionInfo { block_range, .. }) => {
43 block_range.as_ref().map(|range| range.end - range.start)
44 }
45 }
46 }
47
48 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49 match self {
50 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52 max_transfer_blocks.clone()
53 }
54 }
55 }
56
57 fn max_transfer_size(&self, block_size: u32) -> u32 {
58 if let Some(max_blocks) = self.max_transfer_blocks() {
59 max_blocks.get() * block_size
60 } else {
61 MAX_TRANSFER_UNBOUNDED
62 }
63 }
64}
65
66#[derive(Clone, Default)]
68pub struct BlockInfo {
69 pub device_flags: fblock::Flag,
70 pub block_count: u64,
71 pub max_transfer_blocks: Option<NonZero<u32>>,
72}
73
74#[derive(Clone, Default)]
76pub struct PartitionInfo {
77 pub device_flags: fblock::Flag,
79 pub max_transfer_blocks: Option<NonZero<u32>>,
80 pub block_range: Option<Range<u64>>,
84 pub type_guid: [u8; 16],
85 pub instance_guid: [u8; 16],
86 pub name: String,
87 pub flags: u64,
88}
89
90struct ActiveRequest<S> {
93 session: S,
94 group_or_request: GroupOrRequest,
95 trace_flow_id: TraceFlowId,
96 vmo_bin_key: Option<usize>,
97 status: zx::Status,
98 count: u32,
99 req_id: Option<u32>,
100}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105 fn default() -> Self {
106 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107 }
108}
109
110impl<S> ActiveRequests<S> {
111 fn complete_and_take_response(
112 &self,
113 request_id: RequestId,
114 status: zx::Status,
115 ) -> Option<(S, BlockFifoResponse)> {
116 self.0.lock().complete_and_take_response(request_id, status)
117 }
118}
119
120struct ActiveRequestsInner<S> {
121 requests: Slab<ActiveRequest<S>>,
122 vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125impl<S> ActiveRequestsInner<S> {
127 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129 let group = &mut self.requests[request_id.0];
130
131 group.count = group.count.checked_sub(1).unwrap();
132 if status != zx::Status::OK && group.status == zx::Status::OK {
133 group.status = status
134 }
135
136 fuchsia_trace::duration!(
137 c"storage",
138 c"block_server::finish_transaction",
139 "request_id" => request_id.0,
140 "group_completed" => group.count == 0,
141 "status" => status.into_raw());
142 if let Some(trace_flow_id) = group.trace_flow_id {
143 fuchsia_trace::flow_step!(
144 c"storage",
145 c"block_server::finish_request",
146 trace_flow_id.get().into()
147 );
148 }
149 }
150
151 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153 let group = &self.requests[request_id.0];
154 match group.req_id {
155 Some(reqid) if group.count == 0 => {
156 let group = self.requests.remove(request_id.0);
157 if let Some(vmo_bin_key) = group.vmo_bin_key {
158 self.vmo_bin.release(vmo_bin_key);
159 }
160 Some((
161 group.session,
162 BlockFifoResponse {
163 status: group.status.into_raw(),
164 reqid,
165 group: group.group_or_request.group_id().unwrap_or(0),
166 ..Default::default()
167 },
168 ))
169 }
170 _ => None,
171 }
172 }
173
174 fn complete_and_take_response(
176 &mut self,
177 request_id: RequestId,
178 status: zx::Status,
179 ) -> Option<(S, BlockFifoResponse)> {
180 self.complete(request_id, status);
181 self.take_response(request_id)
182 }
183}
184
185pub struct BlockServer<SM> {
188 block_size: u32,
189 session_manager: Arc<SM>,
190}
191
192#[derive(Debug)]
194pub struct BlockOffsetMapping {
195 source_block_offset: u64,
196 target_block_offset: u64,
197 length: u64,
198}
199
200impl BlockOffsetMapping {
201 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
202 blocks.0 >= self.source_block_offset
203 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
204 }
205}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208 type Error = zx::Status;
209
210 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213 Ok(Self {
214 source_block_offset: wire.source_block_offset,
215 target_block_offset: wire.target_block_offset,
216 length: wire.length,
217 })
218 }
219}
220
221pub struct OffsetMap {
223 mapping: Option<BlockOffsetMapping>,
224 max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230 Self { mapping: Some(mapping), max_transfer_blocks }
231 }
232
233 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235 Self { mapping: None, max_transfer_blocks }
236 }
237
238 pub fn is_empty(&self) -> bool {
239 self.mapping.is_none()
240 }
241
242 fn mapping(&self) -> Option<&BlockOffsetMapping> {
243 self.mapping.as_ref()
244 }
245
246 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247 self.max_transfer_blocks
248 }
249}
250
251pub trait SessionManager: 'static {
254 type Session;
255
256 fn on_attach_vmo(
257 self: Arc<Self>,
258 vmo: &Arc<zx::Vmo>,
259 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
260
261 fn open_session(
266 self: Arc<Self>,
267 stream: fblock::SessionRequestStream,
268 offset_map: OffsetMap,
269 block_size: u32,
270 ) -> impl Future<Output = Result<(), Error>> + Send;
271
272 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
274
275 fn get_volume_info(
277 &self,
278 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
279 {
280 async { Err(zx::Status::NOT_SUPPORTED) }
281 }
282
283 fn query_slices(
285 &self,
286 _start_slices: &[u64],
287 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
288 async { Err(zx::Status::NOT_SUPPORTED) }
289 }
290
291 fn extend(
293 &self,
294 _start_slice: u64,
295 _slice_count: u64,
296 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
297 async { Err(zx::Status::NOT_SUPPORTED) }
298 }
299
300 fn shrink(
302 &self,
303 _start_slice: u64,
304 _slice_count: u64,
305 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
306 async { Err(zx::Status::NOT_SUPPORTED) }
307 }
308
309 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
311}
312
313pub trait IntoSessionManager {
314 type SM: SessionManager;
315
316 fn into_session_manager(self) -> Arc<Self::SM>;
317}
318
319impl<SM: SessionManager> BlockServer<SM> {
320 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321 Self { block_size, session_manager: session_manager.into_session_manager() }
322 }
323
324 pub async fn handle_requests(
326 &self,
327 mut requests: fvolume::VolumeRequestStream,
328 ) -> Result<(), Error> {
329 let scope = fasync::Scope::new();
330 while let Some(request) = requests.try_next().await.unwrap() {
331 if let Some(session) = self.handle_request(request).await? {
332 scope.spawn(session.map(|_| ()));
333 }
334 }
335 scope.await;
336 Ok(())
337 }
338
339 async fn handle_request(
342 &self,
343 request: fvolume::VolumeRequest,
344 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
345 match request {
346 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
347 Ok(info) => {
348 let max_transfer_size = info.max_transfer_size(self.block_size);
349 let (block_count, flags) = match info.as_ref() {
350 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
351 (*block_count, *device_flags)
352 }
353 DeviceInfo::Partition(partition_info) => {
354 let block_count = if let Some(range) =
355 partition_info.block_range.as_ref()
356 {
357 range.end - range.start
358 } else {
359 let volume_info = self.session_manager.get_volume_info().await?;
360 volume_info.0.slice_size * volume_info.1.partition_slice_count
361 / self.block_size as u64
362 };
363 (block_count, partition_info.device_flags)
364 }
365 };
366 responder.send(Ok(&fblock::BlockInfo {
367 block_count,
368 block_size: self.block_size,
369 max_transfer_size,
370 flags,
371 }))?;
372 }
373 Err(status) => responder.send(Err(status.into_raw()))?,
374 },
375 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
376 match self.device_info().await {
377 Ok(info) => {
378 return Ok(Some(self.session_manager.clone().open_session(
379 session.into_stream(),
380 OffsetMap::empty(info.max_transfer_blocks()),
381 self.block_size,
382 )));
383 }
384 Err(status) => session.close_with_epitaph(status)?,
385 }
386 }
387 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
388 session,
389 mapping,
390 control_handle: _,
391 } => match self.device_info().await {
392 Ok(info) => {
393 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
394 Ok(m) => m,
395 Err(status) => {
396 session.close_with_epitaph(status)?;
397 return Ok(None);
398 }
399 };
400 if let Some(max) = info.block_count() {
401 if initial_mapping.target_block_offset + initial_mapping.length > max {
402 log::warn!(
403 "Invalid mapping for session: {initial_mapping:?} (max {max})"
404 );
405 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
406 return Ok(None);
407 }
408 }
409 return Ok(Some(self.session_manager.clone().open_session(
410 session.into_stream(),
411 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
412 self.block_size,
413 )));
414 }
415 Err(status) => session.close_with_epitaph(status)?,
416 },
417 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
418 Ok(info) => {
419 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
420 let mut guid =
421 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
422 guid.value.copy_from_slice(&partition_info.type_guid);
423 responder.send(zx::sys::ZX_OK, Some(&guid))?;
424 } else {
425 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
426 }
427 }
428 Err(status) => {
429 responder.send(status.into_raw(), None)?;
430 }
431 },
432 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
433 match self.device_info().await {
434 Ok(info) => {
435 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
436 let mut guid =
437 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
438 guid.value.copy_from_slice(&partition_info.instance_guid);
439 responder.send(zx::sys::ZX_OK, Some(&guid))?;
440 } else {
441 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
442 }
443 }
444 Err(status) => {
445 responder.send(status.into_raw(), None)?;
446 }
447 }
448 }
449 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
450 Ok(info) => {
451 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
452 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
453 } else {
454 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
455 }
456 }
457 Err(status) => {
458 responder.send(status.into_raw(), None)?;
459 }
460 },
461 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
462 Ok(info) => {
463 if let DeviceInfo::Partition(info) = info.as_ref() {
464 let mut type_guid =
465 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
466 type_guid.value.copy_from_slice(&info.type_guid);
467 let mut instance_guid =
468 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
469 instance_guid.value.copy_from_slice(&info.instance_guid);
470 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
471 name: Some(info.name.clone()),
472 type_guid: Some(type_guid),
473 instance_guid: Some(instance_guid),
474 start_block_offset: info.block_range.as_ref().map(|range| range.start),
475 num_blocks: info
476 .block_range
477 .as_ref()
478 .map(|range| range.end - range.start),
479 flags: Some(info.flags),
480 ..Default::default()
481 }))?;
482 }
483 }
484 Err(status) => responder.send(Err(status.into_raw()))?,
485 },
486 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
487 match self.session_manager.query_slices(&start_slices).await {
488 Ok(mut results) => {
489 let results_len = results.len();
490 assert!(results_len <= 16);
491 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
492 responder.send(
493 zx::sys::ZX_OK,
494 &results.try_into().unwrap(),
495 results_len as u64,
496 )?;
497 }
498 Err(s) => {
499 responder.send(
500 s.into_raw(),
501 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
502 0,
503 )?;
504 }
505 }
506 }
507 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
508 match self.session_manager.get_volume_info().await {
509 Ok((manager_info, volume_info)) => {
510 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
511 }
512 Err(s) => responder.send(s.into_raw(), None, None)?,
513 }
514 }
515 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
516 responder.send(
517 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
518 .into_raw(),
519 )?;
520 }
521 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
522 responder.send(
523 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
524 .into_raw(),
525 )?;
526 }
527 fvolume::VolumeRequest::Destroy { responder, .. } => {
528 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
529 }
530 }
531 Ok(None)
532 }
533
534 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
535 self.session_manager.get_info().await
536 }
537}
538
539struct SessionHelper<SM: SessionManager> {
540 session_manager: Arc<SM>,
541 offset_map: OffsetMap,
542 block_size: u32,
543 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
544 vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
545}
546
547impl<SM: SessionManager> SessionHelper<SM> {
548 fn new(
549 session_manager: Arc<SM>,
550 offset_map: OffsetMap,
551 block_size: u32,
552 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
553 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
554 Ok((
555 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
556 fifo,
557 ))
558 }
559
560 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
561 match request {
562 fblock::SessionRequest::GetFifo { responder } => {
563 let rights = zx::Rights::TRANSFER
564 | zx::Rights::READ
565 | zx::Rights::WRITE
566 | zx::Rights::SIGNAL
567 | zx::Rights::WAIT;
568 match self.peer_fifo.duplicate_handle(rights) {
569 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
570 Err(s) => responder.send(Err(s.into_raw()))?,
571 }
572 Ok(())
573 }
574 fblock::SessionRequest::AttachVmo { vmo, responder } => {
575 let vmo = Arc::new(vmo);
576 let vmo_id = {
577 let mut vmos = self.vmos.lock();
578 if vmos.len() == u16::MAX as usize {
579 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
580 return Ok(());
581 } else {
582 let vmo_id = match vmos.last_entry() {
583 None => 1,
584 Some(o) => {
585 o.key().checked_add(1).unwrap_or_else(|| {
586 let mut vmo_id = 1;
587 for (&id, _) in &*vmos {
589 if id > vmo_id {
590 break;
591 }
592 vmo_id = id + 1;
593 }
594 vmo_id
595 })
596 }
597 };
598 vmos.insert(vmo_id, vmo.clone());
599 vmo_id
600 }
601 };
602 self.session_manager.clone().on_attach_vmo(&vmo).await?;
603 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
604 Ok(())
605 }
606 fblock::SessionRequest::Close { responder } => {
607 responder.send(Ok(()))?;
608 Err(anyhow!("Closed"))
609 }
610 }
611 }
612
613 fn decode_fifo_request(
615 &self,
616 session: SM::Session,
617 request: &BlockFifoRequest,
618 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
619 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
620 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
621 .ok_or(zx::Status::INVALID_ARGS)
622 .and_then(|code| {
623 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
624 if request.length == 0 {
625 return Err(zx::Status::INVALID_ARGS);
626 }
627 if request.dev_offset.checked_add(request.length as u64).is_none()
629 || (code != BlockOpcode::Trim
630 && (request.length as u64 * self.block_size as u64)
631 .checked_add(request.vmo_offset)
632 .is_none())
633 {
634 return Err(zx::Status::OUT_OF_RANGE);
635 }
636 }
637 Ok(match code {
638 BlockOpcode::Read => Operation::Read {
639 device_block_offset: request.dev_offset,
640 block_count: request.length,
641 _unused: 0,
642 vmo_offset: request
643 .vmo_offset
644 .checked_mul(self.block_size as u64)
645 .ok_or(zx::Status::OUT_OF_RANGE)?,
646 },
647 BlockOpcode::Write => {
648 let mut options = WriteOptions::empty();
649 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
650 options |= WriteOptions::FORCE_ACCESS;
651 }
652 if flags.contains(BlockIoFlag::PRE_BARRIER) {
653 options |= WriteOptions::PRE_BARRIER;
654 }
655 Operation::Write {
656 device_block_offset: request.dev_offset,
657 block_count: request.length,
658 options: options,
659 vmo_offset: request
660 .vmo_offset
661 .checked_mul(self.block_size as u64)
662 .ok_or(zx::Status::OUT_OF_RANGE)?,
663 }
664 }
665 BlockOpcode::Flush => Operation::Flush,
666 BlockOpcode::Trim => Operation::Trim {
667 device_block_offset: request.dev_offset,
668 block_count: request.length,
669 },
670 BlockOpcode::CloseVmo => Operation::CloseVmo,
671 })
672 });
673
674 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
675 GroupOrRequest::Group(request.group)
676 } else {
677 GroupOrRequest::Request(request.reqid)
678 };
679
680 let mut active_requests = self.session_manager.active_requests().0.lock();
681 let mut request_id = None;
682
683 if group_or_request.is_group() {
693 for (key, group) in &mut active_requests.requests {
697 if group.group_or_request == group_or_request {
698 if group.req_id.is_some() {
699 if group.status == zx::Status::OK {
701 group.status = zx::Status::INVALID_ARGS;
702 }
703 return Err(None);
705 }
706 if flags.contains(BlockIoFlag::GROUP_LAST) {
707 group.req_id = Some(request.reqid);
708 if group.status != zx::Status::OK {
711 operation = Err(group.status);
712 }
713 } else if group.status != zx::Status::OK {
714 return Err(None);
717 }
718 request_id = Some(RequestId(key));
719 group.count += 1;
720 break;
721 }
722 }
723 }
724
725 let mut retain_vmo = false;
726 let vmo = match &operation {
727 Ok(Operation::Read { .. } | Operation::Write { .. }) => {
728 self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
729 retain_vmo = true;
730 Ok(Some(vmo))
731 })
732 }
733 Ok(Operation::CloseVmo) => {
734 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
735 active_requests.vmo_bin.add(vmo.clone());
736 Ok(Some(vmo))
737 })
738 }
739 _ => Ok(None),
740 }
741 .unwrap_or_else(|e| {
742 operation = Err(e);
743 None
744 });
745
746 let trace_flow_id = NonZero::new(request.trace_flow_id);
747 let request_id = request_id.unwrap_or_else(|| {
748 let vmo_bin_key =
749 if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
750 RequestId(active_requests.requests.insert(ActiveRequest {
751 session,
752 group_or_request,
753 trace_flow_id,
754 vmo_bin_key,
755 status: zx::Status::OK,
756 count: 1,
757 req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
758 || flags.contains(BlockIoFlag::GROUP_LAST)
759 {
760 Some(request.reqid)
761 } else {
762 None
763 },
764 }))
765 });
766
767 Ok(DecodedRequest {
768 request_id,
769 trace_flow_id,
770 operation: operation.map_err(|status| {
771 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
772 })?,
773 vmo,
774 })
775 }
776
777 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
778 std::mem::take(&mut *self.vmos.lock())
779 }
780
781 fn map_request(
783 &self,
784 mut request: DecodedRequest,
785 ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
786 let mut active_requests = self.session_manager.active_requests().0.lock();
787 let active_request = &mut active_requests.requests[request.request_id.0];
788 if active_request.status != zx::Status::OK {
789 return Err(active_requests
790 .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
791 .map(|(_, r)| r));
792 }
793 let mapping = self.offset_map.mapping();
794 match (mapping, request.operation.blocks()) {
795 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
796 return Err(active_requests
797 .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
798 .map(|(_, r)| r));
799 }
800 _ => {}
801 }
802 let remainder = request.operation.map(
803 self.offset_map.mapping(),
804 self.offset_map.max_transfer_blocks(),
805 self.block_size,
806 );
807 if remainder.is_some() {
808 active_request.count += 1;
809 }
810 static CACHE: AtomicU64 = AtomicU64::new(0);
811 if let Some(context) =
812 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
813 {
814 use fuchsia_trace::ArgValue;
815 let trace_args = [
816 ArgValue::of("request_id", request.request_id.0),
817 ArgValue::of("opcode", request.operation.trace_label()),
818 ];
819 let _scope = fuchsia_trace::duration(
820 c"storage",
821 c"block_server::start_transaction",
822 &trace_args,
823 );
824 if let Some(trace_flow_id) = active_request.trace_flow_id {
825 fuchsia_trace::flow_step(
826 &context,
827 c"block_server::start_transaction",
828 trace_flow_id.get().into(),
829 &[],
830 );
831 }
832 }
833 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
834 Ok((request, remainder))
835 }
836
837 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
838 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
839 }
840}
841
842#[repr(transparent)]
843#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
844pub struct RequestId(usize);
845
846#[derive(Clone, Debug)]
847struct DecodedRequest {
848 request_id: RequestId,
849 trace_flow_id: TraceFlowId,
850 operation: Operation,
851 vmo: Option<Arc<zx::Vmo>>,
852}
853
854pub type WriteOptions = block_protocol::WriteOptions;
856
857#[repr(C)]
858#[derive(Clone, Debug, PartialEq, Eq)]
859pub enum Operation {
860 Read {
865 device_block_offset: u64,
866 block_count: u32,
867 _unused: u32,
868 vmo_offset: u64,
869 },
870 Write {
871 device_block_offset: u64,
872 block_count: u32,
873 options: WriteOptions,
874 vmo_offset: u64,
875 },
876 Flush,
877 Trim {
878 device_block_offset: u64,
879 block_count: u32,
880 },
881 CloseVmo,
883}
884
885impl Operation {
886 fn trace_label(&self) -> &'static str {
887 match self {
888 Operation::Read { .. } => "read",
889 Operation::Write { .. } => "write",
890 Operation::Flush { .. } => "flush",
891 Operation::Trim { .. } => "trim",
892 Operation::CloseVmo { .. } => "close_vmo",
893 }
894 }
895
896 fn blocks(&self) -> Option<(u64, u32)> {
898 match self {
899 Operation::Read { device_block_offset, block_count, .. }
900 | Operation::Write { device_block_offset, block_count, .. }
901 | Operation::Trim { device_block_offset, block_count, .. } => {
902 Some((*device_block_offset, *block_count))
903 }
904 _ => None,
905 }
906 }
907
908 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
910 match self {
911 Operation::Read { device_block_offset, block_count, .. }
912 | Operation::Write { device_block_offset, block_count, .. }
913 | Operation::Trim { device_block_offset, block_count, .. } => {
914 Some((device_block_offset, block_count))
915 }
916 _ => None,
917 }
918 }
919
920 fn map(
923 &mut self,
924 mapping: Option<&BlockOffsetMapping>,
925 max_blocks: Option<NonZero<u32>>,
926 block_size: u32,
927 ) -> Option<Self> {
928 let mut max = match self {
929 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
930 _ => None,
931 };
932 let (offset, length) = self.blocks_mut()?;
933 let orig_offset = *offset;
934 if let Some(mapping) = mapping {
935 let delta = *offset - mapping.source_block_offset;
936 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
937 *offset = mapping.target_block_offset + delta;
938 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
939 max = match max {
940 None => Some(mapping_max),
941 Some(m) => Some(std::cmp::min(m, mapping_max)),
942 };
943 };
944 if let Some(max) = max {
945 if *length as u64 > max {
946 let rem = (*length as u64 - max) as u32;
947 *length = max as u32;
948 return Some(match self {
949 Operation::Read {
950 device_block_offset: _,
951 block_count: _,
952 vmo_offset,
953 _unused,
954 } => Operation::Read {
955 device_block_offset: orig_offset + max,
956 block_count: rem,
957 vmo_offset: *vmo_offset + max * block_size as u64,
958 _unused: *_unused,
959 },
960 Operation::Write {
961 device_block_offset: _,
962 block_count: _,
963 vmo_offset,
964 options,
965 } => {
966 let options = *options & !WriteOptions::PRE_BARRIER;
968 Operation::Write {
969 device_block_offset: orig_offset + max,
970 block_count: rem,
971 vmo_offset: *vmo_offset + max * block_size as u64,
972 options,
973 }
974 }
975 Operation::Trim { device_block_offset: _, block_count: _ } => {
976 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
977 }
978 _ => unreachable!(),
979 });
980 }
981 }
982 None
983 }
984}
985
986#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
987pub enum GroupOrRequest {
988 Group(u16),
989 Request(u32),
990}
991
992impl GroupOrRequest {
993 fn is_group(&self) -> bool {
994 matches!(self, Self::Group(_))
995 }
996
997 fn group_id(&self) -> Option<u16> {
998 match self {
999 Self::Group(id) => Some(*id),
1000 Self::Request(_) => None,
1001 }
1002 }
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007 use super::{
1008 BlockOffsetMapping, BlockServer, DeviceInfo, Operation, PartitionInfo, TraceFlowId,
1009 FIFO_MAX_REQUESTS,
1010 };
1011 use assert_matches::assert_matches;
1012 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
1013 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1014 use fuchsia_sync::Mutex;
1015 use futures::channel::oneshot;
1016 use futures::future::BoxFuture;
1017 use futures::FutureExt as _;
1018 use std::borrow::Cow;
1019 use std::future::poll_fn;
1020 use std::num::NonZero;
1021 use std::pin::pin;
1022 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1023 use std::sync::Arc;
1024 use std::task::{Context, Poll};
1025 use zx::{AsHandleRef as _, HandleBased as _};
1026 use {
1027 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1028 fuchsia_async as fasync,
1029 };
1030
1031 #[derive(Default)]
1032 struct MockInterface {
1033 read_hook: Option<
1034 Box<
1035 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1036 + Send
1037 + Sync,
1038 >,
1039 >,
1040 write_hook:
1041 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1042 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1043 }
1044
1045 impl super::async_interface::Interface for MockInterface {
1046 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1047 Ok(())
1048 }
1049
1050 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1051 Ok(Cow::Owned(test_device_info()))
1052 }
1053
1054 async fn read(
1055 &self,
1056 device_block_offset: u64,
1057 block_count: u32,
1058 vmo: &Arc<zx::Vmo>,
1059 vmo_offset: u64,
1060 _trace_flow_id: TraceFlowId,
1061 ) -> Result<(), zx::Status> {
1062 if let Some(read_hook) = &self.read_hook {
1063 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1064 } else {
1065 unimplemented!();
1066 }
1067 }
1068
1069 async fn write(
1070 &self,
1071 device_block_offset: u64,
1072 _block_count: u32,
1073 _vmo: &Arc<zx::Vmo>,
1074 _vmo_offset: u64,
1075 _opts: WriteOptions,
1076 _trace_flow_id: TraceFlowId,
1077 ) -> Result<(), zx::Status> {
1078 if let Some(write_hook) = &self.write_hook {
1079 write_hook(device_block_offset).await
1080 } else {
1081 unimplemented!();
1082 }
1083 }
1084
1085 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1086 unreachable!();
1087 }
1088
1089 fn barrier(&self) -> Result<(), zx::Status> {
1090 if let Some(barrier_hook) = &self.barrier_hook {
1091 barrier_hook()
1092 } else {
1093 unimplemented!();
1094 }
1095 }
1096
1097 async fn trim(
1098 &self,
1099 _device_block_offset: u64,
1100 _block_count: u32,
1101 _trace_flow_id: TraceFlowId,
1102 ) -> Result<(), zx::Status> {
1103 unreachable!();
1104 }
1105
1106 async fn get_volume_info(
1107 &self,
1108 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1109 let () = std::future::pending().await;
1111 unreachable!();
1112 }
1113 }
1114
1115 const BLOCK_SIZE: u32 = 512;
1116 const MAX_TRANSFER_BLOCKS: u32 = 10;
1117
1118 fn test_device_info() -> DeviceInfo {
1119 DeviceInfo::Partition(PartitionInfo {
1120 device_flags: fblock::Flag::READONLY,
1121 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1122 block_range: Some(0..100),
1123 type_guid: [1; 16],
1124 instance_guid: [2; 16],
1125 name: "foo".to_string(),
1126 flags: 0xabcd,
1127 })
1128 }
1129
1130 #[fuchsia::test]
1131 async fn test_split_request_only_issues_single_barrier() {
1132 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1133 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1134 let barrier_called = Arc::new(AtomicU32::new(0));
1135 let barrier_called_clone = barrier_called.clone();
1136
1137 futures::join!(
1138 async move {
1139 let block_server = BlockServer::new(
1140 BLOCK_SIZE,
1141 Arc::new(MockInterface {
1142 barrier_hook: Some(Box::new(move || {
1143 barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1144 Ok(())
1145 })),
1146 write_hook: Some(Box::new(move |_device_block_offset| {
1147 Box::pin(async move { Ok(()) })
1148 })),
1149 ..MockInterface::default()
1150 }),
1151 );
1152 block_server.handle_requests(stream).await.unwrap();
1153 },
1154 async move {
1155 let (session_proxy, server) = fidl::endpoints::create_proxy();
1156
1157 proxy.open_session(server).unwrap();
1158
1159 let vmo_id = session_proxy
1160 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1161 .await
1162 .unwrap()
1163 .unwrap();
1164 assert_ne!(vmo_id.id, 0);
1165
1166 let mut fifo =
1167 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1168 let (mut reader, mut writer) = fifo.async_io();
1169 writer
1172 .write_entries(&BlockFifoRequest {
1173 command: BlockFifoCommand {
1174 opcode: BlockOpcode::Write.into_primitive(),
1175 flags: BlockIoFlag::PRE_BARRIER.bits(),
1176 ..Default::default()
1177 },
1178 vmoid: vmo_id.id,
1179 dev_offset: 0,
1180 length: MAX_TRANSFER_BLOCKS + 1,
1181 vmo_offset: 6,
1182 ..Default::default()
1183 })
1184 .await
1185 .unwrap();
1186
1187 let mut response = BlockFifoResponse::default();
1188 reader.read_entries(&mut response).await.unwrap();
1189 assert_eq!(response.status, zx::sys::ZX_OK);
1190
1191 assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1192
1193 std::mem::drop(proxy);
1194 }
1195 );
1196 }
1197
1198 #[fuchsia::test]
1199 async fn test_barriers_not_supported() {
1200 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1201 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1202
1203 futures::join!(
1204 async move {
1205 let block_server = BlockServer::new(
1206 BLOCK_SIZE,
1207 Arc::new(MockInterface {
1208 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1209 write_hook: Some(Box::new(move |_device_block_offset| {
1210 Box::pin(async move { Ok(()) })
1211 })),
1212 ..MockInterface::default()
1213 }),
1214 );
1215 block_server.handle_requests(stream).await.unwrap();
1216 },
1217 async move {
1218 let (session_proxy, server) = fidl::endpoints::create_proxy();
1219
1220 proxy.open_session(server).unwrap();
1221
1222 let vmo_id = session_proxy
1223 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1224 .await
1225 .unwrap()
1226 .unwrap();
1227 assert_ne!(vmo_id.id, 0);
1228
1229 let mut fifo =
1230 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1231 let (mut reader, mut writer) = fifo.async_io();
1232
1233 writer
1236 .write_entries(&BlockFifoRequest {
1237 command: BlockFifoCommand {
1238 opcode: BlockOpcode::Write.into_primitive(),
1239 flags: BlockIoFlag::PRE_BARRIER.bits(),
1240 ..Default::default()
1241 },
1242 vmoid: vmo_id.id,
1243 dev_offset: 0,
1244 length: MAX_TRANSFER_BLOCKS + 1,
1245 vmo_offset: 6,
1246 ..Default::default()
1247 })
1248 .await
1249 .unwrap();
1250
1251 let mut response = BlockFifoResponse::default();
1252 reader.read_entries(&mut response).await.unwrap();
1253 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1254
1255 std::mem::drop(proxy);
1256 }
1257 );
1258 }
1259
1260 #[fuchsia::test]
1261 async fn test_barriers_ordering() {
1262 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1263 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1264 let barrier_called = Arc::new(AtomicBool::new(false));
1265
1266 futures::join!(
1267 async move {
1268 let barrier_called_clone = barrier_called.clone();
1269 let block_server = BlockServer::new(
1270 BLOCK_SIZE,
1271 Arc::new(MockInterface {
1272 barrier_hook: Some(Box::new(move || {
1273 barrier_called.store(true, Ordering::Relaxed);
1274 Ok(())
1275 })),
1276 write_hook: Some(Box::new(move |device_block_offset| {
1277 let barrier_called = barrier_called_clone.clone();
1278 Box::pin(async move {
1279 if device_block_offset % 2 == 0 {
1281 fasync::Timer::new(fasync::MonotonicInstant::after(
1282 zx::MonotonicDuration::from_millis(200),
1283 ))
1284 .await;
1285 }
1286 assert!(barrier_called.load(Ordering::Relaxed));
1287 Ok(())
1288 })
1289 })),
1290 ..MockInterface::default()
1291 }),
1292 );
1293 block_server.handle_requests(stream).await.unwrap();
1294 },
1295 async move {
1296 let (session_proxy, server) = fidl::endpoints::create_proxy();
1297
1298 proxy.open_session(server).unwrap();
1299
1300 let vmo_id = session_proxy
1301 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1302 .await
1303 .unwrap()
1304 .unwrap();
1305 assert_ne!(vmo_id.id, 0);
1306
1307 let mut fifo =
1308 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1309 let (mut reader, mut writer) = fifo.async_io();
1310
1311 writer
1312 .write_entries(&BlockFifoRequest {
1313 command: BlockFifoCommand {
1314 opcode: BlockOpcode::Write.into_primitive(),
1315 flags: BlockIoFlag::PRE_BARRIER.bits(),
1316 ..Default::default()
1317 },
1318 vmoid: vmo_id.id,
1319 dev_offset: 0,
1320 length: 5,
1321 vmo_offset: 6,
1322 ..Default::default()
1323 })
1324 .await
1325 .unwrap();
1326
1327 for i in 0..10 {
1328 writer
1329 .write_entries(&BlockFifoRequest {
1330 command: BlockFifoCommand {
1331 opcode: BlockOpcode::Write.into_primitive(),
1332 ..Default::default()
1333 },
1334 vmoid: vmo_id.id,
1335 dev_offset: i + 1,
1336 length: 5,
1337 vmo_offset: 6,
1338 ..Default::default()
1339 })
1340 .await
1341 .unwrap();
1342 }
1343 for _ in 0..11 {
1344 let mut response = BlockFifoResponse::default();
1345 reader.read_entries(&mut response).await.unwrap();
1346 assert_eq!(response.status, zx::sys::ZX_OK);
1347 }
1348
1349 std::mem::drop(proxy);
1350 }
1351 );
1352 }
1353
1354 #[fuchsia::test]
1355 async fn test_info() {
1356 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1357
1358 futures::join!(
1359 async {
1360 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1361 block_server.handle_requests(stream).await.unwrap();
1362 },
1363 async {
1364 let expected_info = test_device_info();
1365 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1366 info
1367 } else {
1368 unreachable!()
1369 };
1370
1371 let block_info = proxy.get_info().await.unwrap().unwrap();
1372 assert_eq!(
1373 block_info.block_count,
1374 partition_info.block_range.as_ref().unwrap().end
1375 - partition_info.block_range.as_ref().unwrap().start
1376 );
1377 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1378
1379 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1380
1381 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1382 assert_eq!(status, zx::sys::ZX_OK);
1383 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1384
1385 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1386 assert_eq!(status, zx::sys::ZX_OK);
1387 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1388
1389 let (status, name) = proxy.get_name().await.unwrap();
1390 assert_eq!(status, zx::sys::ZX_OK);
1391 assert_eq!(name.as_ref(), Some(&partition_info.name));
1392
1393 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1394 assert_eq!(metadata.name, name);
1395 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1396 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1397 assert_eq!(
1398 metadata.start_block_offset,
1399 Some(partition_info.block_range.as_ref().unwrap().start)
1400 );
1401 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1402 assert_eq!(metadata.flags, Some(partition_info.flags));
1403
1404 std::mem::drop(proxy);
1405 }
1406 );
1407 }
1408
1409 #[fuchsia::test]
1410 async fn test_attach_vmo() {
1411 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1412
1413 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1414 let koid = vmo.get_koid().unwrap();
1415
1416 futures::join!(
1417 async {
1418 let block_server = BlockServer::new(
1419 BLOCK_SIZE,
1420 Arc::new(MockInterface {
1421 read_hook: Some(Box::new(move |_, _, vmo, _| {
1422 assert_eq!(vmo.get_koid().unwrap(), koid);
1423 Box::pin(async { Ok(()) })
1424 })),
1425 ..MockInterface::default()
1426 }),
1427 );
1428 block_server.handle_requests(stream).await.unwrap();
1429 },
1430 async move {
1431 let (session_proxy, server) = fidl::endpoints::create_proxy();
1432
1433 proxy.open_session(server).unwrap();
1434
1435 let vmo_id = session_proxy
1436 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1437 .await
1438 .unwrap()
1439 .unwrap();
1440 assert_ne!(vmo_id.id, 0);
1441
1442 let mut fifo =
1443 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1444 let (mut reader, mut writer) = fifo.async_io();
1445
1446 let mut count = 1;
1448 loop {
1449 match session_proxy
1450 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1451 .await
1452 .unwrap()
1453 {
1454 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1455 Err(e) => {
1456 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1457 break;
1458 }
1459 }
1460
1461 if count % 10 == 0 {
1463 writer
1464 .write_entries(&BlockFifoRequest {
1465 command: BlockFifoCommand {
1466 opcode: BlockOpcode::Read.into_primitive(),
1467 ..Default::default()
1468 },
1469 vmoid: vmo_id.id,
1470 length: 1,
1471 ..Default::default()
1472 })
1473 .await
1474 .unwrap();
1475
1476 let mut response = BlockFifoResponse::default();
1477 reader.read_entries(&mut response).await.unwrap();
1478 assert_eq!(response.status, zx::sys::ZX_OK);
1479 }
1480
1481 count += 1;
1482 }
1483
1484 assert_eq!(count, u16::MAX as u64);
1485
1486 writer
1488 .write_entries(&BlockFifoRequest {
1489 command: BlockFifoCommand {
1490 opcode: BlockOpcode::CloseVmo.into_primitive(),
1491 ..Default::default()
1492 },
1493 vmoid: vmo_id.id,
1494 ..Default::default()
1495 })
1496 .await
1497 .unwrap();
1498
1499 let mut response = BlockFifoResponse::default();
1500 reader.read_entries(&mut response).await.unwrap();
1501 assert_eq!(response.status, zx::sys::ZX_OK);
1502
1503 let new_vmo_id = session_proxy
1504 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1505 .await
1506 .unwrap()
1507 .unwrap();
1508 assert_eq!(new_vmo_id.id, vmo_id.id);
1510
1511 std::mem::drop(proxy);
1512 }
1513 );
1514 }
1515
1516 #[fuchsia::test]
1517 async fn test_close() {
1518 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1519
1520 let mut server = std::pin::pin!(async {
1521 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1522 block_server.handle_requests(stream).await.unwrap();
1523 }
1524 .fuse());
1525
1526 let mut client = std::pin::pin!(async {
1527 let (session_proxy, server) = fidl::endpoints::create_proxy();
1528
1529 proxy.open_session(server).unwrap();
1530
1531 std::mem::drop(proxy);
1534
1535 session_proxy.close().await.unwrap().unwrap();
1536
1537 let _: () = std::future::pending().await;
1539 }
1540 .fuse());
1541
1542 futures::select!(
1543 _ = server => {}
1544 _ = client => unreachable!(),
1545 );
1546 }
1547
1548 #[derive(Default)]
1549 struct IoMockInterface {
1550 do_checks: bool,
1551 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1552 return_errors: bool,
1553 }
1554
1555 #[derive(Debug)]
1556 enum ExpectedOp {
1557 Read(u64, u32, u64),
1558 Write(u64, u32, u64),
1559 Trim(u64, u32),
1560 Flush,
1561 }
1562
1563 impl super::async_interface::Interface for IoMockInterface {
1564 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1565 Ok(())
1566 }
1567
1568 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1569 Ok(Cow::Owned(test_device_info()))
1570 }
1571
1572 async fn read(
1573 &self,
1574 device_block_offset: u64,
1575 block_count: u32,
1576 _vmo: &Arc<zx::Vmo>,
1577 vmo_offset: u64,
1578 _trace_flow_id: TraceFlowId,
1579 ) -> Result<(), zx::Status> {
1580 if self.return_errors {
1581 Err(zx::Status::INTERNAL)
1582 } else {
1583 if self.do_checks {
1584 assert_matches!(
1585 self.expected_op.lock().take(),
1586 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1587 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1588 "Read {device_block_offset} {block_count} {vmo_offset}"
1589 );
1590 }
1591 Ok(())
1592 }
1593 }
1594
1595 async fn write(
1596 &self,
1597 device_block_offset: u64,
1598 block_count: u32,
1599 _vmo: &Arc<zx::Vmo>,
1600 vmo_offset: u64,
1601 _opts: WriteOptions,
1602 _trace_flow_id: TraceFlowId,
1603 ) -> Result<(), zx::Status> {
1604 if self.return_errors {
1605 Err(zx::Status::NOT_SUPPORTED)
1606 } else {
1607 if self.do_checks {
1608 assert_matches!(
1609 self.expected_op.lock().take(),
1610 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1611 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1612 "Write {device_block_offset} {block_count} {vmo_offset}"
1613 );
1614 }
1615 Ok(())
1616 }
1617 }
1618
1619 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1620 if self.return_errors {
1621 Err(zx::Status::NO_RESOURCES)
1622 } else {
1623 if self.do_checks {
1624 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1625 }
1626 Ok(())
1627 }
1628 }
1629
1630 fn barrier(&self) -> Result<(), zx::Status> {
1631 unreachable!()
1632 }
1633
1634 async fn trim(
1635 &self,
1636 device_block_offset: u64,
1637 block_count: u32,
1638 _trace_flow_id: TraceFlowId,
1639 ) -> Result<(), zx::Status> {
1640 if self.return_errors {
1641 Err(zx::Status::NO_MEMORY)
1642 } else {
1643 if self.do_checks {
1644 assert_matches!(
1645 self.expected_op.lock().take(),
1646 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1647 block_count == b,
1648 "Trim {device_block_offset} {block_count}"
1649 );
1650 }
1651 Ok(())
1652 }
1653 }
1654 }
1655
1656 #[fuchsia::test]
1657 async fn test_io() {
1658 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1659
1660 let expected_op = Arc::new(Mutex::new(None));
1661 let expected_op_clone = expected_op.clone();
1662
1663 let server = async {
1664 let block_server = BlockServer::new(
1665 BLOCK_SIZE,
1666 Arc::new(IoMockInterface {
1667 return_errors: false,
1668 do_checks: true,
1669 expected_op: expected_op_clone,
1670 }),
1671 );
1672 block_server.handle_requests(stream).await.unwrap();
1673 };
1674
1675 let client = async move {
1676 let (session_proxy, server) = fidl::endpoints::create_proxy();
1677
1678 proxy.open_session(server).unwrap();
1679
1680 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1681 let vmo_id = session_proxy
1682 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1683 .await
1684 .unwrap()
1685 .unwrap();
1686
1687 let mut fifo =
1688 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1689 let (mut reader, mut writer) = fifo.async_io();
1690
1691 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1693 writer
1694 .write_entries(&BlockFifoRequest {
1695 command: BlockFifoCommand {
1696 opcode: BlockOpcode::Read.into_primitive(),
1697 ..Default::default()
1698 },
1699 vmoid: vmo_id.id,
1700 dev_offset: 1,
1701 length: 2,
1702 vmo_offset: 3,
1703 ..Default::default()
1704 })
1705 .await
1706 .unwrap();
1707
1708 let mut response = BlockFifoResponse::default();
1709 reader.read_entries(&mut response).await.unwrap();
1710 assert_eq!(response.status, zx::sys::ZX_OK);
1711
1712 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1714 writer
1715 .write_entries(&BlockFifoRequest {
1716 command: BlockFifoCommand {
1717 opcode: BlockOpcode::Write.into_primitive(),
1718 ..Default::default()
1719 },
1720 vmoid: vmo_id.id,
1721 dev_offset: 4,
1722 length: 5,
1723 vmo_offset: 6,
1724 ..Default::default()
1725 })
1726 .await
1727 .unwrap();
1728
1729 let mut response = BlockFifoResponse::default();
1730 reader.read_entries(&mut response).await.unwrap();
1731 assert_eq!(response.status, zx::sys::ZX_OK);
1732
1733 *expected_op.lock() = Some(ExpectedOp::Flush);
1735 writer
1736 .write_entries(&BlockFifoRequest {
1737 command: BlockFifoCommand {
1738 opcode: BlockOpcode::Flush.into_primitive(),
1739 ..Default::default()
1740 },
1741 ..Default::default()
1742 })
1743 .await
1744 .unwrap();
1745
1746 reader.read_entries(&mut response).await.unwrap();
1747 assert_eq!(response.status, zx::sys::ZX_OK);
1748
1749 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1751 writer
1752 .write_entries(&BlockFifoRequest {
1753 command: BlockFifoCommand {
1754 opcode: BlockOpcode::Trim.into_primitive(),
1755 ..Default::default()
1756 },
1757 dev_offset: 7,
1758 length: 8,
1759 ..Default::default()
1760 })
1761 .await
1762 .unwrap();
1763
1764 reader.read_entries(&mut response).await.unwrap();
1765 assert_eq!(response.status, zx::sys::ZX_OK);
1766
1767 std::mem::drop(proxy);
1768 };
1769
1770 futures::join!(server, client);
1771 }
1772
1773 #[fuchsia::test]
1774 async fn test_io_errors() {
1775 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1776
1777 futures::join!(
1778 async {
1779 let block_server = BlockServer::new(
1780 BLOCK_SIZE,
1781 Arc::new(IoMockInterface {
1782 return_errors: true,
1783 do_checks: false,
1784 expected_op: Arc::new(Mutex::new(None)),
1785 }),
1786 );
1787 block_server.handle_requests(stream).await.unwrap();
1788 },
1789 async move {
1790 let (session_proxy, server) = fidl::endpoints::create_proxy();
1791
1792 proxy.open_session(server).unwrap();
1793
1794 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1795 let vmo_id = session_proxy
1796 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1797 .await
1798 .unwrap()
1799 .unwrap();
1800
1801 let mut fifo =
1802 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1803 let (mut reader, mut writer) = fifo.async_io();
1804
1805 writer
1807 .write_entries(&BlockFifoRequest {
1808 command: BlockFifoCommand {
1809 opcode: BlockOpcode::Read.into_primitive(),
1810 ..Default::default()
1811 },
1812 vmoid: vmo_id.id,
1813 length: 1,
1814 reqid: 1,
1815 ..Default::default()
1816 })
1817 .await
1818 .unwrap();
1819
1820 let mut response = BlockFifoResponse::default();
1821 reader.read_entries(&mut response).await.unwrap();
1822 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1823
1824 writer
1826 .write_entries(&BlockFifoRequest {
1827 command: BlockFifoCommand {
1828 opcode: BlockOpcode::Write.into_primitive(),
1829 ..Default::default()
1830 },
1831 vmoid: vmo_id.id,
1832 length: 1,
1833 reqid: 2,
1834 ..Default::default()
1835 })
1836 .await
1837 .unwrap();
1838
1839 reader.read_entries(&mut response).await.unwrap();
1840 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1841
1842 writer
1844 .write_entries(&BlockFifoRequest {
1845 command: BlockFifoCommand {
1846 opcode: BlockOpcode::Flush.into_primitive(),
1847 ..Default::default()
1848 },
1849 reqid: 3,
1850 ..Default::default()
1851 })
1852 .await
1853 .unwrap();
1854
1855 reader.read_entries(&mut response).await.unwrap();
1856 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1857
1858 writer
1860 .write_entries(&BlockFifoRequest {
1861 command: BlockFifoCommand {
1862 opcode: BlockOpcode::Trim.into_primitive(),
1863 ..Default::default()
1864 },
1865 reqid: 4,
1866 length: 1,
1867 ..Default::default()
1868 })
1869 .await
1870 .unwrap();
1871
1872 reader.read_entries(&mut response).await.unwrap();
1873 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1874
1875 std::mem::drop(proxy);
1876 }
1877 );
1878 }
1879
1880 #[fuchsia::test]
1881 async fn test_invalid_args() {
1882 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1883
1884 futures::join!(
1885 async {
1886 let block_server = BlockServer::new(
1887 BLOCK_SIZE,
1888 Arc::new(IoMockInterface {
1889 return_errors: false,
1890 do_checks: false,
1891 expected_op: Arc::new(Mutex::new(None)),
1892 }),
1893 );
1894 block_server.handle_requests(stream).await.unwrap();
1895 },
1896 async move {
1897 let (session_proxy, server) = fidl::endpoints::create_proxy();
1898
1899 proxy.open_session(server).unwrap();
1900
1901 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1902 let vmo_id = session_proxy
1903 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1904 .await
1905 .unwrap()
1906 .unwrap();
1907
1908 let mut fifo =
1909 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1910
1911 async fn test(
1912 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1913 request: BlockFifoRequest,
1914 ) -> Result<(), zx::Status> {
1915 let (mut reader, mut writer) = fifo.async_io();
1916 writer.write_entries(&request).await.unwrap();
1917 let mut response = BlockFifoResponse::default();
1918 reader.read_entries(&mut response).await.unwrap();
1919 zx::Status::ok(response.status)
1920 }
1921
1922 let good_read_request = || BlockFifoRequest {
1925 command: BlockFifoCommand {
1926 opcode: BlockOpcode::Read.into_primitive(),
1927 ..Default::default()
1928 },
1929 length: 1,
1930 vmoid: vmo_id.id,
1931 ..Default::default()
1932 };
1933
1934 assert_eq!(
1935 test(
1936 &mut fifo,
1937 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1938 )
1939 .await,
1940 Err(zx::Status::IO)
1941 );
1942
1943 assert_eq!(
1944 test(
1945 &mut fifo,
1946 BlockFifoRequest {
1947 vmo_offset: 0xffff_ffff_ffff_ffff,
1948 ..good_read_request()
1949 }
1950 )
1951 .await,
1952 Err(zx::Status::OUT_OF_RANGE)
1953 );
1954
1955 assert_eq!(
1956 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1957 Err(zx::Status::INVALID_ARGS)
1958 );
1959
1960 let good_write_request = || BlockFifoRequest {
1963 command: BlockFifoCommand {
1964 opcode: BlockOpcode::Write.into_primitive(),
1965 ..Default::default()
1966 },
1967 length: 1,
1968 vmoid: vmo_id.id,
1969 ..Default::default()
1970 };
1971
1972 assert_eq!(
1973 test(
1974 &mut fifo,
1975 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1976 )
1977 .await,
1978 Err(zx::Status::IO)
1979 );
1980
1981 assert_eq!(
1982 test(
1983 &mut fifo,
1984 BlockFifoRequest {
1985 vmo_offset: 0xffff_ffff_ffff_ffff,
1986 ..good_write_request()
1987 }
1988 )
1989 .await,
1990 Err(zx::Status::OUT_OF_RANGE)
1991 );
1992
1993 assert_eq!(
1994 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
1995 Err(zx::Status::INVALID_ARGS)
1996 );
1997
1998 assert_eq!(
2001 test(
2002 &mut fifo,
2003 BlockFifoRequest {
2004 command: BlockFifoCommand {
2005 opcode: BlockOpcode::CloseVmo.into_primitive(),
2006 ..Default::default()
2007 },
2008 vmoid: vmo_id.id + 1,
2009 ..Default::default()
2010 }
2011 )
2012 .await,
2013 Err(zx::Status::IO)
2014 );
2015
2016 std::mem::drop(proxy);
2017 }
2018 );
2019 }
2020
2021 #[fuchsia::test]
2022 async fn test_concurrent_requests() {
2023 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2024
2025 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2026 let waiting_readers_clone = waiting_readers.clone();
2027
2028 futures::join!(
2029 async move {
2030 let block_server = BlockServer::new(
2031 BLOCK_SIZE,
2032 Arc::new(MockInterface {
2033 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2034 let (tx, rx) = oneshot::channel();
2035 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2036 Box::pin(async move {
2037 let _ = rx.await;
2038 Ok(())
2039 })
2040 })),
2041 ..MockInterface::default()
2042 }),
2043 );
2044 block_server.handle_requests(stream).await.unwrap();
2045 },
2046 async move {
2047 let (session_proxy, server) = fidl::endpoints::create_proxy();
2048
2049 proxy.open_session(server).unwrap();
2050
2051 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2052 let vmo_id = session_proxy
2053 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2054 .await
2055 .unwrap()
2056 .unwrap();
2057
2058 let mut fifo =
2059 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2060 let (mut reader, mut writer) = fifo.async_io();
2061
2062 writer
2063 .write_entries(&BlockFifoRequest {
2064 command: BlockFifoCommand {
2065 opcode: BlockOpcode::Read.into_primitive(),
2066 ..Default::default()
2067 },
2068 reqid: 1,
2069 dev_offset: 1, vmoid: vmo_id.id,
2071 length: 1,
2072 ..Default::default()
2073 })
2074 .await
2075 .unwrap();
2076
2077 writer
2078 .write_entries(&BlockFifoRequest {
2079 command: BlockFifoCommand {
2080 opcode: BlockOpcode::Read.into_primitive(),
2081 ..Default::default()
2082 },
2083 reqid: 2,
2084 dev_offset: 2,
2085 vmoid: vmo_id.id,
2086 length: 1,
2087 ..Default::default()
2088 })
2089 .await
2090 .unwrap();
2091
2092 poll_fn(|cx: &mut Context<'_>| {
2094 if waiting_readers.lock().len() == 2 {
2095 Poll::Ready(())
2096 } else {
2097 cx.waker().wake_by_ref();
2099 Poll::Pending
2100 }
2101 })
2102 .await;
2103
2104 let mut response = BlockFifoResponse::default();
2105 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2106
2107 let (id, tx) = waiting_readers.lock().pop().unwrap();
2108 tx.send(()).unwrap();
2109
2110 reader.read_entries(&mut response).await.unwrap();
2111 assert_eq!(response.status, zx::sys::ZX_OK);
2112 assert_eq!(response.reqid, id);
2113
2114 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2115
2116 let (id, tx) = waiting_readers.lock().pop().unwrap();
2117 tx.send(()).unwrap();
2118
2119 reader.read_entries(&mut response).await.unwrap();
2120 assert_eq!(response.status, zx::sys::ZX_OK);
2121 assert_eq!(response.reqid, id);
2122 }
2123 );
2124 }
2125
2126 #[fuchsia::test]
2127 async fn test_groups() {
2128 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2129
2130 futures::join!(
2131 async move {
2132 let block_server = BlockServer::new(
2133 BLOCK_SIZE,
2134 Arc::new(MockInterface {
2135 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2136 ..MockInterface::default()
2137 }),
2138 );
2139 block_server.handle_requests(stream).await.unwrap();
2140 },
2141 async move {
2142 let (session_proxy, server) = fidl::endpoints::create_proxy();
2143
2144 proxy.open_session(server).unwrap();
2145
2146 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2147 let vmo_id = session_proxy
2148 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2149 .await
2150 .unwrap()
2151 .unwrap();
2152
2153 let mut fifo =
2154 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2155 let (mut reader, mut writer) = fifo.async_io();
2156
2157 writer
2158 .write_entries(&BlockFifoRequest {
2159 command: BlockFifoCommand {
2160 opcode: BlockOpcode::Read.into_primitive(),
2161 flags: BlockIoFlag::GROUP_ITEM.bits(),
2162 ..Default::default()
2163 },
2164 group: 1,
2165 vmoid: vmo_id.id,
2166 length: 1,
2167 ..Default::default()
2168 })
2169 .await
2170 .unwrap();
2171
2172 writer
2173 .write_entries(&BlockFifoRequest {
2174 command: BlockFifoCommand {
2175 opcode: BlockOpcode::Read.into_primitive(),
2176 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2177 ..Default::default()
2178 },
2179 reqid: 2,
2180 group: 1,
2181 vmoid: vmo_id.id,
2182 length: 1,
2183 ..Default::default()
2184 })
2185 .await
2186 .unwrap();
2187
2188 let mut response = BlockFifoResponse::default();
2189 reader.read_entries(&mut response).await.unwrap();
2190 assert_eq!(response.status, zx::sys::ZX_OK);
2191 assert_eq!(response.reqid, 2);
2192 assert_eq!(response.group, 1);
2193 }
2194 );
2195 }
2196
2197 #[fuchsia::test]
2198 async fn test_group_error() {
2199 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2200
2201 let counter = Arc::new(AtomicU64::new(0));
2202 let counter_clone = counter.clone();
2203
2204 futures::join!(
2205 async move {
2206 let block_server = BlockServer::new(
2207 BLOCK_SIZE,
2208 Arc::new(MockInterface {
2209 read_hook: Some(Box::new(move |_, _, _, _| {
2210 counter_clone.fetch_add(1, Ordering::Relaxed);
2211 Box::pin(async { Err(zx::Status::BAD_STATE) })
2212 })),
2213 ..MockInterface::default()
2214 }),
2215 );
2216 block_server.handle_requests(stream).await.unwrap();
2217 },
2218 async move {
2219 let (session_proxy, server) = fidl::endpoints::create_proxy();
2220
2221 proxy.open_session(server).unwrap();
2222
2223 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2224 let vmo_id = session_proxy
2225 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2226 .await
2227 .unwrap()
2228 .unwrap();
2229
2230 let mut fifo =
2231 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2232 let (mut reader, mut writer) = fifo.async_io();
2233
2234 writer
2235 .write_entries(&BlockFifoRequest {
2236 command: BlockFifoCommand {
2237 opcode: BlockOpcode::Read.into_primitive(),
2238 flags: BlockIoFlag::GROUP_ITEM.bits(),
2239 ..Default::default()
2240 },
2241 group: 1,
2242 vmoid: vmo_id.id,
2243 length: 1,
2244 ..Default::default()
2245 })
2246 .await
2247 .unwrap();
2248
2249 poll_fn(|cx: &mut Context<'_>| {
2251 if counter.load(Ordering::Relaxed) == 1 {
2252 Poll::Ready(())
2253 } else {
2254 cx.waker().wake_by_ref();
2256 Poll::Pending
2257 }
2258 })
2259 .await;
2260
2261 let mut response = BlockFifoResponse::default();
2262 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2263
2264 writer
2265 .write_entries(&BlockFifoRequest {
2266 command: BlockFifoCommand {
2267 opcode: BlockOpcode::Read.into_primitive(),
2268 flags: BlockIoFlag::GROUP_ITEM.bits(),
2269 ..Default::default()
2270 },
2271 group: 1,
2272 vmoid: vmo_id.id,
2273 length: 1,
2274 ..Default::default()
2275 })
2276 .await
2277 .unwrap();
2278
2279 writer
2280 .write_entries(&BlockFifoRequest {
2281 command: BlockFifoCommand {
2282 opcode: BlockOpcode::Read.into_primitive(),
2283 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2284 ..Default::default()
2285 },
2286 reqid: 2,
2287 group: 1,
2288 vmoid: vmo_id.id,
2289 length: 1,
2290 ..Default::default()
2291 })
2292 .await
2293 .unwrap();
2294
2295 reader.read_entries(&mut response).await.unwrap();
2296 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2297 assert_eq!(response.reqid, 2);
2298 assert_eq!(response.group, 1);
2299
2300 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2301
2302 assert_eq!(counter.load(Ordering::Relaxed), 1);
2304 }
2305 );
2306 }
2307
2308 #[fuchsia::test]
2309 async fn test_group_with_two_lasts() {
2310 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2311
2312 let (tx, rx) = oneshot::channel();
2313
2314 futures::join!(
2315 async move {
2316 let rx = Mutex::new(Some(rx));
2317 let block_server = BlockServer::new(
2318 BLOCK_SIZE,
2319 Arc::new(MockInterface {
2320 read_hook: Some(Box::new(move |_, _, _, _| {
2321 let rx = rx.lock().take().unwrap();
2322 Box::pin(async {
2323 let _ = rx.await;
2324 Ok(())
2325 })
2326 })),
2327 ..MockInterface::default()
2328 }),
2329 );
2330 block_server.handle_requests(stream).await.unwrap();
2331 },
2332 async move {
2333 let (session_proxy, server) = fidl::endpoints::create_proxy();
2334
2335 proxy.open_session(server).unwrap();
2336
2337 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2338 let vmo_id = session_proxy
2339 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2340 .await
2341 .unwrap()
2342 .unwrap();
2343
2344 let mut fifo =
2345 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2346 let (mut reader, mut writer) = fifo.async_io();
2347
2348 writer
2349 .write_entries(&BlockFifoRequest {
2350 command: BlockFifoCommand {
2351 opcode: BlockOpcode::Read.into_primitive(),
2352 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2353 ..Default::default()
2354 },
2355 reqid: 1,
2356 group: 1,
2357 vmoid: vmo_id.id,
2358 length: 1,
2359 ..Default::default()
2360 })
2361 .await
2362 .unwrap();
2363
2364 writer
2365 .write_entries(&BlockFifoRequest {
2366 command: BlockFifoCommand {
2367 opcode: BlockOpcode::Read.into_primitive(),
2368 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2369 ..Default::default()
2370 },
2371 reqid: 2,
2372 group: 1,
2373 vmoid: vmo_id.id,
2374 length: 1,
2375 ..Default::default()
2376 })
2377 .await
2378 .unwrap();
2379
2380 writer
2382 .write_entries(&BlockFifoRequest {
2383 command: BlockFifoCommand {
2384 opcode: BlockOpcode::CloseVmo.into_primitive(),
2385 ..Default::default()
2386 },
2387 reqid: 3,
2388 vmoid: vmo_id.id,
2389 ..Default::default()
2390 })
2391 .await
2392 .unwrap();
2393
2394 let mut response = BlockFifoResponse::default();
2396 reader.read_entries(&mut response).await.unwrap();
2397 assert_eq!(response.status, zx::sys::ZX_OK);
2398 assert_eq!(response.reqid, 3);
2399
2400 tx.send(()).unwrap();
2402
2403 let mut response = BlockFifoResponse::default();
2406 reader.read_entries(&mut response).await.unwrap();
2407 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2408 assert_eq!(response.reqid, 1);
2409 assert_eq!(response.group, 1);
2410 }
2411 );
2412 }
2413
2414 #[fuchsia::test(allow_stalls = false)]
2415 async fn test_requests_dont_block_sessions() {
2416 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2417
2418 let (tx, rx) = oneshot::channel();
2419
2420 fasync::Task::local(async move {
2421 let rx = Mutex::new(Some(rx));
2422 let block_server = BlockServer::new(
2423 BLOCK_SIZE,
2424 Arc::new(MockInterface {
2425 read_hook: Some(Box::new(move |_, _, _, _| {
2426 let rx = rx.lock().take().unwrap();
2427 Box::pin(async {
2428 let _ = rx.await;
2429 Ok(())
2430 })
2431 })),
2432 ..MockInterface::default()
2433 }),
2434 );
2435 block_server.handle_requests(stream).await.unwrap();
2436 })
2437 .detach();
2438
2439 let mut fut = pin!(async {
2440 let (session_proxy, server) = fidl::endpoints::create_proxy();
2441
2442 proxy.open_session(server).unwrap();
2443
2444 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2445 let vmo_id = session_proxy
2446 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2447 .await
2448 .unwrap()
2449 .unwrap();
2450
2451 let mut fifo =
2452 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2453 let (mut reader, mut writer) = fifo.async_io();
2454
2455 writer
2456 .write_entries(&BlockFifoRequest {
2457 command: BlockFifoCommand {
2458 opcode: BlockOpcode::Read.into_primitive(),
2459 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2460 ..Default::default()
2461 },
2462 reqid: 1,
2463 group: 1,
2464 vmoid: vmo_id.id,
2465 length: 1,
2466 ..Default::default()
2467 })
2468 .await
2469 .unwrap();
2470
2471 let mut response = BlockFifoResponse::default();
2472 reader.read_entries(&mut response).await.unwrap();
2473 assert_eq!(response.status, zx::sys::ZX_OK);
2474 });
2475
2476 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2478
2479 let mut fut2 = pin!(proxy.get_volume_info());
2480
2481 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2483
2484 let _ = tx.send(());
2487
2488 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2489 }
2490
2491 #[fuchsia::test]
2492 async fn test_request_flow_control() {
2493 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2494
2495 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2498 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2499 let event_clone = event.clone();
2500 futures::join!(
2501 async move {
2502 let block_server = BlockServer::new(
2503 BLOCK_SIZE,
2504 Arc::new(MockInterface {
2505 read_hook: Some(Box::new(move |_, _, _, _| {
2506 let event_clone = event_clone.clone();
2507 Box::pin(async move {
2508 if !event_clone.1.load(Ordering::SeqCst) {
2509 event_clone.0.listen().await;
2510 }
2511 Ok(())
2512 })
2513 })),
2514 ..MockInterface::default()
2515 }),
2516 );
2517 block_server.handle_requests(stream).await.unwrap();
2518 },
2519 async move {
2520 let (session_proxy, server) = fidl::endpoints::create_proxy();
2521
2522 proxy.open_session(server).unwrap();
2523
2524 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2525 let vmo_id = session_proxy
2526 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2527 .await
2528 .unwrap()
2529 .unwrap();
2530
2531 let mut fifo =
2532 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2533 let (mut reader, mut writer) = fifo.async_io();
2534
2535 for i in 0..MAX_REQUESTS {
2536 writer
2537 .write_entries(&BlockFifoRequest {
2538 command: BlockFifoCommand {
2539 opcode: BlockOpcode::Read.into_primitive(),
2540 ..Default::default()
2541 },
2542 reqid: (i + 1) as u32,
2543 dev_offset: i,
2544 vmoid: vmo_id.id,
2545 length: 1,
2546 ..Default::default()
2547 })
2548 .await
2549 .unwrap();
2550 }
2551 assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2552 command: BlockFifoCommand {
2553 opcode: BlockOpcode::Read.into_primitive(),
2554 ..Default::default()
2555 },
2556 reqid: u32::MAX,
2557 dev_offset: MAX_REQUESTS,
2558 vmoid: vmo_id.id,
2559 length: 1,
2560 ..Default::default()
2561 })))
2562 .is_pending());
2563 event.1.store(true, Ordering::SeqCst);
2565 event.0.notify(usize::MAX);
2566 let mut finished_reqids = vec![];
2568 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2569 let mut response = BlockFifoResponse::default();
2570 reader.read_entries(&mut response).await.unwrap();
2571 assert_eq!(response.status, zx::sys::ZX_OK);
2572 finished_reqids.push(response.reqid);
2573 writer
2574 .write_entries(&BlockFifoRequest {
2575 command: BlockFifoCommand {
2576 opcode: BlockOpcode::Read.into_primitive(),
2577 ..Default::default()
2578 },
2579 reqid: (i + 1) as u32,
2580 dev_offset: i,
2581 vmoid: vmo_id.id,
2582 length: 1,
2583 ..Default::default()
2584 })
2585 .await
2586 .unwrap();
2587 }
2588 let mut response = BlockFifoResponse::default();
2589 for _ in 0..MAX_REQUESTS {
2590 reader.read_entries(&mut response).await.unwrap();
2591 assert_eq!(response.status, zx::sys::ZX_OK);
2592 finished_reqids.push(response.reqid);
2593 }
2594 finished_reqids.sort();
2597 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2598 let mut i = 1;
2599 for reqid in finished_reqids {
2600 assert_eq!(reqid, i);
2601 i += 1;
2602 }
2603 }
2604 );
2605 }
2606
2607 #[fuchsia::test]
2608 async fn test_passthrough_io_with_fixed_map() {
2609 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2610
2611 let expected_op = Arc::new(Mutex::new(None));
2612 let expected_op_clone = expected_op.clone();
2613 futures::join!(
2614 async {
2615 let block_server = BlockServer::new(
2616 BLOCK_SIZE,
2617 Arc::new(IoMockInterface {
2618 return_errors: false,
2619 do_checks: true,
2620 expected_op: expected_op_clone,
2621 }),
2622 );
2623 block_server.handle_requests(stream).await.unwrap();
2624 },
2625 async move {
2626 let (session_proxy, server) = fidl::endpoints::create_proxy();
2627
2628 let mapping = fblock::BlockOffsetMapping {
2629 source_block_offset: 0,
2630 target_block_offset: 10,
2631 length: 20,
2632 };
2633 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2634
2635 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2636 let vmo_id = session_proxy
2637 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2638 .await
2639 .unwrap()
2640 .unwrap();
2641
2642 let mut fifo =
2643 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2644 let (mut reader, mut writer) = fifo.async_io();
2645
2646 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2648 writer
2649 .write_entries(&BlockFifoRequest {
2650 command: BlockFifoCommand {
2651 opcode: BlockOpcode::Read.into_primitive(),
2652 ..Default::default()
2653 },
2654 vmoid: vmo_id.id,
2655 dev_offset: 1,
2656 length: 2,
2657 vmo_offset: 3,
2658 ..Default::default()
2659 })
2660 .await
2661 .unwrap();
2662
2663 let mut response = BlockFifoResponse::default();
2664 reader.read_entries(&mut response).await.unwrap();
2665 assert_eq!(response.status, zx::sys::ZX_OK);
2666
2667 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2669 writer
2670 .write_entries(&BlockFifoRequest {
2671 command: BlockFifoCommand {
2672 opcode: BlockOpcode::Write.into_primitive(),
2673 ..Default::default()
2674 },
2675 vmoid: vmo_id.id,
2676 dev_offset: 4,
2677 length: 5,
2678 vmo_offset: 6,
2679 ..Default::default()
2680 })
2681 .await
2682 .unwrap();
2683
2684 reader.read_entries(&mut response).await.unwrap();
2685 assert_eq!(response.status, zx::sys::ZX_OK);
2686
2687 *expected_op.lock() = Some(ExpectedOp::Flush);
2689 writer
2690 .write_entries(&BlockFifoRequest {
2691 command: BlockFifoCommand {
2692 opcode: BlockOpcode::Flush.into_primitive(),
2693 ..Default::default()
2694 },
2695 ..Default::default()
2696 })
2697 .await
2698 .unwrap();
2699
2700 reader.read_entries(&mut response).await.unwrap();
2701 assert_eq!(response.status, zx::sys::ZX_OK);
2702
2703 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2705 writer
2706 .write_entries(&BlockFifoRequest {
2707 command: BlockFifoCommand {
2708 opcode: BlockOpcode::Trim.into_primitive(),
2709 ..Default::default()
2710 },
2711 dev_offset: 7,
2712 length: 3,
2713 ..Default::default()
2714 })
2715 .await
2716 .unwrap();
2717
2718 reader.read_entries(&mut response).await.unwrap();
2719 assert_eq!(response.status, zx::sys::ZX_OK);
2720
2721 *expected_op.lock() = None;
2723 writer
2724 .write_entries(&BlockFifoRequest {
2725 command: BlockFifoCommand {
2726 opcode: BlockOpcode::Read.into_primitive(),
2727 ..Default::default()
2728 },
2729 vmoid: vmo_id.id,
2730 dev_offset: 19,
2731 length: 2,
2732 vmo_offset: 3,
2733 ..Default::default()
2734 })
2735 .await
2736 .unwrap();
2737
2738 reader.read_entries(&mut response).await.unwrap();
2739 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2740
2741 std::mem::drop(proxy);
2742 }
2743 );
2744 }
2745
2746 #[fuchsia::test]
2747 fn operation_map() {
2748 fn expect_map_result(
2749 mut operation: Operation,
2750 mapping: Option<BlockOffsetMapping>,
2751 max_blocks: Option<NonZero<u32>>,
2752 expected_operations: Vec<Operation>,
2753 ) {
2754 let mut ops = vec![];
2755 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2756 ops.push(operation);
2757 operation = remainder;
2758 }
2759 ops.push(operation);
2760 assert_eq!(ops, expected_operations);
2761 }
2762
2763 expect_map_result(
2765 Operation::Read {
2766 device_block_offset: 10,
2767 block_count: 200,
2768 _unused: 0,
2769 vmo_offset: 0,
2770 },
2771 None,
2772 None,
2773 vec![Operation::Read {
2774 device_block_offset: 10,
2775 block_count: 200,
2776 _unused: 0,
2777 vmo_offset: 0,
2778 }],
2779 );
2780
2781 expect_map_result(
2783 Operation::Read {
2784 device_block_offset: 10,
2785 block_count: 200,
2786 _unused: 0,
2787 vmo_offset: 0,
2788 },
2789 None,
2790 NonZero::new(120),
2791 vec![
2792 Operation::Read {
2793 device_block_offset: 10,
2794 block_count: 120,
2795 _unused: 0,
2796 vmo_offset: 0,
2797 },
2798 Operation::Read {
2799 device_block_offset: 130,
2800 block_count: 80,
2801 _unused: 0,
2802 vmo_offset: 120 * 512,
2803 },
2804 ],
2805 );
2806 expect_map_result(
2807 Operation::Write {
2808 device_block_offset: 10,
2809 block_count: 200,
2810 options: WriteOptions::PRE_BARRIER,
2811 vmo_offset: 0,
2812 },
2813 None,
2814 NonZero::new(120),
2815 vec![
2816 Operation::Write {
2817 device_block_offset: 10,
2818 block_count: 120,
2819 options: WriteOptions::PRE_BARRIER,
2820 vmo_offset: 0,
2821 },
2822 Operation::Write {
2823 device_block_offset: 130,
2824 block_count: 80,
2825 options: WriteOptions::empty(),
2826 vmo_offset: 120 * 512,
2827 },
2828 ],
2829 );
2830 expect_map_result(
2831 Operation::Trim { device_block_offset: 10, block_count: 200 },
2832 None,
2833 NonZero::new(120),
2834 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2835 );
2836
2837 expect_map_result(
2839 Operation::Read {
2840 device_block_offset: 10,
2841 block_count: 200,
2842 _unused: 0,
2843 vmo_offset: 0,
2844 },
2845 Some(BlockOffsetMapping {
2846 source_block_offset: 10,
2847 target_block_offset: 100,
2848 length: 200,
2849 }),
2850 NonZero::new(120),
2851 vec![
2852 Operation::Read {
2853 device_block_offset: 100,
2854 block_count: 120,
2855 _unused: 0,
2856 vmo_offset: 0,
2857 },
2858 Operation::Read {
2859 device_block_offset: 220,
2860 block_count: 80,
2861 _unused: 0,
2862 vmo_offset: 120 * 512,
2863 },
2864 ],
2865 );
2866 expect_map_result(
2867 Operation::Write {
2868 device_block_offset: 10,
2869 block_count: 200,
2870 options: WriteOptions::PRE_BARRIER,
2871 vmo_offset: 0,
2872 },
2873 Some(BlockOffsetMapping {
2874 source_block_offset: 10,
2875 target_block_offset: 100,
2876 length: 200,
2877 }),
2878 NonZero::new(120),
2879 vec![
2880 Operation::Write {
2881 device_block_offset: 100,
2882 block_count: 120,
2883 options: WriteOptions::PRE_BARRIER,
2884 vmo_offset: 0,
2885 },
2886 Operation::Write {
2887 device_block_offset: 220,
2888 block_count: 80,
2889 options: WriteOptions::empty(),
2890 vmo_offset: 120 * 512,
2891 },
2892 ],
2893 );
2894 expect_map_result(
2895 Operation::Trim { device_block_offset: 10, block_count: 200 },
2896 Some(BlockOffsetMapping {
2897 source_block_offset: 10,
2898 target_block_offset: 100,
2899 length: 200,
2900 }),
2901 NonZero::new(120),
2902 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2903 );
2904 }
2905}