1use crate::bin::Bin;
5use anyhow::{anyhow, Error};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use zx::HandleBased;
19use {
20 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
/// Crate-wide request limit of 64 for the block FIFO.
///
/// NOTE(review): the code visible in this file does not use this constant outside of the test
/// imports (`SessionHelper::new` sizes its FIFO with a literal 16); presumably sibling modules use
/// it to bound how many FIFO entries are processed per batch — confirm against the callers.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
/// Optional trace flow identifier carried by a request.
///
/// Built with `NonZero::new(request.trace_flow_id)`, so a raw id of zero becomes `None`.
type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34 Block(BlockInfo),
35 Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39 pub fn block_count(&self) -> Option<u64> {
40 match self {
41 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42 Self::Partition(PartitionInfo { block_range, .. }) => {
43 block_range.as_ref().map(|range| range.end - range.start)
44 }
45 }
46 }
47
48 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49 match self {
50 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52 max_transfer_blocks.clone()
53 }
54 }
55 }
56
57 fn max_transfer_size(&self, block_size: u32) -> u32 {
58 if let Some(max_blocks) = self.max_transfer_blocks() {
59 max_blocks.get() * block_size
60 } else {
61 MAX_TRANSFER_UNBOUNDED
62 }
63 }
64}
65
/// Information associated with a (non-partition) block device.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// Device flags; reported as `flags` in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Total number of blocks; reported as `block_count` in the `GetInfo` response.
    pub block_count: u64,
    /// Maximum number of blocks in a single transfer.  `None` means no limit, in which case
    /// `MAX_TRANSFER_UNBOUNDED` is reported as the maximum transfer size.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
73
/// Information associated with a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// Device flags; reported as `flags` in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Maximum number of blocks in a single transfer.  `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// Range of blocks covered by the partition.  When `None`, `GetInfo` derives the block count
    /// from `SessionManager::get_volume_info` instead, and `GetMetadata` reports no start offset
    /// or block count.
    pub block_range: Option<Range<u64>>,
    /// Returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Returned as `flags` by `GetMetadata`.
    pub flags: u64,
}
89
/// Book-keeping for one in-flight request, or for one in-flight group of requests.
struct ActiveRequest<S> {
    /// The session the request arrived on; handed back together with the response.
    session: S,
    /// The group id or request id that identifies this entry.
    group_or_request: GroupOrRequest,
    /// Trace flow id taken from the FIFO request that created this entry.
    trace_flow_id: TraceFlowId,
    /// Key returned by `vmo_bin.retain()`, if any; released when the entry is removed.
    vmo_bin_key: Option<usize>,
    /// The first non-OK status recorded for this entry, or `OK`.
    status: zx::Status,
    /// Number of outstanding sub-requests; a response is only produced once this reaches zero.
    count: u32,
    /// The `reqid` to use in the response.  `None` for a group that has not yet received its
    /// `GROUP_LAST` request, in which case no response is produced.
    req_id: Option<u32>,
}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105 fn default() -> Self {
106 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107 }
108}
109
110impl<S> ActiveRequests<S> {
111 fn complete_and_take_response(
112 &self,
113 request_id: RequestId,
114 status: zx::Status,
115 ) -> Option<(S, BlockFifoResponse)> {
116 self.0.lock().complete_and_take_response(request_id, status)
117 }
118}
119
120struct ActiveRequestsInner<S> {
121 requests: Slab<ActiveRequest<S>>,
122 vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125impl<S> ActiveRequestsInner<S> {
127 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129 let group = &mut self.requests[request_id.0];
130
131 group.count = group.count.checked_sub(1).unwrap();
132 if status != zx::Status::OK && group.status == zx::Status::OK {
133 group.status = status
134 }
135
136 fuchsia_trace::duration!(
137 c"storage",
138 c"block_server::finish_transaction",
139 "request_id" => request_id.0,
140 "group_completed" => group.count == 0,
141 "status" => status.into_raw());
142 if let Some(trace_flow_id) = group.trace_flow_id {
143 fuchsia_trace::flow_step!(
144 c"storage",
145 c"block_server::finish_request",
146 trace_flow_id.get().into()
147 );
148 }
149 }
150
151 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153 let group = &self.requests[request_id.0];
154 match group.req_id {
155 Some(reqid) if group.count == 0 => {
156 let group = self.requests.remove(request_id.0);
157 if let Some(vmo_bin_key) = group.vmo_bin_key {
158 self.vmo_bin.release(vmo_bin_key);
159 }
160 Some((
161 group.session,
162 BlockFifoResponse {
163 status: group.status.into_raw(),
164 reqid,
165 group: group.group_or_request.group_id().unwrap_or(0),
166 ..Default::default()
167 },
168 ))
169 }
170 _ => None,
171 }
172 }
173
174 fn complete_and_take_response(
176 &mut self,
177 request_id: RequestId,
178 status: zx::Status,
179 ) -> Option<(S, BlockFifoResponse)> {
180 self.complete(request_id, status);
181 self.take_response(request_id)
182 }
183}
184
/// Serves `fuchsia.hardware.block.volume/Volume` requests on top of a `SessionManager`.
pub struct BlockServer<SM> {
    /// Block size reported by `GetInfo` and passed to every session that is opened.
    // NOTE(review): presumably in bytes — confirm against the FIDL `BlockInfo` definition.
    block_size: u32,
    /// The session manager that device queries and new sessions are delegated to.
    session_manager: Arc<SM>,
}
191
/// A single contiguous mapping from a range of source blocks to a range of target blocks.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    /// First block of the range as addressed by the client.
    source_block_offset: u64,
    /// Block that `source_block_offset` is translated to.
    target_block_offset: u64,
    /// Number of blocks covered by the mapping.
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if the range `blocks` (start block, block count) lies entirely within
    /// `[source_block_offset, source_block_offset + length)`.
    ///
    /// Uses checked arithmetic throughout, so a range whose end would exceed `u64::MAX` is
    /// reported as out of range instead of overflowing.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (start, count) = blocks;
        start
            // A start before the mapping is out of range.
            .checked_sub(self.source_block_offset)
            // End of the request, relative to the start of the mapping.
            .and_then(|delta| delta.checked_add(u64::from(count)))
            .is_some_and(|relative_end| relative_end <= self.length)
    }
}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208 type Error = zx::Status;
209
210 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213 Ok(Self {
214 source_block_offset: wire.source_block_offset,
215 target_block_offset: wire.target_block_offset,
216 length: wire.length,
217 })
218 }
219}
220
221pub struct OffsetMap {
223 mapping: Option<BlockOffsetMapping>,
224 max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230 Self { mapping: Some(mapping), max_transfer_blocks }
231 }
232
233 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235 Self { mapping: None, max_transfer_blocks }
236 }
237
238 pub fn is_empty(&self) -> bool {
239 self.mapping.is_none()
240 }
241
242 fn mapping(&self) -> Option<&BlockOffsetMapping> {
243 self.mapping.as_ref()
244 }
245
246 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247 self.max_transfer_blocks
248 }
249}
250
/// The interface `BlockServer` and `SessionHelper` use to talk to the underlying implementation.
///
/// The volume-related methods (`get_volume_info`, `query_slices`, `extend`, `shrink`) have default
/// implementations that fail with `NOT_SUPPORTED`.
pub trait SessionManager: 'static {
    /// Per-session value stored alongside every active request.
    type Session;

    /// Called when a client attaches `vmo` to a session, after a VMO id has been allocated for
    /// it.  An error is propagated out of the session's request handling.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns a future that serves a new session on `stream`.
    ///
    /// `offset_map` carries the mapping (if any) and transfer limit for the session, and
    /// `block_size` is the server's block size.  `BlockServer` spawns the returned future and
    /// discards its result.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns information about the device being served.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Returns volume manager and volume information.  Defaults to `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns slice ranges for the given start slices.  Defaults to `NOT_SUPPORTED`.
    ///
    /// `BlockServer` asserts that at most 16 ranges are returned.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Handles a volume `Extend` request.  Defaults to `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Handles a volume `Shrink` request.  Defaults to `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the table of active requests shared by all sessions of this manager.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
312
/// Conversion into a shared `SessionManager`; this is what `BlockServer::new` accepts.
pub trait IntoSessionManager {
    /// The concrete session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager wrapped in an `Arc`.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
318
319impl<SM: SessionManager> BlockServer<SM> {
320 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321 Self { block_size, session_manager: session_manager.into_session_manager() }
322 }
323
324 pub fn session_manager(&self) -> &SM {
325 self.session_manager.as_ref()
326 }
327
328 pub async fn handle_requests(
330 &self,
331 mut requests: fvolume::VolumeRequestStream,
332 ) -> Result<(), Error> {
333 let scope = fasync::Scope::new();
334 while let Some(request) = requests.try_next().await.unwrap() {
335 if let Some(session) = self.handle_request(request).await? {
336 scope.spawn(session.map(|_| ()));
337 }
338 }
339 scope.await;
340 Ok(())
341 }
342
343 async fn handle_request(
346 &self,
347 request: fvolume::VolumeRequest,
348 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
349 match request {
350 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
351 Ok(info) => {
352 let max_transfer_size = info.max_transfer_size(self.block_size);
353 let (block_count, flags) = match info.as_ref() {
354 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
355 (*block_count, *device_flags)
356 }
357 DeviceInfo::Partition(partition_info) => {
358 let block_count = if let Some(range) =
359 partition_info.block_range.as_ref()
360 {
361 range.end - range.start
362 } else {
363 let volume_info = self.session_manager.get_volume_info().await?;
364 volume_info.0.slice_size * volume_info.1.partition_slice_count
365 / self.block_size as u64
366 };
367 (block_count, partition_info.device_flags)
368 }
369 };
370 responder.send(Ok(&fblock::BlockInfo {
371 block_count,
372 block_size: self.block_size,
373 max_transfer_size,
374 flags,
375 }))?;
376 }
377 Err(status) => responder.send(Err(status.into_raw()))?,
378 },
379 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
380 match self.device_info().await {
381 Ok(info) => {
382 return Ok(Some(self.session_manager.clone().open_session(
383 session.into_stream(),
384 OffsetMap::empty(info.max_transfer_blocks()),
385 self.block_size,
386 )));
387 }
388 Err(status) => session.close_with_epitaph(status)?,
389 }
390 }
391 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
392 session,
393 mapping,
394 control_handle: _,
395 } => match self.device_info().await {
396 Ok(info) => {
397 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
398 Ok(m) => m,
399 Err(status) => {
400 session.close_with_epitaph(status)?;
401 return Ok(None);
402 }
403 };
404 if let Some(max) = info.block_count() {
405 if initial_mapping.target_block_offset + initial_mapping.length > max {
406 log::warn!(
407 "Invalid mapping for session: {initial_mapping:?} (max {max})"
408 );
409 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
410 return Ok(None);
411 }
412 }
413 return Ok(Some(self.session_manager.clone().open_session(
414 session.into_stream(),
415 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
416 self.block_size,
417 )));
418 }
419 Err(status) => session.close_with_epitaph(status)?,
420 },
421 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
422 Ok(info) => {
423 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
424 let mut guid =
425 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
426 guid.value.copy_from_slice(&partition_info.type_guid);
427 responder.send(zx::sys::ZX_OK, Some(&guid))?;
428 } else {
429 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
430 }
431 }
432 Err(status) => {
433 responder.send(status.into_raw(), None)?;
434 }
435 },
436 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
437 match self.device_info().await {
438 Ok(info) => {
439 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
440 let mut guid =
441 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
442 guid.value.copy_from_slice(&partition_info.instance_guid);
443 responder.send(zx::sys::ZX_OK, Some(&guid))?;
444 } else {
445 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
446 }
447 }
448 Err(status) => {
449 responder.send(status.into_raw(), None)?;
450 }
451 }
452 }
453 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
454 Ok(info) => {
455 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
456 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
457 } else {
458 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
459 }
460 }
461 Err(status) => {
462 responder.send(status.into_raw(), None)?;
463 }
464 },
465 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
466 Ok(info) => {
467 if let DeviceInfo::Partition(info) = info.as_ref() {
468 let mut type_guid =
469 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
470 type_guid.value.copy_from_slice(&info.type_guid);
471 let mut instance_guid =
472 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
473 instance_guid.value.copy_from_slice(&info.instance_guid);
474 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
475 name: Some(info.name.clone()),
476 type_guid: Some(type_guid),
477 instance_guid: Some(instance_guid),
478 start_block_offset: info.block_range.as_ref().map(|range| range.start),
479 num_blocks: info
480 .block_range
481 .as_ref()
482 .map(|range| range.end - range.start),
483 flags: Some(info.flags),
484 ..Default::default()
485 }))?;
486 }
487 }
488 Err(status) => responder.send(Err(status.into_raw()))?,
489 },
490 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
491 match self.session_manager.query_slices(&start_slices).await {
492 Ok(mut results) => {
493 let results_len = results.len();
494 assert!(results_len <= 16);
495 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
496 responder.send(
497 zx::sys::ZX_OK,
498 &results.try_into().unwrap(),
499 results_len as u64,
500 )?;
501 }
502 Err(s) => {
503 responder.send(
504 s.into_raw(),
505 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
506 0,
507 )?;
508 }
509 }
510 }
511 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
512 match self.session_manager.get_volume_info().await {
513 Ok((manager_info, volume_info)) => {
514 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
515 }
516 Err(s) => responder.send(s.into_raw(), None, None)?,
517 }
518 }
519 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
520 responder.send(
521 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
522 .into_raw(),
523 )?;
524 }
525 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
526 responder.send(
527 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
528 .into_raw(),
529 )?;
530 }
531 fvolume::VolumeRequest::Destroy { responder, .. } => {
532 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
533 }
534 }
535 Ok(None)
536 }
537
538 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
539 self.session_manager.get_info().await
540 }
541}
542
/// State shared by session implementations: FIFO handling, VMO registration and request decoding.
struct SessionHelper<SM: SessionManager> {
    /// The session manager that owns the table of active requests.
    session_manager: Arc<SM>,
    /// Mapping and transfer limit applied to every request of this session.
    offset_map: OffsetMap,
    /// Block size used to convert block counts and VMO offsets.
    block_size: u32,
    /// The end of the FIFO that is duplicated and handed to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached to this session, keyed by the VMO id returned to the client.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
}
550
impl<SM: SessionManager> SessionHelper<SM> {
    /// Creates a helper together with the server end of a freshly created FIFO.
    ///
    /// The other end (`peer_fifo`) is kept so that it can be handed to the client via `GetFifo`.
    fn new(
        session_manager: Arc<SM>,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
        Ok((
            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
            fifo,
        ))
    }

    /// Handles one request on the session's FIDL channel.
    ///
    /// Returns an error if a response cannot be sent, if `on_attach_vmo` fails, or — by design —
    /// after acknowledging `Close`, which signals the caller to stop serving the session.
    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                let rights = zx::Rights::TRANSFER
                    | zx::Rights::READ
                    | zx::Rights::WRITE
                    | zx::Rights::SIGNAL
                    | zx::Rights::WAIT;
                match self.peer_fifo.duplicate_handle(rights) {
                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
                    Err(s) => responder.send(Err(s.into_raw()))?,
                }
                Ok(())
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                let vmo = Arc::new(vmo);
                // Allocate the id inside a block so the lock is released before awaiting below.
                let vmo_id = {
                    let mut vmos = self.vmos.lock();
                    // Ids start at 1, so at most `u16::MAX` VMOs can be attached at once.
                    if vmos.len() == u16::MAX as usize {
                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
                        return Ok(());
                    } else {
                        let vmo_id = match vmos.last_entry() {
                            None => 1,
                            Some(o) => {
                                // Prefer one past the largest id in use; if that would overflow,
                                // fall back to the first unused id, scanning upwards from 1.
                                o.key().checked_add(1).unwrap_or_else(|| {
                                    let mut vmo_id = 1;
                                    for (&id, _) in &*vmos {
                                        if id > vmo_id {
                                            break;
                                        }
                                        vmo_id = id + 1;
                                    }
                                    vmo_id
                                })
                            }
                        };
                        vmos.insert(vmo_id, vmo.clone());
                        vmo_id
                    }
                };
                self.session_manager.clone().on_attach_vmo(&vmo).await?;
                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
                Ok(())
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(Ok(()))?;
                Err(anyhow!("Closed"))
            }
        }
    }

    /// Decodes `request` and registers it in the table of active requests.
    ///
    /// On success the decoded request is returned and an entry for it (or for its group) exists
    /// in the active-requests table.
    ///
    /// On failure:
    /// * `Err(Some(response))` means the request failed and `response` should be sent now;
    /// * `Err(None)` means nothing should be sent yet — either the request was ignored (its group
    ///   has already failed or has already seen its last request), or the failure has been
    ///   recorded against a group that is not finished.
    fn decode_fifo_request(
        &self,
        session: SM::Session,
        request: &BlockFifoRequest,
    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
        // Validation failures are kept in `operation` rather than returned immediately, so that
        // the request is still accounted for below and a proper response can be generated.
        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
            .ok_or(zx::Status::INVALID_ARGS)
            .and_then(|code| {
                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
                    if request.length == 0 {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    // Reject requests whose end offsets would overflow.  Trim has no VMO, so only
                    // its device range is checked.
                    if request.dev_offset.checked_add(request.length as u64).is_none()
                        || (code != BlockOpcode::Trim
                            && (request.length as u64 * self.block_size as u64)
                                .checked_add(request.vmo_offset)
                                .is_none())
                    {
                        return Err(zx::Status::OUT_OF_RANGE);
                    }
                }
                Ok(match code {
                    BlockOpcode::Read => Operation::Read {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                        _unused: 0,
                        // The wire offset is multiplied by the block size here.
                        vmo_offset: request
                            .vmo_offset
                            .checked_mul(self.block_size as u64)
                            .ok_or(zx::Status::OUT_OF_RANGE)?,
                    },
                    BlockOpcode::Write => {
                        let mut options = WriteOptions::empty();
                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
                            options |= WriteOptions::FORCE_ACCESS;
                        }
                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
                            options |= WriteOptions::PRE_BARRIER;
                        }
                        Operation::Write {
                            device_block_offset: request.dev_offset,
                            block_count: request.length,
                            options: options,
                            vmo_offset: request
                                .vmo_offset
                                .checked_mul(self.block_size as u64)
                                .ok_or(zx::Status::OUT_OF_RANGE)?,
                        }
                    }
                    BlockOpcode::Flush => Operation::Flush,
                    BlockOpcode::Trim => Operation::Trim {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                    },
                    BlockOpcode::CloseVmo => Operation::CloseVmo,
                })
            });

        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
            GroupOrRequest::Group(request.group)
        } else {
            GroupOrRequest::Request(request.reqid)
        };

        let mut active_requests = self.session_manager.active_requests().0.lock();
        let mut request_id = None;

        if group_or_request.is_group() {
            // Look for an existing entry for this group by scanning every active request.
            for (key, group) in &mut active_requests.requests {
                if group.group_or_request == group_or_request {
                    if group.req_id.is_some() {
                        // The group has already received its last request: flag the group as
                        // failed (unless it already is) and ignore this request.
                        if group.status == zx::Status::OK {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                        return Err(None);
                    }
                    if flags.contains(BlockIoFlag::GROUP_LAST) {
                        group.req_id = Some(request.reqid);
                        // The group has already failed, so fail this request with the group's
                        // status instead of issuing it.
                        if group.status != zx::Status::OK {
                            operation = Err(group.status);
                        }
                    } else if group.status != zx::Status::OK {
                        // The group has already failed; ignore the request.
                        return Err(None);
                    }
                    request_id = Some(RequestId(key));
                    group.count += 1;
                    break;
                }
            }
        }

        let mut retain_vmo = false;
        let vmo = match &operation {
            // Read and write need the VMO registered under `vmoid`; an unknown id fails with IO.
            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
                    retain_vmo = true;
                    Ok(Some(vmo))
                })
            }
            // CloseVmo unregisters the VMO and hands a reference to the VMO bin.
            // NOTE(review): presumably the bin keeps the VMO alive until earlier requests that
            // may still use it have finished — confirm against `Bin`.
            Ok(Operation::CloseVmo) => {
                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
                    active_requests.vmo_bin.add(vmo.clone());
                    Ok(Some(vmo))
                })
            }
            _ => Ok(None),
        }
        .unwrap_or_else(|e| {
            operation = Err(e);
            None
        });

        let trace_flow_id = NonZero::new(request.trace_flow_id);
        // No existing group entry was found: create a new entry for this request or group.
        let request_id = request_id.unwrap_or_else(|| {
            let vmo_bin_key =
                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
            RequestId(active_requests.requests.insert(ActiveRequest {
                session,
                group_or_request,
                trace_flow_id,
                vmo_bin_key,
                status: zx::Status::OK,
                count: 1,
                // A response id is known immediately unless this is a group item that is not
                // the last one of its group.
                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
                    || flags.contains(BlockIoFlag::GROUP_LAST)
                {
                    Some(request.reqid)
                } else {
                    None
                },
            }))
        });

        Ok(DecodedRequest {
            request_id,
            trace_flow_id,
            // A failed operation is completed right away; the response (if one is ready) is
            // returned as the error.
            operation: operation.map_err(|status| {
                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
            })?,
            vmo,
        })
    }

    /// Removes and returns all VMOs attached to this session.
    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
        std::mem::take(&mut *self.vmos.lock())
    }

    /// Applies the session's offset mapping and transfer limit to `request`.
    ///
    /// Returns the (possibly shortened) request plus, if the request had to be split, a second
    /// request covering the remainder.  The remainder shares the original's request id, and the
    /// active request's outstanding count is incremented to account for it.
    ///
    /// Returns `Err` with an optional response if the request's entry has already failed
    /// (completed with `BAD_STATE`) or if the blocks fall outside the mapping's source range
    /// (completed with `OUT_OF_RANGE`).
    fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
        let mut active_requests = self.session_manager.active_requests().0.lock();
        let active_request = &mut active_requests.requests[request.request_id.0];
        if active_request.status != zx::Status::OK {
            return Err(active_requests
                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
                .map(|(_, r)| r));
        }
        let mapping = self.offset_map.mapping();
        match (mapping, request.operation.blocks()) {
            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
                return Err(active_requests
                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
                    .map(|(_, r)| r));
            }
            _ => {}
        }
        let remainder = request.operation.map(
            self.offset_map.mapping(),
            self.offset_map.max_transfer_blocks(),
            self.block_size,
        );
        if remainder.is_some() {
            active_request.count += 1;
        }
        static CACHE: AtomicU64 = AtomicU64::new(0);
        if let Some(context) =
            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
        {
            use fuchsia_trace::ArgValue;
            let trace_args = [
                ArgValue::of("request_id", request.request_id.0),
                ArgValue::of("opcode", request.operation.trace_label()),
            ];
            let _scope = fuchsia_trace::duration(
                c"storage",
                c"block_server::start_transaction",
                &trace_args,
            );
            if let Some(trace_flow_id) = active_request.trace_flow_id {
                fuchsia_trace::flow_step(
                    &context,
                    c"block_server::start_transaction",
                    trace_flow_id.get().into(),
                    &[],
                );
            }
        }
        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
        Ok((request, remainder))
    }

    /// Drops every active request whose session satisfies `pred`.
    ///
    /// The entries are simply removed from the table; no responses are produced for them.
    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
    }
}
845
/// Identifies an entry in the table of active requests; wraps the entry's slab key.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
849
/// A FIFO request after decoding and registration in the table of active requests.
#[derive(Clone, Debug)]
struct DecodedRequest {
    /// The active-requests entry this request is accounted against.
    request_id: RequestId,
    /// Trace flow id taken from the FIFO request.
    trace_flow_id: TraceFlowId,
    /// The decoded operation.
    operation: Operation,
    /// The VMO the operation refers to: set for read, write and close-VMO, `None` otherwise.
    vmo: Option<Arc<zx::Vmo>>,
}
857
/// Options attached to `Operation::Write`; an alias for `block_protocol::WriteOptions`.
pub type WriteOptions = block_protocol::WriteOptions;
860
/// A decoded block operation.
// NOTE(review): `#[repr(C)]` and the `_unused` padding field suggest that the layout is shared
// with non-Rust code; field order should not be changed without confirming against the C
// interface.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    /// Read `block_count` blocks starting at `device_block_offset` into the VMO at `vmo_offset`.
    Read {
        device_block_offset: u64,
        block_count: u32,
        _unused: u32,
        /// Offset into the VMO: the wire `vmo_offset` multiplied by the block size.
        vmo_offset: u64,
    },
    /// Write `block_count` blocks starting at `device_block_offset` from the VMO at `vmo_offset`.
    Write {
        device_block_offset: u64,
        block_count: u32,
        /// Options derived from the request's `FORCE_ACCESS` and `PRE_BARRIER` flags.
        options: WriteOptions,
        /// Offset into the VMO: the wire `vmo_offset` multiplied by the block size.
        vmo_offset: u64,
    },
    /// Flush the device.
    Flush,
    /// Trim `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// Detach the VMO named by the request from the session.
    CloseVmo,
}
888
889impl Operation {
890 fn trace_label(&self) -> &'static str {
891 match self {
892 Operation::Read { .. } => "read",
893 Operation::Write { .. } => "write",
894 Operation::Flush { .. } => "flush",
895 Operation::Trim { .. } => "trim",
896 Operation::CloseVmo { .. } => "close_vmo",
897 }
898 }
899
900 fn blocks(&self) -> Option<(u64, u32)> {
902 match self {
903 Operation::Read { device_block_offset, block_count, .. }
904 | Operation::Write { device_block_offset, block_count, .. }
905 | Operation::Trim { device_block_offset, block_count, .. } => {
906 Some((*device_block_offset, *block_count))
907 }
908 _ => None,
909 }
910 }
911
912 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
914 match self {
915 Operation::Read { device_block_offset, block_count, .. }
916 | Operation::Write { device_block_offset, block_count, .. }
917 | Operation::Trim { device_block_offset, block_count, .. } => {
918 Some((device_block_offset, block_count))
919 }
920 _ => None,
921 }
922 }
923
924 fn map(
927 &mut self,
928 mapping: Option<&BlockOffsetMapping>,
929 max_blocks: Option<NonZero<u32>>,
930 block_size: u32,
931 ) -> Option<Self> {
932 let mut max = match self {
933 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
934 _ => None,
935 };
936 let (offset, length) = self.blocks_mut()?;
937 let orig_offset = *offset;
938 if let Some(mapping) = mapping {
939 let delta = *offset - mapping.source_block_offset;
940 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
941 *offset = mapping.target_block_offset + delta;
942 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
943 max = match max {
944 None => Some(mapping_max),
945 Some(m) => Some(std::cmp::min(m, mapping_max)),
946 };
947 };
948 if let Some(max) = max {
949 if *length as u64 > max {
950 let rem = (*length as u64 - max) as u32;
951 *length = max as u32;
952 return Some(match self {
953 Operation::Read {
954 device_block_offset: _,
955 block_count: _,
956 vmo_offset,
957 _unused,
958 } => Operation::Read {
959 device_block_offset: orig_offset + max,
960 block_count: rem,
961 vmo_offset: *vmo_offset + max * block_size as u64,
962 _unused: *_unused,
963 },
964 Operation::Write {
965 device_block_offset: _,
966 block_count: _,
967 vmo_offset,
968 options,
969 } => {
970 let options = *options & !WriteOptions::PRE_BARRIER;
972 Operation::Write {
973 device_block_offset: orig_offset + max,
974 block_count: rem,
975 vmo_offset: *vmo_offset + max * block_size as u64,
976 options,
977 }
978 }
979 Operation::Trim { device_block_offset: _, block_count: _ } => {
980 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
981 }
982 _ => unreachable!(),
983 });
984 }
985 }
986 None
987 }
988}
989
/// Identifies what an active request entry tracks: a group of requests or a single request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    /// A group, identified by its group id.
    Group(u16),
    /// A single request, identified by its request id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        match self {
            Self::Group(_) => true,
            Self::Request(_) => false,
        }
    }

    /// Returns the group id for the `Group` variant and `None` otherwise.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self {
            Some(id)
        } else {
            None
        }
    }
}
1008
1009#[cfg(test)]
1010mod tests {
1011 use super::{
1012 BlockOffsetMapping, BlockServer, DeviceInfo, Operation, PartitionInfo, TraceFlowId,
1013 FIFO_MAX_REQUESTS,
1014 };
1015 use assert_matches::assert_matches;
1016 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
1017 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1018 use fuchsia_sync::Mutex;
1019 use futures::channel::oneshot;
1020 use futures::future::BoxFuture;
1021 use futures::FutureExt as _;
1022 use std::borrow::Cow;
1023 use std::future::poll_fn;
1024 use std::num::NonZero;
1025 use std::pin::pin;
1026 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1027 use std::sync::Arc;
1028 use std::task::{Context, Poll};
1029 use zx::{AsHandleRef as _, HandleBased as _};
1030 use {
1031 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1032 fuchsia_async as fasync,
1033 };
1034
/// Test double for `async_interface::Interface` whose behaviour is supplied through optional
/// hooks.  Calling `read`, `write` or `barrier` without the matching hook hits `unimplemented!()`.
#[derive(Default)]
struct MockInterface {
    /// Invoked by `read` with the device block offset, block count, VMO and VMO offset.
    read_hook: Option<
        Box<
            dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                + Send
                + Sync,
        >,
    >,
    /// Invoked by `write` with the device block offset.
    write_hook:
        Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
    /// Invoked by `barrier`.
    barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
}
1048
1049 impl super::async_interface::Interface for MockInterface {
1050 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1051 Ok(())
1052 }
1053
1054 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1055 Ok(Cow::Owned(test_device_info()))
1056 }
1057
1058 async fn read(
1059 &self,
1060 device_block_offset: u64,
1061 block_count: u32,
1062 vmo: &Arc<zx::Vmo>,
1063 vmo_offset: u64,
1064 _trace_flow_id: TraceFlowId,
1065 ) -> Result<(), zx::Status> {
1066 if let Some(read_hook) = &self.read_hook {
1067 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1068 } else {
1069 unimplemented!();
1070 }
1071 }
1072
1073 async fn write(
1074 &self,
1075 device_block_offset: u64,
1076 _block_count: u32,
1077 _vmo: &Arc<zx::Vmo>,
1078 _vmo_offset: u64,
1079 _opts: WriteOptions,
1080 _trace_flow_id: TraceFlowId,
1081 ) -> Result<(), zx::Status> {
1082 if let Some(write_hook) = &self.write_hook {
1083 write_hook(device_block_offset).await
1084 } else {
1085 unimplemented!();
1086 }
1087 }
1088
1089 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1090 unreachable!();
1091 }
1092
1093 fn barrier(&self) -> Result<(), zx::Status> {
1094 if let Some(barrier_hook) = &self.barrier_hook {
1095 barrier_hook()
1096 } else {
1097 unimplemented!();
1098 }
1099 }
1100
1101 async fn trim(
1102 &self,
1103 _device_block_offset: u64,
1104 _block_count: u32,
1105 _trace_flow_id: TraceFlowId,
1106 ) -> Result<(), zx::Status> {
1107 unreachable!();
1108 }
1109
1110 async fn get_volume_info(
1111 &self,
1112 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1113 let () = std::future::pending().await;
1115 unreachable!();
1116 }
1117 }
1118
/// Block size passed to `BlockServer::new` in the tests.
const BLOCK_SIZE: u32 = 512;
/// Transfer limit, in blocks, reported by `test_device_info`.
const MAX_TRANSFER_BLOCKS: u32 = 10;
1121
1122 fn test_device_info() -> DeviceInfo {
1123 DeviceInfo::Partition(PartitionInfo {
1124 device_flags: fblock::Flag::READONLY,
1125 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1126 block_range: Some(0..100),
1127 type_guid: [1; 16],
1128 instance_guid: [2; 16],
1129 name: "foo".to_string(),
1130 flags: 0xabcd,
1131 })
1132 }
1133
/// Sends a single pre-barrier write that is one block longer than the transfer limit, so the
/// server has to split it, and checks that the barrier hook is invoked exactly once.
#[fuchsia::test]
async fn test_split_request_only_issues_single_barrier() {
    let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
    let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
    let barrier_called = Arc::new(AtomicU32::new(0));
    let barrier_called_clone = barrier_called.clone();

    futures::join!(
        // Server side: runs until the client drops `proxy`.
        async move {
            let block_server = BlockServer::new(
                BLOCK_SIZE,
                Arc::new(MockInterface {
                    // Counts how many times a barrier is issued.
                    barrier_hook: Some(Box::new(move || {
                        barrier_called_clone.fetch_add(1, Ordering::Relaxed);
                        Ok(())
                    })),
                    write_hook: Some(Box::new(move |_device_block_offset| {
                        Box::pin(async move { Ok(()) })
                    })),
                    ..MockInterface::default()
                }),
            );
            block_server.handle_requests(stream).await.unwrap();
        },
        // Client side.
        async move {
            let (session_proxy, server) = fidl::endpoints::create_proxy();

            proxy.open_session(server).unwrap();

            let vmo_id = session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap();
            assert_ne!(vmo_id.id, 0);

            let mut fifo =
                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
            let (mut reader, mut writer) = fifo.async_io();
            // One block more than the transfer limit forces the write to be split in two.
            writer
                .write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: BlockIoFlag::PRE_BARRIER.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    dev_offset: 0,
                    length: MAX_TRANSFER_BLOCKS + 1,
                    vmo_offset: 6,
                    ..Default::default()
                })
                .await
                .unwrap();

            let mut response = BlockFifoResponse::default();
            reader.read_entries(&mut response).await.unwrap();
            assert_eq!(response.status, zx::sys::ZX_OK);

            // Only the first part of the split write may carry the barrier.
            assert_eq!(barrier_called.load(Ordering::Relaxed), 1);

            // Dropping the proxy ends the request stream, which lets the server future finish.
            std::mem::drop(proxy);
        }
    );
}
1201
1202 #[fuchsia::test]
1203 async fn test_barriers_not_supported() {
1204 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1205 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1206
1207 futures::join!(
1208 async move {
1209 let block_server = BlockServer::new(
1210 BLOCK_SIZE,
1211 Arc::new(MockInterface {
1212 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1213 write_hook: Some(Box::new(move |_device_block_offset| {
1214 Box::pin(async move { Ok(()) })
1215 })),
1216 ..MockInterface::default()
1217 }),
1218 );
1219 block_server.handle_requests(stream).await.unwrap();
1220 },
1221 async move {
1222 let (session_proxy, server) = fidl::endpoints::create_proxy();
1223
1224 proxy.open_session(server).unwrap();
1225
1226 let vmo_id = session_proxy
1227 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1228 .await
1229 .unwrap()
1230 .unwrap();
1231 assert_ne!(vmo_id.id, 0);
1232
1233 let mut fifo =
1234 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1235 let (mut reader, mut writer) = fifo.async_io();
1236
1237 writer
1240 .write_entries(&BlockFifoRequest {
1241 command: BlockFifoCommand {
1242 opcode: BlockOpcode::Write.into_primitive(),
1243 flags: BlockIoFlag::PRE_BARRIER.bits(),
1244 ..Default::default()
1245 },
1246 vmoid: vmo_id.id,
1247 dev_offset: 0,
1248 length: MAX_TRANSFER_BLOCKS + 1,
1249 vmo_offset: 6,
1250 ..Default::default()
1251 })
1252 .await
1253 .unwrap();
1254
1255 let mut response = BlockFifoResponse::default();
1256 reader.read_entries(&mut response).await.unwrap();
1257 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1258
1259 std::mem::drop(proxy);
1260 }
1261 );
1262 }
1263
1264 #[fuchsia::test]
1265 async fn test_barriers_ordering() {
1266 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1267 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1268 let barrier_called = Arc::new(AtomicBool::new(false));
1269
1270 futures::join!(
1271 async move {
1272 let barrier_called_clone = barrier_called.clone();
1273 let block_server = BlockServer::new(
1274 BLOCK_SIZE,
1275 Arc::new(MockInterface {
1276 barrier_hook: Some(Box::new(move || {
1277 barrier_called.store(true, Ordering::Relaxed);
1278 Ok(())
1279 })),
1280 write_hook: Some(Box::new(move |device_block_offset| {
1281 let barrier_called = barrier_called_clone.clone();
1282 Box::pin(async move {
1283 if device_block_offset % 2 == 0 {
1285 fasync::Timer::new(fasync::MonotonicInstant::after(
1286 zx::MonotonicDuration::from_millis(200),
1287 ))
1288 .await;
1289 }
1290 assert!(barrier_called.load(Ordering::Relaxed));
1291 Ok(())
1292 })
1293 })),
1294 ..MockInterface::default()
1295 }),
1296 );
1297 block_server.handle_requests(stream).await.unwrap();
1298 },
1299 async move {
1300 let (session_proxy, server) = fidl::endpoints::create_proxy();
1301
1302 proxy.open_session(server).unwrap();
1303
1304 let vmo_id = session_proxy
1305 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1306 .await
1307 .unwrap()
1308 .unwrap();
1309 assert_ne!(vmo_id.id, 0);
1310
1311 let mut fifo =
1312 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1313 let (mut reader, mut writer) = fifo.async_io();
1314
1315 writer
1316 .write_entries(&BlockFifoRequest {
1317 command: BlockFifoCommand {
1318 opcode: BlockOpcode::Write.into_primitive(),
1319 flags: BlockIoFlag::PRE_BARRIER.bits(),
1320 ..Default::default()
1321 },
1322 vmoid: vmo_id.id,
1323 dev_offset: 0,
1324 length: 5,
1325 vmo_offset: 6,
1326 ..Default::default()
1327 })
1328 .await
1329 .unwrap();
1330
1331 for i in 0..10 {
1332 writer
1333 .write_entries(&BlockFifoRequest {
1334 command: BlockFifoCommand {
1335 opcode: BlockOpcode::Write.into_primitive(),
1336 ..Default::default()
1337 },
1338 vmoid: vmo_id.id,
1339 dev_offset: i + 1,
1340 length: 5,
1341 vmo_offset: 6,
1342 ..Default::default()
1343 })
1344 .await
1345 .unwrap();
1346 }
1347 for _ in 0..11 {
1348 let mut response = BlockFifoResponse::default();
1349 reader.read_entries(&mut response).await.unwrap();
1350 assert_eq!(response.status, zx::sys::ZX_OK);
1351 }
1352
1353 std::mem::drop(proxy);
1354 }
1355 );
1356 }
1357
1358 #[fuchsia::test]
1359 async fn test_info() {
1360 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1361
1362 futures::join!(
1363 async {
1364 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1365 block_server.handle_requests(stream).await.unwrap();
1366 },
1367 async {
1368 let expected_info = test_device_info();
1369 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1370 info
1371 } else {
1372 unreachable!()
1373 };
1374
1375 let block_info = proxy.get_info().await.unwrap().unwrap();
1376 assert_eq!(
1377 block_info.block_count,
1378 partition_info.block_range.as_ref().unwrap().end
1379 - partition_info.block_range.as_ref().unwrap().start
1380 );
1381 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1382
1383 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1384
1385 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1386 assert_eq!(status, zx::sys::ZX_OK);
1387 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1388
1389 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1390 assert_eq!(status, zx::sys::ZX_OK);
1391 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1392
1393 let (status, name) = proxy.get_name().await.unwrap();
1394 assert_eq!(status, zx::sys::ZX_OK);
1395 assert_eq!(name.as_ref(), Some(&partition_info.name));
1396
1397 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1398 assert_eq!(metadata.name, name);
1399 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1400 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1401 assert_eq!(
1402 metadata.start_block_offset,
1403 Some(partition_info.block_range.as_ref().unwrap().start)
1404 );
1405 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1406 assert_eq!(metadata.flags, Some(partition_info.flags));
1407
1408 std::mem::drop(proxy);
1409 }
1410 );
1411 }
1412
1413 #[fuchsia::test]
1414 async fn test_attach_vmo() {
1415 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1416
1417 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1418 let koid = vmo.get_koid().unwrap();
1419
1420 futures::join!(
1421 async {
1422 let block_server = BlockServer::new(
1423 BLOCK_SIZE,
1424 Arc::new(MockInterface {
1425 read_hook: Some(Box::new(move |_, _, vmo, _| {
1426 assert_eq!(vmo.get_koid().unwrap(), koid);
1427 Box::pin(async { Ok(()) })
1428 })),
1429 ..MockInterface::default()
1430 }),
1431 );
1432 block_server.handle_requests(stream).await.unwrap();
1433 },
1434 async move {
1435 let (session_proxy, server) = fidl::endpoints::create_proxy();
1436
1437 proxy.open_session(server).unwrap();
1438
1439 let vmo_id = session_proxy
1440 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1441 .await
1442 .unwrap()
1443 .unwrap();
1444 assert_ne!(vmo_id.id, 0);
1445
1446 let mut fifo =
1447 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1448 let (mut reader, mut writer) = fifo.async_io();
1449
1450 let mut count = 1;
1452 loop {
1453 match session_proxy
1454 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1455 .await
1456 .unwrap()
1457 {
1458 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1459 Err(e) => {
1460 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1461 break;
1462 }
1463 }
1464
1465 if count % 10 == 0 {
1467 writer
1468 .write_entries(&BlockFifoRequest {
1469 command: BlockFifoCommand {
1470 opcode: BlockOpcode::Read.into_primitive(),
1471 ..Default::default()
1472 },
1473 vmoid: vmo_id.id,
1474 length: 1,
1475 ..Default::default()
1476 })
1477 .await
1478 .unwrap();
1479
1480 let mut response = BlockFifoResponse::default();
1481 reader.read_entries(&mut response).await.unwrap();
1482 assert_eq!(response.status, zx::sys::ZX_OK);
1483 }
1484
1485 count += 1;
1486 }
1487
1488 assert_eq!(count, u16::MAX as u64);
1489
1490 writer
1492 .write_entries(&BlockFifoRequest {
1493 command: BlockFifoCommand {
1494 opcode: BlockOpcode::CloseVmo.into_primitive(),
1495 ..Default::default()
1496 },
1497 vmoid: vmo_id.id,
1498 ..Default::default()
1499 })
1500 .await
1501 .unwrap();
1502
1503 let mut response = BlockFifoResponse::default();
1504 reader.read_entries(&mut response).await.unwrap();
1505 assert_eq!(response.status, zx::sys::ZX_OK);
1506
1507 let new_vmo_id = session_proxy
1508 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1509 .await
1510 .unwrap()
1511 .unwrap();
1512 assert_eq!(new_vmo_id.id, vmo_id.id);
1514
1515 std::mem::drop(proxy);
1516 }
1517 );
1518 }
1519
1520 #[fuchsia::test]
1521 async fn test_close() {
1522 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1523
1524 let mut server = std::pin::pin!(async {
1525 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1526 block_server.handle_requests(stream).await.unwrap();
1527 }
1528 .fuse());
1529
1530 let mut client = std::pin::pin!(async {
1531 let (session_proxy, server) = fidl::endpoints::create_proxy();
1532
1533 proxy.open_session(server).unwrap();
1534
1535 std::mem::drop(proxy);
1538
1539 session_proxy.close().await.unwrap().unwrap();
1540
1541 let _: () = std::future::pending().await;
1543 }
1544 .fuse());
1545
1546 futures::select!(
1547 _ = server => {}
1548 _ = client => unreachable!(),
1549 );
1550 }
1551
1552 #[derive(Default)]
1553 struct IoMockInterface {
1554 do_checks: bool,
1555 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1556 return_errors: bool,
1557 }
1558
/// Operation that `IoMockInterface` expects to receive next when `do_checks` is set.
#[derive(Debug)]
enum ExpectedOp {
    /// Read: device block offset, block count, VMO offset expressed in blocks.
    Read(u64, u32, u64),
    /// Write: device block offset, block count, VMO offset expressed in blocks.
    Write(u64, u32, u64),
    /// Trim: device block offset, block count.
    Trim(u64, u32),
    /// Flush; carries no arguments.
    Flush,
}
1566
1567 impl super::async_interface::Interface for IoMockInterface {
1568 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1569 Ok(())
1570 }
1571
1572 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1573 Ok(Cow::Owned(test_device_info()))
1574 }
1575
1576 async fn read(
1577 &self,
1578 device_block_offset: u64,
1579 block_count: u32,
1580 _vmo: &Arc<zx::Vmo>,
1581 vmo_offset: u64,
1582 _trace_flow_id: TraceFlowId,
1583 ) -> Result<(), zx::Status> {
1584 if self.return_errors {
1585 Err(zx::Status::INTERNAL)
1586 } else {
1587 if self.do_checks {
1588 assert_matches!(
1589 self.expected_op.lock().take(),
1590 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1591 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1592 "Read {device_block_offset} {block_count} {vmo_offset}"
1593 );
1594 }
1595 Ok(())
1596 }
1597 }
1598
1599 async fn write(
1600 &self,
1601 device_block_offset: u64,
1602 block_count: u32,
1603 _vmo: &Arc<zx::Vmo>,
1604 vmo_offset: u64,
1605 _opts: WriteOptions,
1606 _trace_flow_id: TraceFlowId,
1607 ) -> Result<(), zx::Status> {
1608 if self.return_errors {
1609 Err(zx::Status::NOT_SUPPORTED)
1610 } else {
1611 if self.do_checks {
1612 assert_matches!(
1613 self.expected_op.lock().take(),
1614 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1615 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1616 "Write {device_block_offset} {block_count} {vmo_offset}"
1617 );
1618 }
1619 Ok(())
1620 }
1621 }
1622
1623 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1624 if self.return_errors {
1625 Err(zx::Status::NO_RESOURCES)
1626 } else {
1627 if self.do_checks {
1628 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1629 }
1630 Ok(())
1631 }
1632 }
1633
1634 fn barrier(&self) -> Result<(), zx::Status> {
1635 unreachable!()
1636 }
1637
1638 async fn trim(
1639 &self,
1640 device_block_offset: u64,
1641 block_count: u32,
1642 _trace_flow_id: TraceFlowId,
1643 ) -> Result<(), zx::Status> {
1644 if self.return_errors {
1645 Err(zx::Status::NO_MEMORY)
1646 } else {
1647 if self.do_checks {
1648 assert_matches!(
1649 self.expected_op.lock().take(),
1650 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1651 block_count == b,
1652 "Trim {device_block_offset} {block_count}"
1653 );
1654 }
1655 Ok(())
1656 }
1657 }
1658 }
1659
1660 #[fuchsia::test]
1661 async fn test_io() {
1662 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1663
1664 let expected_op = Arc::new(Mutex::new(None));
1665 let expected_op_clone = expected_op.clone();
1666
1667 let server = async {
1668 let block_server = BlockServer::new(
1669 BLOCK_SIZE,
1670 Arc::new(IoMockInterface {
1671 return_errors: false,
1672 do_checks: true,
1673 expected_op: expected_op_clone,
1674 }),
1675 );
1676 block_server.handle_requests(stream).await.unwrap();
1677 };
1678
1679 let client = async move {
1680 let (session_proxy, server) = fidl::endpoints::create_proxy();
1681
1682 proxy.open_session(server).unwrap();
1683
1684 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1685 let vmo_id = session_proxy
1686 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1687 .await
1688 .unwrap()
1689 .unwrap();
1690
1691 let mut fifo =
1692 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1693 let (mut reader, mut writer) = fifo.async_io();
1694
1695 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1697 writer
1698 .write_entries(&BlockFifoRequest {
1699 command: BlockFifoCommand {
1700 opcode: BlockOpcode::Read.into_primitive(),
1701 ..Default::default()
1702 },
1703 vmoid: vmo_id.id,
1704 dev_offset: 1,
1705 length: 2,
1706 vmo_offset: 3,
1707 ..Default::default()
1708 })
1709 .await
1710 .unwrap();
1711
1712 let mut response = BlockFifoResponse::default();
1713 reader.read_entries(&mut response).await.unwrap();
1714 assert_eq!(response.status, zx::sys::ZX_OK);
1715
1716 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1718 writer
1719 .write_entries(&BlockFifoRequest {
1720 command: BlockFifoCommand {
1721 opcode: BlockOpcode::Write.into_primitive(),
1722 ..Default::default()
1723 },
1724 vmoid: vmo_id.id,
1725 dev_offset: 4,
1726 length: 5,
1727 vmo_offset: 6,
1728 ..Default::default()
1729 })
1730 .await
1731 .unwrap();
1732
1733 let mut response = BlockFifoResponse::default();
1734 reader.read_entries(&mut response).await.unwrap();
1735 assert_eq!(response.status, zx::sys::ZX_OK);
1736
1737 *expected_op.lock() = Some(ExpectedOp::Flush);
1739 writer
1740 .write_entries(&BlockFifoRequest {
1741 command: BlockFifoCommand {
1742 opcode: BlockOpcode::Flush.into_primitive(),
1743 ..Default::default()
1744 },
1745 ..Default::default()
1746 })
1747 .await
1748 .unwrap();
1749
1750 reader.read_entries(&mut response).await.unwrap();
1751 assert_eq!(response.status, zx::sys::ZX_OK);
1752
1753 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1755 writer
1756 .write_entries(&BlockFifoRequest {
1757 command: BlockFifoCommand {
1758 opcode: BlockOpcode::Trim.into_primitive(),
1759 ..Default::default()
1760 },
1761 dev_offset: 7,
1762 length: 8,
1763 ..Default::default()
1764 })
1765 .await
1766 .unwrap();
1767
1768 reader.read_entries(&mut response).await.unwrap();
1769 assert_eq!(response.status, zx::sys::ZX_OK);
1770
1771 std::mem::drop(proxy);
1772 };
1773
1774 futures::join!(server, client);
1775 }
1776
1777 #[fuchsia::test]
1778 async fn test_io_errors() {
1779 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1780
1781 futures::join!(
1782 async {
1783 let block_server = BlockServer::new(
1784 BLOCK_SIZE,
1785 Arc::new(IoMockInterface {
1786 return_errors: true,
1787 do_checks: false,
1788 expected_op: Arc::new(Mutex::new(None)),
1789 }),
1790 );
1791 block_server.handle_requests(stream).await.unwrap();
1792 },
1793 async move {
1794 let (session_proxy, server) = fidl::endpoints::create_proxy();
1795
1796 proxy.open_session(server).unwrap();
1797
1798 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1799 let vmo_id = session_proxy
1800 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1801 .await
1802 .unwrap()
1803 .unwrap();
1804
1805 let mut fifo =
1806 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1807 let (mut reader, mut writer) = fifo.async_io();
1808
1809 writer
1811 .write_entries(&BlockFifoRequest {
1812 command: BlockFifoCommand {
1813 opcode: BlockOpcode::Read.into_primitive(),
1814 ..Default::default()
1815 },
1816 vmoid: vmo_id.id,
1817 length: 1,
1818 reqid: 1,
1819 ..Default::default()
1820 })
1821 .await
1822 .unwrap();
1823
1824 let mut response = BlockFifoResponse::default();
1825 reader.read_entries(&mut response).await.unwrap();
1826 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1827
1828 writer
1830 .write_entries(&BlockFifoRequest {
1831 command: BlockFifoCommand {
1832 opcode: BlockOpcode::Write.into_primitive(),
1833 ..Default::default()
1834 },
1835 vmoid: vmo_id.id,
1836 length: 1,
1837 reqid: 2,
1838 ..Default::default()
1839 })
1840 .await
1841 .unwrap();
1842
1843 reader.read_entries(&mut response).await.unwrap();
1844 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1845
1846 writer
1848 .write_entries(&BlockFifoRequest {
1849 command: BlockFifoCommand {
1850 opcode: BlockOpcode::Flush.into_primitive(),
1851 ..Default::default()
1852 },
1853 reqid: 3,
1854 ..Default::default()
1855 })
1856 .await
1857 .unwrap();
1858
1859 reader.read_entries(&mut response).await.unwrap();
1860 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1861
1862 writer
1864 .write_entries(&BlockFifoRequest {
1865 command: BlockFifoCommand {
1866 opcode: BlockOpcode::Trim.into_primitive(),
1867 ..Default::default()
1868 },
1869 reqid: 4,
1870 length: 1,
1871 ..Default::default()
1872 })
1873 .await
1874 .unwrap();
1875
1876 reader.read_entries(&mut response).await.unwrap();
1877 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1878
1879 std::mem::drop(proxy);
1880 }
1881 );
1882 }
1883
1884 #[fuchsia::test]
1885 async fn test_invalid_args() {
1886 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1887
1888 futures::join!(
1889 async {
1890 let block_server = BlockServer::new(
1891 BLOCK_SIZE,
1892 Arc::new(IoMockInterface {
1893 return_errors: false,
1894 do_checks: false,
1895 expected_op: Arc::new(Mutex::new(None)),
1896 }),
1897 );
1898 block_server.handle_requests(stream).await.unwrap();
1899 },
1900 async move {
1901 let (session_proxy, server) = fidl::endpoints::create_proxy();
1902
1903 proxy.open_session(server).unwrap();
1904
1905 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1906 let vmo_id = session_proxy
1907 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1908 .await
1909 .unwrap()
1910 .unwrap();
1911
1912 let mut fifo =
1913 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1914
1915 async fn test(
1916 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1917 request: BlockFifoRequest,
1918 ) -> Result<(), zx::Status> {
1919 let (mut reader, mut writer) = fifo.async_io();
1920 writer.write_entries(&request).await.unwrap();
1921 let mut response = BlockFifoResponse::default();
1922 reader.read_entries(&mut response).await.unwrap();
1923 zx::Status::ok(response.status)
1924 }
1925
1926 let good_read_request = || BlockFifoRequest {
1929 command: BlockFifoCommand {
1930 opcode: BlockOpcode::Read.into_primitive(),
1931 ..Default::default()
1932 },
1933 length: 1,
1934 vmoid: vmo_id.id,
1935 ..Default::default()
1936 };
1937
1938 assert_eq!(
1939 test(
1940 &mut fifo,
1941 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1942 )
1943 .await,
1944 Err(zx::Status::IO)
1945 );
1946
1947 assert_eq!(
1948 test(
1949 &mut fifo,
1950 BlockFifoRequest {
1951 vmo_offset: 0xffff_ffff_ffff_ffff,
1952 ..good_read_request()
1953 }
1954 )
1955 .await,
1956 Err(zx::Status::OUT_OF_RANGE)
1957 );
1958
1959 assert_eq!(
1960 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1961 Err(zx::Status::INVALID_ARGS)
1962 );
1963
1964 let good_write_request = || BlockFifoRequest {
1967 command: BlockFifoCommand {
1968 opcode: BlockOpcode::Write.into_primitive(),
1969 ..Default::default()
1970 },
1971 length: 1,
1972 vmoid: vmo_id.id,
1973 ..Default::default()
1974 };
1975
1976 assert_eq!(
1977 test(
1978 &mut fifo,
1979 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1980 )
1981 .await,
1982 Err(zx::Status::IO)
1983 );
1984
1985 assert_eq!(
1986 test(
1987 &mut fifo,
1988 BlockFifoRequest {
1989 vmo_offset: 0xffff_ffff_ffff_ffff,
1990 ..good_write_request()
1991 }
1992 )
1993 .await,
1994 Err(zx::Status::OUT_OF_RANGE)
1995 );
1996
1997 assert_eq!(
1998 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
1999 Err(zx::Status::INVALID_ARGS)
2000 );
2001
2002 assert_eq!(
2005 test(
2006 &mut fifo,
2007 BlockFifoRequest {
2008 command: BlockFifoCommand {
2009 opcode: BlockOpcode::CloseVmo.into_primitive(),
2010 ..Default::default()
2011 },
2012 vmoid: vmo_id.id + 1,
2013 ..Default::default()
2014 }
2015 )
2016 .await,
2017 Err(zx::Status::IO)
2018 );
2019
2020 std::mem::drop(proxy);
2021 }
2022 );
2023 }
2024
2025 #[fuchsia::test]
2026 async fn test_concurrent_requests() {
2027 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2028
2029 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2030 let waiting_readers_clone = waiting_readers.clone();
2031
2032 futures::join!(
2033 async move {
2034 let block_server = BlockServer::new(
2035 BLOCK_SIZE,
2036 Arc::new(MockInterface {
2037 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2038 let (tx, rx) = oneshot::channel();
2039 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2040 Box::pin(async move {
2041 let _ = rx.await;
2042 Ok(())
2043 })
2044 })),
2045 ..MockInterface::default()
2046 }),
2047 );
2048 block_server.handle_requests(stream).await.unwrap();
2049 },
2050 async move {
2051 let (session_proxy, server) = fidl::endpoints::create_proxy();
2052
2053 proxy.open_session(server).unwrap();
2054
2055 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2056 let vmo_id = session_proxy
2057 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2058 .await
2059 .unwrap()
2060 .unwrap();
2061
2062 let mut fifo =
2063 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2064 let (mut reader, mut writer) = fifo.async_io();
2065
2066 writer
2067 .write_entries(&BlockFifoRequest {
2068 command: BlockFifoCommand {
2069 opcode: BlockOpcode::Read.into_primitive(),
2070 ..Default::default()
2071 },
2072 reqid: 1,
2073 dev_offset: 1, vmoid: vmo_id.id,
2075 length: 1,
2076 ..Default::default()
2077 })
2078 .await
2079 .unwrap();
2080
2081 writer
2082 .write_entries(&BlockFifoRequest {
2083 command: BlockFifoCommand {
2084 opcode: BlockOpcode::Read.into_primitive(),
2085 ..Default::default()
2086 },
2087 reqid: 2,
2088 dev_offset: 2,
2089 vmoid: vmo_id.id,
2090 length: 1,
2091 ..Default::default()
2092 })
2093 .await
2094 .unwrap();
2095
2096 poll_fn(|cx: &mut Context<'_>| {
2098 if waiting_readers.lock().len() == 2 {
2099 Poll::Ready(())
2100 } else {
2101 cx.waker().wake_by_ref();
2103 Poll::Pending
2104 }
2105 })
2106 .await;
2107
2108 let mut response = BlockFifoResponse::default();
2109 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2110
2111 let (id, tx) = waiting_readers.lock().pop().unwrap();
2112 tx.send(()).unwrap();
2113
2114 reader.read_entries(&mut response).await.unwrap();
2115 assert_eq!(response.status, zx::sys::ZX_OK);
2116 assert_eq!(response.reqid, id);
2117
2118 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2119
2120 let (id, tx) = waiting_readers.lock().pop().unwrap();
2121 tx.send(()).unwrap();
2122
2123 reader.read_entries(&mut response).await.unwrap();
2124 assert_eq!(response.status, zx::sys::ZX_OK);
2125 assert_eq!(response.reqid, id);
2126 }
2127 );
2128 }
2129
2130 #[fuchsia::test]
2131 async fn test_groups() {
2132 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2133
2134 futures::join!(
2135 async move {
2136 let block_server = BlockServer::new(
2137 BLOCK_SIZE,
2138 Arc::new(MockInterface {
2139 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2140 ..MockInterface::default()
2141 }),
2142 );
2143 block_server.handle_requests(stream).await.unwrap();
2144 },
2145 async move {
2146 let (session_proxy, server) = fidl::endpoints::create_proxy();
2147
2148 proxy.open_session(server).unwrap();
2149
2150 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2151 let vmo_id = session_proxy
2152 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2153 .await
2154 .unwrap()
2155 .unwrap();
2156
2157 let mut fifo =
2158 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2159 let (mut reader, mut writer) = fifo.async_io();
2160
2161 writer
2162 .write_entries(&BlockFifoRequest {
2163 command: BlockFifoCommand {
2164 opcode: BlockOpcode::Read.into_primitive(),
2165 flags: BlockIoFlag::GROUP_ITEM.bits(),
2166 ..Default::default()
2167 },
2168 group: 1,
2169 vmoid: vmo_id.id,
2170 length: 1,
2171 ..Default::default()
2172 })
2173 .await
2174 .unwrap();
2175
2176 writer
2177 .write_entries(&BlockFifoRequest {
2178 command: BlockFifoCommand {
2179 opcode: BlockOpcode::Read.into_primitive(),
2180 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2181 ..Default::default()
2182 },
2183 reqid: 2,
2184 group: 1,
2185 vmoid: vmo_id.id,
2186 length: 1,
2187 ..Default::default()
2188 })
2189 .await
2190 .unwrap();
2191
2192 let mut response = BlockFifoResponse::default();
2193 reader.read_entries(&mut response).await.unwrap();
2194 assert_eq!(response.status, zx::sys::ZX_OK);
2195 assert_eq!(response.reqid, 2);
2196 assert_eq!(response.group, 1);
2197 }
2198 );
2199 }
2200
2201 #[fuchsia::test]
2202 async fn test_group_error() {
2203 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2204
2205 let counter = Arc::new(AtomicU64::new(0));
2206 let counter_clone = counter.clone();
2207
2208 futures::join!(
2209 async move {
2210 let block_server = BlockServer::new(
2211 BLOCK_SIZE,
2212 Arc::new(MockInterface {
2213 read_hook: Some(Box::new(move |_, _, _, _| {
2214 counter_clone.fetch_add(1, Ordering::Relaxed);
2215 Box::pin(async { Err(zx::Status::BAD_STATE) })
2216 })),
2217 ..MockInterface::default()
2218 }),
2219 );
2220 block_server.handle_requests(stream).await.unwrap();
2221 },
2222 async move {
2223 let (session_proxy, server) = fidl::endpoints::create_proxy();
2224
2225 proxy.open_session(server).unwrap();
2226
2227 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2228 let vmo_id = session_proxy
2229 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2230 .await
2231 .unwrap()
2232 .unwrap();
2233
2234 let mut fifo =
2235 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2236 let (mut reader, mut writer) = fifo.async_io();
2237
2238 writer
2239 .write_entries(&BlockFifoRequest {
2240 command: BlockFifoCommand {
2241 opcode: BlockOpcode::Read.into_primitive(),
2242 flags: BlockIoFlag::GROUP_ITEM.bits(),
2243 ..Default::default()
2244 },
2245 group: 1,
2246 vmoid: vmo_id.id,
2247 length: 1,
2248 ..Default::default()
2249 })
2250 .await
2251 .unwrap();
2252
2253 poll_fn(|cx: &mut Context<'_>| {
2255 if counter.load(Ordering::Relaxed) == 1 {
2256 Poll::Ready(())
2257 } else {
2258 cx.waker().wake_by_ref();
2260 Poll::Pending
2261 }
2262 })
2263 .await;
2264
2265 let mut response = BlockFifoResponse::default();
2266 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2267
2268 writer
2269 .write_entries(&BlockFifoRequest {
2270 command: BlockFifoCommand {
2271 opcode: BlockOpcode::Read.into_primitive(),
2272 flags: BlockIoFlag::GROUP_ITEM.bits(),
2273 ..Default::default()
2274 },
2275 group: 1,
2276 vmoid: vmo_id.id,
2277 length: 1,
2278 ..Default::default()
2279 })
2280 .await
2281 .unwrap();
2282
2283 writer
2284 .write_entries(&BlockFifoRequest {
2285 command: BlockFifoCommand {
2286 opcode: BlockOpcode::Read.into_primitive(),
2287 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2288 ..Default::default()
2289 },
2290 reqid: 2,
2291 group: 1,
2292 vmoid: vmo_id.id,
2293 length: 1,
2294 ..Default::default()
2295 })
2296 .await
2297 .unwrap();
2298
2299 reader.read_entries(&mut response).await.unwrap();
2300 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2301 assert_eq!(response.reqid, 2);
2302 assert_eq!(response.group, 1);
2303
2304 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2305
2306 assert_eq!(counter.load(Ordering::Relaxed), 1);
2308 }
2309 );
2310 }
2311
2312 #[fuchsia::test]
2313 async fn test_group_with_two_lasts() {
2314 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2315
2316 let (tx, rx) = oneshot::channel();
2317
2318 futures::join!(
2319 async move {
2320 let rx = Mutex::new(Some(rx));
2321 let block_server = BlockServer::new(
2322 BLOCK_SIZE,
2323 Arc::new(MockInterface {
2324 read_hook: Some(Box::new(move |_, _, _, _| {
2325 let rx = rx.lock().take().unwrap();
2326 Box::pin(async {
2327 let _ = rx.await;
2328 Ok(())
2329 })
2330 })),
2331 ..MockInterface::default()
2332 }),
2333 );
2334 block_server.handle_requests(stream).await.unwrap();
2335 },
2336 async move {
2337 let (session_proxy, server) = fidl::endpoints::create_proxy();
2338
2339 proxy.open_session(server).unwrap();
2340
2341 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2342 let vmo_id = session_proxy
2343 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2344 .await
2345 .unwrap()
2346 .unwrap();
2347
2348 let mut fifo =
2349 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2350 let (mut reader, mut writer) = fifo.async_io();
2351
2352 writer
2353 .write_entries(&BlockFifoRequest {
2354 command: BlockFifoCommand {
2355 opcode: BlockOpcode::Read.into_primitive(),
2356 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2357 ..Default::default()
2358 },
2359 reqid: 1,
2360 group: 1,
2361 vmoid: vmo_id.id,
2362 length: 1,
2363 ..Default::default()
2364 })
2365 .await
2366 .unwrap();
2367
2368 writer
2369 .write_entries(&BlockFifoRequest {
2370 command: BlockFifoCommand {
2371 opcode: BlockOpcode::Read.into_primitive(),
2372 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2373 ..Default::default()
2374 },
2375 reqid: 2,
2376 group: 1,
2377 vmoid: vmo_id.id,
2378 length: 1,
2379 ..Default::default()
2380 })
2381 .await
2382 .unwrap();
2383
2384 writer
2386 .write_entries(&BlockFifoRequest {
2387 command: BlockFifoCommand {
2388 opcode: BlockOpcode::CloseVmo.into_primitive(),
2389 ..Default::default()
2390 },
2391 reqid: 3,
2392 vmoid: vmo_id.id,
2393 ..Default::default()
2394 })
2395 .await
2396 .unwrap();
2397
2398 let mut response = BlockFifoResponse::default();
2400 reader.read_entries(&mut response).await.unwrap();
2401 assert_eq!(response.status, zx::sys::ZX_OK);
2402 assert_eq!(response.reqid, 3);
2403
2404 tx.send(()).unwrap();
2406
2407 let mut response = BlockFifoResponse::default();
2410 reader.read_entries(&mut response).await.unwrap();
2411 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2412 assert_eq!(response.reqid, 1);
2413 assert_eq!(response.group, 1);
2414 }
2415 );
2416 }
2417
2418 #[fuchsia::test(allow_stalls = false)]
2419 async fn test_requests_dont_block_sessions() {
2420 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2421
2422 let (tx, rx) = oneshot::channel();
2423
2424 fasync::Task::local(async move {
2425 let rx = Mutex::new(Some(rx));
2426 let block_server = BlockServer::new(
2427 BLOCK_SIZE,
2428 Arc::new(MockInterface {
2429 read_hook: Some(Box::new(move |_, _, _, _| {
2430 let rx = rx.lock().take().unwrap();
2431 Box::pin(async {
2432 let _ = rx.await;
2433 Ok(())
2434 })
2435 })),
2436 ..MockInterface::default()
2437 }),
2438 );
2439 block_server.handle_requests(stream).await.unwrap();
2440 })
2441 .detach();
2442
2443 let mut fut = pin!(async {
2444 let (session_proxy, server) = fidl::endpoints::create_proxy();
2445
2446 proxy.open_session(server).unwrap();
2447
2448 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2449 let vmo_id = session_proxy
2450 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2451 .await
2452 .unwrap()
2453 .unwrap();
2454
2455 let mut fifo =
2456 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2457 let (mut reader, mut writer) = fifo.async_io();
2458
2459 writer
2460 .write_entries(&BlockFifoRequest {
2461 command: BlockFifoCommand {
2462 opcode: BlockOpcode::Read.into_primitive(),
2463 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2464 ..Default::default()
2465 },
2466 reqid: 1,
2467 group: 1,
2468 vmoid: vmo_id.id,
2469 length: 1,
2470 ..Default::default()
2471 })
2472 .await
2473 .unwrap();
2474
2475 let mut response = BlockFifoResponse::default();
2476 reader.read_entries(&mut response).await.unwrap();
2477 assert_eq!(response.status, zx::sys::ZX_OK);
2478 });
2479
2480 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2482
2483 let mut fut2 = pin!(proxy.get_volume_info());
2484
2485 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2487
2488 let _ = tx.send(());
2491
2492 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2493 }
2494
2495 #[fuchsia::test]
2496 async fn test_request_flow_control() {
2497 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2498
2499 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2502 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2503 let event_clone = event.clone();
2504 futures::join!(
2505 async move {
2506 let block_server = BlockServer::new(
2507 BLOCK_SIZE,
2508 Arc::new(MockInterface {
2509 read_hook: Some(Box::new(move |_, _, _, _| {
2510 let event_clone = event_clone.clone();
2511 Box::pin(async move {
2512 if !event_clone.1.load(Ordering::SeqCst) {
2513 event_clone.0.listen().await;
2514 }
2515 Ok(())
2516 })
2517 })),
2518 ..MockInterface::default()
2519 }),
2520 );
2521 block_server.handle_requests(stream).await.unwrap();
2522 },
2523 async move {
2524 let (session_proxy, server) = fidl::endpoints::create_proxy();
2525
2526 proxy.open_session(server).unwrap();
2527
2528 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2529 let vmo_id = session_proxy
2530 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2531 .await
2532 .unwrap()
2533 .unwrap();
2534
2535 let mut fifo =
2536 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2537 let (mut reader, mut writer) = fifo.async_io();
2538
2539 for i in 0..MAX_REQUESTS {
2540 writer
2541 .write_entries(&BlockFifoRequest {
2542 command: BlockFifoCommand {
2543 opcode: BlockOpcode::Read.into_primitive(),
2544 ..Default::default()
2545 },
2546 reqid: (i + 1) as u32,
2547 dev_offset: i,
2548 vmoid: vmo_id.id,
2549 length: 1,
2550 ..Default::default()
2551 })
2552 .await
2553 .unwrap();
2554 }
2555 assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2556 command: BlockFifoCommand {
2557 opcode: BlockOpcode::Read.into_primitive(),
2558 ..Default::default()
2559 },
2560 reqid: u32::MAX,
2561 dev_offset: MAX_REQUESTS,
2562 vmoid: vmo_id.id,
2563 length: 1,
2564 ..Default::default()
2565 })))
2566 .is_pending());
2567 event.1.store(true, Ordering::SeqCst);
2569 event.0.notify(usize::MAX);
2570 let mut finished_reqids = vec![];
2572 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2573 let mut response = BlockFifoResponse::default();
2574 reader.read_entries(&mut response).await.unwrap();
2575 assert_eq!(response.status, zx::sys::ZX_OK);
2576 finished_reqids.push(response.reqid);
2577 writer
2578 .write_entries(&BlockFifoRequest {
2579 command: BlockFifoCommand {
2580 opcode: BlockOpcode::Read.into_primitive(),
2581 ..Default::default()
2582 },
2583 reqid: (i + 1) as u32,
2584 dev_offset: i,
2585 vmoid: vmo_id.id,
2586 length: 1,
2587 ..Default::default()
2588 })
2589 .await
2590 .unwrap();
2591 }
2592 let mut response = BlockFifoResponse::default();
2593 for _ in 0..MAX_REQUESTS {
2594 reader.read_entries(&mut response).await.unwrap();
2595 assert_eq!(response.status, zx::sys::ZX_OK);
2596 finished_reqids.push(response.reqid);
2597 }
2598 finished_reqids.sort();
2601 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2602 let mut i = 1;
2603 for reqid in finished_reqids {
2604 assert_eq!(reqid, i);
2605 i += 1;
2606 }
2607 }
2608 );
2609 }
2610
2611 #[fuchsia::test]
2612 async fn test_passthrough_io_with_fixed_map() {
2613 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2614
2615 let expected_op = Arc::new(Mutex::new(None));
2616 let expected_op_clone = expected_op.clone();
2617 futures::join!(
2618 async {
2619 let block_server = BlockServer::new(
2620 BLOCK_SIZE,
2621 Arc::new(IoMockInterface {
2622 return_errors: false,
2623 do_checks: true,
2624 expected_op: expected_op_clone,
2625 }),
2626 );
2627 block_server.handle_requests(stream).await.unwrap();
2628 },
2629 async move {
2630 let (session_proxy, server) = fidl::endpoints::create_proxy();
2631
2632 let mapping = fblock::BlockOffsetMapping {
2633 source_block_offset: 0,
2634 target_block_offset: 10,
2635 length: 20,
2636 };
2637 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2638
2639 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2640 let vmo_id = session_proxy
2641 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2642 .await
2643 .unwrap()
2644 .unwrap();
2645
2646 let mut fifo =
2647 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2648 let (mut reader, mut writer) = fifo.async_io();
2649
2650 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2652 writer
2653 .write_entries(&BlockFifoRequest {
2654 command: BlockFifoCommand {
2655 opcode: BlockOpcode::Read.into_primitive(),
2656 ..Default::default()
2657 },
2658 vmoid: vmo_id.id,
2659 dev_offset: 1,
2660 length: 2,
2661 vmo_offset: 3,
2662 ..Default::default()
2663 })
2664 .await
2665 .unwrap();
2666
2667 let mut response = BlockFifoResponse::default();
2668 reader.read_entries(&mut response).await.unwrap();
2669 assert_eq!(response.status, zx::sys::ZX_OK);
2670
2671 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2673 writer
2674 .write_entries(&BlockFifoRequest {
2675 command: BlockFifoCommand {
2676 opcode: BlockOpcode::Write.into_primitive(),
2677 ..Default::default()
2678 },
2679 vmoid: vmo_id.id,
2680 dev_offset: 4,
2681 length: 5,
2682 vmo_offset: 6,
2683 ..Default::default()
2684 })
2685 .await
2686 .unwrap();
2687
2688 reader.read_entries(&mut response).await.unwrap();
2689 assert_eq!(response.status, zx::sys::ZX_OK);
2690
2691 *expected_op.lock() = Some(ExpectedOp::Flush);
2693 writer
2694 .write_entries(&BlockFifoRequest {
2695 command: BlockFifoCommand {
2696 opcode: BlockOpcode::Flush.into_primitive(),
2697 ..Default::default()
2698 },
2699 ..Default::default()
2700 })
2701 .await
2702 .unwrap();
2703
2704 reader.read_entries(&mut response).await.unwrap();
2705 assert_eq!(response.status, zx::sys::ZX_OK);
2706
2707 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2709 writer
2710 .write_entries(&BlockFifoRequest {
2711 command: BlockFifoCommand {
2712 opcode: BlockOpcode::Trim.into_primitive(),
2713 ..Default::default()
2714 },
2715 dev_offset: 7,
2716 length: 3,
2717 ..Default::default()
2718 })
2719 .await
2720 .unwrap();
2721
2722 reader.read_entries(&mut response).await.unwrap();
2723 assert_eq!(response.status, zx::sys::ZX_OK);
2724
2725 *expected_op.lock() = None;
2727 writer
2728 .write_entries(&BlockFifoRequest {
2729 command: BlockFifoCommand {
2730 opcode: BlockOpcode::Read.into_primitive(),
2731 ..Default::default()
2732 },
2733 vmoid: vmo_id.id,
2734 dev_offset: 19,
2735 length: 2,
2736 vmo_offset: 3,
2737 ..Default::default()
2738 })
2739 .await
2740 .unwrap();
2741
2742 reader.read_entries(&mut response).await.unwrap();
2743 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2744
2745 std::mem::drop(proxy);
2746 }
2747 );
2748 }
2749
2750 #[fuchsia::test]
2751 fn operation_map() {
2752 fn expect_map_result(
2753 mut operation: Operation,
2754 mapping: Option<BlockOffsetMapping>,
2755 max_blocks: Option<NonZero<u32>>,
2756 expected_operations: Vec<Operation>,
2757 ) {
2758 let mut ops = vec![];
2759 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2760 ops.push(operation);
2761 operation = remainder;
2762 }
2763 ops.push(operation);
2764 assert_eq!(ops, expected_operations);
2765 }
2766
2767 expect_map_result(
2769 Operation::Read {
2770 device_block_offset: 10,
2771 block_count: 200,
2772 _unused: 0,
2773 vmo_offset: 0,
2774 },
2775 None,
2776 None,
2777 vec![Operation::Read {
2778 device_block_offset: 10,
2779 block_count: 200,
2780 _unused: 0,
2781 vmo_offset: 0,
2782 }],
2783 );
2784
2785 expect_map_result(
2787 Operation::Read {
2788 device_block_offset: 10,
2789 block_count: 200,
2790 _unused: 0,
2791 vmo_offset: 0,
2792 },
2793 None,
2794 NonZero::new(120),
2795 vec![
2796 Operation::Read {
2797 device_block_offset: 10,
2798 block_count: 120,
2799 _unused: 0,
2800 vmo_offset: 0,
2801 },
2802 Operation::Read {
2803 device_block_offset: 130,
2804 block_count: 80,
2805 _unused: 0,
2806 vmo_offset: 120 * 512,
2807 },
2808 ],
2809 );
2810 expect_map_result(
2811 Operation::Write {
2812 device_block_offset: 10,
2813 block_count: 200,
2814 options: WriteOptions::PRE_BARRIER,
2815 vmo_offset: 0,
2816 },
2817 None,
2818 NonZero::new(120),
2819 vec![
2820 Operation::Write {
2821 device_block_offset: 10,
2822 block_count: 120,
2823 options: WriteOptions::PRE_BARRIER,
2824 vmo_offset: 0,
2825 },
2826 Operation::Write {
2827 device_block_offset: 130,
2828 block_count: 80,
2829 options: WriteOptions::empty(),
2830 vmo_offset: 120 * 512,
2831 },
2832 ],
2833 );
2834 expect_map_result(
2835 Operation::Trim { device_block_offset: 10, block_count: 200 },
2836 None,
2837 NonZero::new(120),
2838 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2839 );
2840
2841 expect_map_result(
2843 Operation::Read {
2844 device_block_offset: 10,
2845 block_count: 200,
2846 _unused: 0,
2847 vmo_offset: 0,
2848 },
2849 Some(BlockOffsetMapping {
2850 source_block_offset: 10,
2851 target_block_offset: 100,
2852 length: 200,
2853 }),
2854 NonZero::new(120),
2855 vec![
2856 Operation::Read {
2857 device_block_offset: 100,
2858 block_count: 120,
2859 _unused: 0,
2860 vmo_offset: 0,
2861 },
2862 Operation::Read {
2863 device_block_offset: 220,
2864 block_count: 80,
2865 _unused: 0,
2866 vmo_offset: 120 * 512,
2867 },
2868 ],
2869 );
2870 expect_map_result(
2871 Operation::Write {
2872 device_block_offset: 10,
2873 block_count: 200,
2874 options: WriteOptions::PRE_BARRIER,
2875 vmo_offset: 0,
2876 },
2877 Some(BlockOffsetMapping {
2878 source_block_offset: 10,
2879 target_block_offset: 100,
2880 length: 200,
2881 }),
2882 NonZero::new(120),
2883 vec![
2884 Operation::Write {
2885 device_block_offset: 100,
2886 block_count: 120,
2887 options: WriteOptions::PRE_BARRIER,
2888 vmo_offset: 0,
2889 },
2890 Operation::Write {
2891 device_block_offset: 220,
2892 block_count: 80,
2893 options: WriteOptions::empty(),
2894 vmo_offset: 120 * 512,
2895 },
2896 ],
2897 );
2898 expect_map_result(
2899 Operation::Trim { device_block_offset: 10, block_count: 200 },
2900 Some(BlockOffsetMapping {
2901 source_block_offset: 10,
2902 target_block_offset: 100,
2903 length: 200,
2904 }),
2905 NonZero::new(120),
2906 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2907 );
2908 }
2909}