1use crate::bin::Bin;
5use anyhow::{Error, anyhow};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use zx::HandleBased;
19use {
20 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34 Block(BlockInfo),
35 Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39 pub fn block_count(&self) -> Option<u64> {
40 match self {
41 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42 Self::Partition(PartitionInfo { block_range, .. }) => {
43 block_range.as_ref().map(|range| range.end - range.start)
44 }
45 }
46 }
47
48 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49 match self {
50 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52 max_transfer_blocks.clone()
53 }
54 }
55 }
56
57 fn max_transfer_size(&self, block_size: u32) -> u32 {
58 if let Some(max_blocks) = self.max_transfer_blocks() {
59 max_blocks.get() * block_size
60 } else {
61 MAX_TRANSFER_UNBOUNDED
62 }
63 }
64}
65
66#[derive(Clone, Default)]
68pub struct BlockInfo {
69 pub device_flags: fblock::Flag,
70 pub block_count: u64,
71 pub max_transfer_blocks: Option<NonZero<u32>>,
72}
73
74#[derive(Clone, Default)]
76pub struct PartitionInfo {
77 pub device_flags: fblock::Flag,
79 pub max_transfer_blocks: Option<NonZero<u32>>,
80 pub block_range: Option<Range<u64>>,
84 pub type_guid: [u8; 16],
85 pub instance_guid: [u8; 16],
86 pub name: String,
87 pub flags: u64,
88}
89
90struct ActiveRequest<S> {
93 session: S,
94 group_or_request: GroupOrRequest,
95 trace_flow_id: TraceFlowId,
96 vmo_bin_key: Option<usize>,
97 status: zx::Status,
98 count: u32,
99 req_id: Option<u32>,
100}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105 fn default() -> Self {
106 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107 }
108}
109
110impl<S> ActiveRequests<S> {
111 fn complete_and_take_response(
112 &self,
113 request_id: RequestId,
114 status: zx::Status,
115 ) -> Option<(S, BlockFifoResponse)> {
116 self.0.lock().complete_and_take_response(request_id, status)
117 }
118}
119
120struct ActiveRequestsInner<S> {
121 requests: Slab<ActiveRequest<S>>,
122 vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125impl<S> ActiveRequestsInner<S> {
127 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129 let group = &mut self.requests[request_id.0];
130
131 group.count = group.count.checked_sub(1).unwrap();
132 if status != zx::Status::OK && group.status == zx::Status::OK {
133 group.status = status
134 }
135
136 fuchsia_trace::duration!(
137 c"storage",
138 c"block_server::finish_transaction",
139 "request_id" => request_id.0,
140 "group_completed" => group.count == 0,
141 "status" => status.into_raw());
142 if let Some(trace_flow_id) = group.trace_flow_id {
143 fuchsia_trace::flow_step!(
144 c"storage",
145 c"block_server::finish_request",
146 trace_flow_id.get().into()
147 );
148 }
149 }
150
151 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153 let group = &self.requests[request_id.0];
154 match group.req_id {
155 Some(reqid) if group.count == 0 => {
156 let group = self.requests.remove(request_id.0);
157 if let Some(vmo_bin_key) = group.vmo_bin_key {
158 self.vmo_bin.release(vmo_bin_key);
159 }
160 Some((
161 group.session,
162 BlockFifoResponse {
163 status: group.status.into_raw(),
164 reqid,
165 group: group.group_or_request.group_id().unwrap_or(0),
166 ..Default::default()
167 },
168 ))
169 }
170 _ => None,
171 }
172 }
173
174 fn complete_and_take_response(
176 &mut self,
177 request_id: RequestId,
178 status: zx::Status,
179 ) -> Option<(S, BlockFifoResponse)> {
180 self.complete(request_id, status);
181 self.take_response(request_id)
182 }
183}
184
185pub struct BlockServer<SM> {
188 block_size: u32,
189 session_manager: Arc<SM>,
190}
191
192#[derive(Debug)]
194pub struct BlockOffsetMapping {
195 source_block_offset: u64,
196 target_block_offset: u64,
197 length: u64,
198}
199
200impl BlockOffsetMapping {
201 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
202 blocks.0 >= self.source_block_offset
203 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
204 }
205}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208 type Error = zx::Status;
209
210 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213 Ok(Self {
214 source_block_offset: wire.source_block_offset,
215 target_block_offset: wire.target_block_offset,
216 length: wire.length,
217 })
218 }
219}
220
221pub struct OffsetMap {
223 mapping: Option<BlockOffsetMapping>,
224 max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230 Self { mapping: Some(mapping), max_transfer_blocks }
231 }
232
233 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235 Self { mapping: None, max_transfer_blocks }
236 }
237
238 pub fn is_empty(&self) -> bool {
239 self.mapping.is_none()
240 }
241
242 fn mapping(&self) -> Option<&BlockOffsetMapping> {
243 self.mapping.as_ref()
244 }
245
246 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247 self.max_transfer_blocks
248 }
249}
250
251pub trait SessionManager: 'static {
254 type Session;
255
256 fn on_attach_vmo(
257 self: Arc<Self>,
258 vmo: &Arc<zx::Vmo>,
259 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
260
261 fn open_session(
266 self: Arc<Self>,
267 stream: fblock::SessionRequestStream,
268 offset_map: OffsetMap,
269 block_size: u32,
270 ) -> impl Future<Output = Result<(), Error>> + Send;
271
272 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
274
275 fn get_volume_info(
277 &self,
278 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
279 {
280 async { Err(zx::Status::NOT_SUPPORTED) }
281 }
282
283 fn query_slices(
285 &self,
286 _start_slices: &[u64],
287 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
288 async { Err(zx::Status::NOT_SUPPORTED) }
289 }
290
291 fn extend(
293 &self,
294 _start_slice: u64,
295 _slice_count: u64,
296 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
297 async { Err(zx::Status::NOT_SUPPORTED) }
298 }
299
300 fn shrink(
302 &self,
303 _start_slice: u64,
304 _slice_count: u64,
305 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
306 async { Err(zx::Status::NOT_SUPPORTED) }
307 }
308
309 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
311}
312
313pub trait IntoSessionManager {
314 type SM: SessionManager;
315
316 fn into_session_manager(self) -> Arc<Self::SM>;
317}
318
319impl<SM: SessionManager> BlockServer<SM> {
320 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321 Self { block_size, session_manager: session_manager.into_session_manager() }
322 }
323
324 pub fn session_manager(&self) -> &SM {
325 self.session_manager.as_ref()
326 }
327
328 pub async fn handle_requests(
330 &self,
331 mut requests: fvolume::VolumeRequestStream,
332 ) -> Result<(), Error> {
333 let scope = fasync::Scope::new();
334 loop {
335 match requests.try_next().await {
336 Ok(Some(request)) => {
337 if let Some(session) = self.handle_request(request).await? {
338 scope.spawn(session.map(|_| ()));
339 }
340 }
341 Ok(None) => break,
342 Err(err) => log::warn!(err:?; "Invalid request"),
343 }
344 }
345 scope.await;
346 Ok(())
347 }
348
349 async fn handle_request(
352 &self,
353 request: fvolume::VolumeRequest,
354 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
355 match request {
356 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
357 Ok(info) => {
358 let max_transfer_size = info.max_transfer_size(self.block_size);
359 let (block_count, flags) = match info.as_ref() {
360 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
361 (*block_count, *device_flags)
362 }
363 DeviceInfo::Partition(partition_info) => {
364 let block_count = if let Some(range) =
365 partition_info.block_range.as_ref()
366 {
367 range.end - range.start
368 } else {
369 let volume_info = self.session_manager.get_volume_info().await?;
370 volume_info.0.slice_size * volume_info.1.partition_slice_count
371 / self.block_size as u64
372 };
373 (block_count, partition_info.device_flags)
374 }
375 };
376 responder.send(Ok(&fblock::BlockInfo {
377 block_count,
378 block_size: self.block_size,
379 max_transfer_size,
380 flags,
381 }))?;
382 }
383 Err(status) => responder.send(Err(status.into_raw()))?,
384 },
385 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
386 match self.device_info().await {
387 Ok(info) => {
388 return Ok(Some(self.session_manager.clone().open_session(
389 session.into_stream(),
390 OffsetMap::empty(info.max_transfer_blocks()),
391 self.block_size,
392 )));
393 }
394 Err(status) => session.close_with_epitaph(status)?,
395 }
396 }
397 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
398 session,
399 mapping,
400 control_handle: _,
401 } => match self.device_info().await {
402 Ok(info) => {
403 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
404 Ok(m) => m,
405 Err(status) => {
406 session.close_with_epitaph(status)?;
407 return Ok(None);
408 }
409 };
410 if let Some(max) = info.block_count() {
411 if initial_mapping.target_block_offset + initial_mapping.length > max {
412 log::warn!(
413 "Invalid mapping for session: {initial_mapping:?} (max {max})"
414 );
415 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
416 return Ok(None);
417 }
418 }
419 return Ok(Some(self.session_manager.clone().open_session(
420 session.into_stream(),
421 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
422 self.block_size,
423 )));
424 }
425 Err(status) => session.close_with_epitaph(status)?,
426 },
427 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
428 Ok(info) => {
429 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
430 let mut guid =
431 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
432 guid.value.copy_from_slice(&partition_info.type_guid);
433 responder.send(zx::sys::ZX_OK, Some(&guid))?;
434 } else {
435 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
436 }
437 }
438 Err(status) => {
439 responder.send(status.into_raw(), None)?;
440 }
441 },
442 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
443 match self.device_info().await {
444 Ok(info) => {
445 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
446 let mut guid =
447 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
448 guid.value.copy_from_slice(&partition_info.instance_guid);
449 responder.send(zx::sys::ZX_OK, Some(&guid))?;
450 } else {
451 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
452 }
453 }
454 Err(status) => {
455 responder.send(status.into_raw(), None)?;
456 }
457 }
458 }
459 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
460 Ok(info) => {
461 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
462 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
463 } else {
464 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
465 }
466 }
467 Err(status) => {
468 responder.send(status.into_raw(), None)?;
469 }
470 },
471 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
472 Ok(info) => {
473 if let DeviceInfo::Partition(info) = info.as_ref() {
474 let mut type_guid =
475 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
476 type_guid.value.copy_from_slice(&info.type_guid);
477 let mut instance_guid =
478 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
479 instance_guid.value.copy_from_slice(&info.instance_guid);
480 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
481 name: Some(info.name.clone()),
482 type_guid: Some(type_guid),
483 instance_guid: Some(instance_guid),
484 start_block_offset: info.block_range.as_ref().map(|range| range.start),
485 num_blocks: info
486 .block_range
487 .as_ref()
488 .map(|range| range.end - range.start),
489 flags: Some(info.flags),
490 ..Default::default()
491 }))?;
492 }
493 }
494 Err(status) => responder.send(Err(status.into_raw()))?,
495 },
496 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
497 match self.session_manager.query_slices(&start_slices).await {
498 Ok(mut results) => {
499 let results_len = results.len();
500 assert!(results_len <= 16);
501 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
502 responder.send(
503 zx::sys::ZX_OK,
504 &results.try_into().unwrap(),
505 results_len as u64,
506 )?;
507 }
508 Err(s) => {
509 responder.send(
510 s.into_raw(),
511 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
512 0,
513 )?;
514 }
515 }
516 }
517 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
518 match self.session_manager.get_volume_info().await {
519 Ok((manager_info, volume_info)) => {
520 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
521 }
522 Err(s) => responder.send(s.into_raw(), None, None)?,
523 }
524 }
525 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
526 responder.send(
527 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
528 .into_raw(),
529 )?;
530 }
531 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
532 responder.send(
533 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
534 .into_raw(),
535 )?;
536 }
537 fvolume::VolumeRequest::Destroy { responder, .. } => {
538 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
539 }
540 }
541 Ok(None)
542 }
543
544 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
545 self.session_manager.get_info().await
546 }
547}
548
549struct SessionHelper<SM: SessionManager> {
550 session_manager: Arc<SM>,
551 offset_map: OffsetMap,
552 block_size: u32,
553 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
554 vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
555}
556
557impl<SM: SessionManager> SessionHelper<SM> {
558 fn new(
559 session_manager: Arc<SM>,
560 offset_map: OffsetMap,
561 block_size: u32,
562 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
563 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
564 Ok((
565 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
566 fifo,
567 ))
568 }
569
570 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
571 match request {
572 fblock::SessionRequest::GetFifo { responder } => {
573 let rights = zx::Rights::TRANSFER
574 | zx::Rights::READ
575 | zx::Rights::WRITE
576 | zx::Rights::SIGNAL
577 | zx::Rights::WAIT;
578 match self.peer_fifo.duplicate_handle(rights) {
579 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
580 Err(s) => responder.send(Err(s.into_raw()))?,
581 }
582 Ok(())
583 }
584 fblock::SessionRequest::AttachVmo { vmo, responder } => {
585 let vmo = Arc::new(vmo);
586 let vmo_id = {
587 let mut vmos = self.vmos.lock();
588 if vmos.len() == u16::MAX as usize {
589 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
590 return Ok(());
591 } else {
592 let vmo_id = match vmos.last_entry() {
593 None => 1,
594 Some(o) => {
595 o.key().checked_add(1).unwrap_or_else(|| {
596 let mut vmo_id = 1;
597 for (&id, _) in &*vmos {
599 if id > vmo_id {
600 break;
601 }
602 vmo_id = id + 1;
603 }
604 vmo_id
605 })
606 }
607 };
608 vmos.insert(vmo_id, vmo.clone());
609 vmo_id
610 }
611 };
612 self.session_manager.clone().on_attach_vmo(&vmo).await?;
613 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
614 Ok(())
615 }
616 fblock::SessionRequest::Close { responder } => {
617 responder.send(Ok(()))?;
618 Err(anyhow!("Closed"))
619 }
620 }
621 }
622
623 fn decode_fifo_request(
625 &self,
626 session: SM::Session,
627 request: &BlockFifoRequest,
628 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
629 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
630 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
631 .ok_or(zx::Status::INVALID_ARGS)
632 .and_then(|code| {
633 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
634 if request.length == 0 {
635 return Err(zx::Status::INVALID_ARGS);
636 }
637 if request.dev_offset.checked_add(request.length as u64).is_none()
639 || (code != BlockOpcode::Trim
640 && (request.length as u64 * self.block_size as u64)
641 .checked_add(request.vmo_offset)
642 .is_none())
643 {
644 return Err(zx::Status::OUT_OF_RANGE);
645 }
646 }
647 Ok(match code {
648 BlockOpcode::Read => Operation::Read {
649 device_block_offset: request.dev_offset,
650 block_count: request.length,
651 _unused: 0,
652 vmo_offset: request
653 .vmo_offset
654 .checked_mul(self.block_size as u64)
655 .ok_or(zx::Status::OUT_OF_RANGE)?,
656 options: ReadOptions {
657 inline_crypto_options: InlineCryptoOptions {
658 dun: request.dun,
659 slot: request.slot,
660 },
661 },
662 },
663 BlockOpcode::Write => {
664 let mut options = WriteOptions {
665 inline_crypto_options: InlineCryptoOptions {
666 dun: request.dun,
667 slot: request.slot,
668 },
669 ..WriteOptions::default()
670 };
671 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
672 options.flags |= WriteFlags::FORCE_ACCESS;
673 }
674 if flags.contains(BlockIoFlag::PRE_BARRIER) {
675 options.flags |= WriteFlags::PRE_BARRIER;
676 }
677 Operation::Write {
678 device_block_offset: request.dev_offset,
679 block_count: request.length,
680 _unused: 0,
681 options,
682 vmo_offset: request
683 .vmo_offset
684 .checked_mul(self.block_size as u64)
685 .ok_or(zx::Status::OUT_OF_RANGE)?,
686 }
687 }
688 BlockOpcode::Flush => Operation::Flush,
689 BlockOpcode::Trim => Operation::Trim {
690 device_block_offset: request.dev_offset,
691 block_count: request.length,
692 },
693 BlockOpcode::CloseVmo => Operation::CloseVmo,
694 })
695 });
696
697 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
698 GroupOrRequest::Group(request.group)
699 } else {
700 GroupOrRequest::Request(request.reqid)
701 };
702
703 let mut active_requests = self.session_manager.active_requests().0.lock();
704 let mut request_id = None;
705
706 if group_or_request.is_group() {
716 for (key, group) in &mut active_requests.requests {
720 if group.group_or_request == group_or_request {
721 if group.req_id.is_some() {
722 if group.status == zx::Status::OK {
724 group.status = zx::Status::INVALID_ARGS;
725 }
726 return Err(None);
728 }
729 if flags.contains(BlockIoFlag::GROUP_LAST) {
730 group.req_id = Some(request.reqid);
731 if group.status != zx::Status::OK {
734 operation = Err(group.status);
735 }
736 } else if group.status != zx::Status::OK {
737 return Err(None);
740 }
741 request_id = Some(RequestId(key));
742 group.count += 1;
743 break;
744 }
745 }
746 }
747
748 let mut retain_vmo = false;
749 let vmo = match &operation {
750 Ok(Operation::Read { .. } | Operation::Write { .. }) => {
751 self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
752 retain_vmo = true;
753 Ok(Some(vmo))
754 })
755 }
756 Ok(Operation::CloseVmo) => {
757 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
758 active_requests.vmo_bin.add(vmo.clone());
759 Ok(Some(vmo))
760 })
761 }
762 _ => Ok(None),
763 }
764 .unwrap_or_else(|e| {
765 operation = Err(e);
766 None
767 });
768
769 let trace_flow_id = NonZero::new(request.trace_flow_id);
770 let request_id = request_id.unwrap_or_else(|| {
771 let vmo_bin_key =
772 if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
773 RequestId(active_requests.requests.insert(ActiveRequest {
774 session,
775 group_or_request,
776 trace_flow_id,
777 vmo_bin_key,
778 status: zx::Status::OK,
779 count: 1,
780 req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
781 || flags.contains(BlockIoFlag::GROUP_LAST)
782 {
783 Some(request.reqid)
784 } else {
785 None
786 },
787 }))
788 });
789
790 Ok(DecodedRequest {
791 request_id,
792 trace_flow_id,
793 operation: operation.map_err(|status| {
794 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
795 })?,
796 vmo,
797 })
798 }
799
800 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
801 std::mem::take(&mut *self.vmos.lock())
802 }
803
804 fn map_request(
806 &self,
807 mut request: DecodedRequest,
808 ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
809 let mut active_requests = self.session_manager.active_requests().0.lock();
810 let active_request = &mut active_requests.requests[request.request_id.0];
811 if active_request.status != zx::Status::OK {
812 return Err(active_requests
813 .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
814 .map(|(_, r)| r));
815 }
816 let mapping = self.offset_map.mapping();
817 match (mapping, request.operation.blocks()) {
818 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
819 return Err(active_requests
820 .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
821 .map(|(_, r)| r));
822 }
823 _ => {}
824 }
825 let remainder = request.operation.map(
826 self.offset_map.mapping(),
827 self.offset_map.max_transfer_blocks(),
828 self.block_size,
829 );
830 if remainder.is_some() {
831 active_request.count += 1;
832 }
833 static CACHE: AtomicU64 = AtomicU64::new(0);
834 if let Some(context) =
835 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
836 {
837 use fuchsia_trace::ArgValue;
838 let trace_args = [
839 ArgValue::of("request_id", request.request_id.0),
840 ArgValue::of("opcode", request.operation.trace_label()),
841 ];
842 let _scope = fuchsia_trace::duration(
843 c"storage",
844 c"block_server::start_transaction",
845 &trace_args,
846 );
847 if let Some(trace_flow_id) = active_request.trace_flow_id {
848 fuchsia_trace::flow_step(
849 &context,
850 c"block_server::start_transaction",
851 trace_flow_id.get().into(),
852 &[],
853 );
854 }
855 }
856 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
857 Ok((request, remainder))
858 }
859
860 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
861 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
862 }
863}
864
865#[repr(transparent)]
866#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
867pub struct RequestId(usize);
868
869#[derive(Clone, Debug)]
870struct DecodedRequest {
871 request_id: RequestId,
872 trace_flow_id: TraceFlowId,
873 operation: Operation,
874 vmo: Option<Arc<zx::Vmo>>,
875}
876
877pub type WriteFlags = block_protocol::WriteFlags;
879pub type WriteOptions = block_protocol::WriteOptions;
880pub type ReadOptions = block_protocol::ReadOptions;
881
882#[repr(C)]
883#[derive(Clone, Debug, PartialEq, Eq)]
884pub enum Operation {
885 Read {
890 device_block_offset: u64,
891 block_count: u32,
892 _unused: u32,
893 vmo_offset: u64,
894 options: ReadOptions,
895 },
896 Write {
897 device_block_offset: u64,
898 block_count: u32,
899 _unused: u32,
900 vmo_offset: u64,
901 options: WriteOptions,
902 },
903 Flush,
904 Trim {
905 device_block_offset: u64,
906 block_count: u32,
907 },
908 CloseVmo,
910}
911
912impl Operation {
913 fn trace_label(&self) -> &'static str {
914 match self {
915 Operation::Read { .. } => "read",
916 Operation::Write { .. } => "write",
917 Operation::Flush { .. } => "flush",
918 Operation::Trim { .. } => "trim",
919 Operation::CloseVmo { .. } => "close_vmo",
920 }
921 }
922
923 fn blocks(&self) -> Option<(u64, u32)> {
925 match self {
926 Operation::Read { device_block_offset, block_count, .. }
927 | Operation::Write { device_block_offset, block_count, .. }
928 | Operation::Trim { device_block_offset, block_count, .. } => {
929 Some((*device_block_offset, *block_count))
930 }
931 _ => None,
932 }
933 }
934
935 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
937 match self {
938 Operation::Read { device_block_offset, block_count, .. }
939 | Operation::Write { device_block_offset, block_count, .. }
940 | Operation::Trim { device_block_offset, block_count, .. } => {
941 Some((device_block_offset, block_count))
942 }
943 _ => None,
944 }
945 }
946
947 fn map(
950 &mut self,
951 mapping: Option<&BlockOffsetMapping>,
952 max_blocks: Option<NonZero<u32>>,
953 block_size: u32,
954 ) -> Option<Self> {
955 let mut max = match self {
956 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
957 _ => None,
958 };
959 let (offset, length) = self.blocks_mut()?;
960 let orig_offset = *offset;
961 if let Some(mapping) = mapping {
962 let delta = *offset - mapping.source_block_offset;
963 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
964 *offset = mapping.target_block_offset + delta;
965 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
966 max = match max {
967 None => Some(mapping_max),
968 Some(m) => Some(std::cmp::min(m, mapping_max)),
969 };
970 };
971 if let Some(max) = max {
972 if *length as u64 > max {
973 let rem = (*length as u64 - max) as u32;
974 *length = max as u32;
975 return Some(match self {
976 Operation::Read {
977 device_block_offset: _,
978 block_count: _,
979 vmo_offset,
980 _unused,
981 options,
982 } => Operation::Read {
983 device_block_offset: orig_offset + max,
984 block_count: rem,
985 vmo_offset: *vmo_offset + max * block_size as u64,
986 _unused: *_unused,
987 options: *options,
988 },
989 Operation::Write {
990 device_block_offset: _,
991 block_count: _,
992 _unused,
993 vmo_offset,
994 options,
995 } => {
996 let new_options = WriteOptions {
998 flags: options.flags & !WriteFlags::PRE_BARRIER,
999 ..*options
1000 };
1001
1002 Operation::Write {
1003 device_block_offset: orig_offset + max,
1004 block_count: rem,
1005 _unused: *_unused,
1006 vmo_offset: *vmo_offset + max * block_size as u64,
1007 options: new_options,
1008 }
1009 }
1010 Operation::Trim { device_block_offset: _, block_count: _ } => {
1011 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1012 }
1013 _ => unreachable!(),
1014 });
1015 }
1016 }
1017 None
1018 }
1019}
1020
1021#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1022pub enum GroupOrRequest {
1023 Group(u16),
1024 Request(u32),
1025}
1026
1027impl GroupOrRequest {
1028 fn is_group(&self) -> bool {
1029 matches!(self, Self::Group(_))
1030 }
1031
1032 fn group_id(&self) -> Option<u16> {
1033 match self {
1034 Self::Group(id) => Some(*id),
1035 Self::Request(_) => None,
1036 }
1037 }
1038}
1039
1040#[cfg(test)]
1041mod tests {
1042 use super::{
1043 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1044 TraceFlowId,
1045 };
1046 use assert_matches::assert_matches;
1047 use block_protocol::{
1048 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1049 WriteOptions,
1050 };
1051 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1052 use fuchsia_sync::Mutex;
1053 use futures::FutureExt as _;
1054 use futures::channel::oneshot;
1055 use futures::future::BoxFuture;
1056 use std::borrow::Cow;
1057 use std::future::poll_fn;
1058 use std::num::NonZero;
1059 use std::pin::pin;
1060 use std::sync::Arc;
1061 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1062 use std::task::{Context, Poll};
1063 use zx::{AsHandleRef as _, HandleBased as _};
1064 use {
1065 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1066 fuchsia_async as fasync,
1067 };
1068
1069 #[derive(Default)]
1070 struct MockInterface {
1071 read_hook: Option<
1072 Box<
1073 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1074 + Send
1075 + Sync,
1076 >,
1077 >,
1078 write_hook:
1079 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1080 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1081 }
1082
1083 impl super::async_interface::Interface for MockInterface {
1084 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1085 Ok(())
1086 }
1087
1088 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1089 Ok(Cow::Owned(test_device_info()))
1090 }
1091
1092 async fn read(
1093 &self,
1094 device_block_offset: u64,
1095 block_count: u32,
1096 vmo: &Arc<zx::Vmo>,
1097 vmo_offset: u64,
1098 _opts: ReadOptions,
1099 _trace_flow_id: TraceFlowId,
1100 ) -> Result<(), zx::Status> {
1101 if let Some(read_hook) = &self.read_hook {
1102 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1103 } else {
1104 unimplemented!();
1105 }
1106 }
1107
1108 async fn write(
1109 &self,
1110 device_block_offset: u64,
1111 _block_count: u32,
1112 _vmo: &Arc<zx::Vmo>,
1113 _vmo_offset: u64,
1114 _opts: WriteOptions,
1115 _trace_flow_id: TraceFlowId,
1116 ) -> Result<(), zx::Status> {
1117 if let Some(write_hook) = &self.write_hook {
1118 write_hook(device_block_offset).await
1119 } else {
1120 unimplemented!();
1121 }
1122 }
1123
1124 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1125 unreachable!();
1126 }
1127
1128 fn barrier(&self) -> Result<(), zx::Status> {
1129 if let Some(barrier_hook) = &self.barrier_hook {
1130 barrier_hook()
1131 } else {
1132 unimplemented!();
1133 }
1134 }
1135
1136 async fn trim(
1137 &self,
1138 _device_block_offset: u64,
1139 _block_count: u32,
1140 _trace_flow_id: TraceFlowId,
1141 ) -> Result<(), zx::Status> {
1142 unreachable!();
1143 }
1144
1145 async fn get_volume_info(
1146 &self,
1147 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1148 let () = std::future::pending().await;
1150 unreachable!();
1151 }
1152 }
1153
1154 const BLOCK_SIZE: u32 = 512;
1155 const MAX_TRANSFER_BLOCKS: u32 = 10;
1156
1157 fn test_device_info() -> DeviceInfo {
1158 DeviceInfo::Partition(PartitionInfo {
1159 device_flags: fblock::Flag::READONLY,
1160 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1161 block_range: Some(0..100),
1162 type_guid: [1; 16],
1163 instance_guid: [2; 16],
1164 name: "foo".to_string(),
1165 flags: 0xabcd,
1166 })
1167 }
1168
1169 #[fuchsia::test]
1170 async fn test_split_request_only_issues_single_barrier() {
1171 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1172 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1173 let barrier_called = Arc::new(AtomicU32::new(0));
1174 let barrier_called_clone = barrier_called.clone();
1175
1176 futures::join!(
1177 async move {
1178 let block_server = BlockServer::new(
1179 BLOCK_SIZE,
1180 Arc::new(MockInterface {
1181 barrier_hook: Some(Box::new(move || {
1182 barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1183 Ok(())
1184 })),
1185 write_hook: Some(Box::new(move |_device_block_offset| {
1186 Box::pin(async move { Ok(()) })
1187 })),
1188 ..MockInterface::default()
1189 }),
1190 );
1191 block_server.handle_requests(stream).await.unwrap();
1192 },
1193 async move {
1194 let (session_proxy, server) = fidl::endpoints::create_proxy();
1195
1196 proxy.open_session(server).unwrap();
1197
1198 let vmo_id = session_proxy
1199 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1200 .await
1201 .unwrap()
1202 .unwrap();
1203 assert_ne!(vmo_id.id, 0);
1204
1205 let mut fifo =
1206 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1207 let (mut reader, mut writer) = fifo.async_io();
1208 writer
1211 .write_entries(&BlockFifoRequest {
1212 command: BlockFifoCommand {
1213 opcode: BlockOpcode::Write.into_primitive(),
1214 flags: BlockIoFlag::PRE_BARRIER.bits(),
1215 ..Default::default()
1216 },
1217 vmoid: vmo_id.id,
1218 dev_offset: 0,
1219 length: MAX_TRANSFER_BLOCKS + 1,
1220 vmo_offset: 6,
1221 ..Default::default()
1222 })
1223 .await
1224 .unwrap();
1225
1226 let mut response = BlockFifoResponse::default();
1227 reader.read_entries(&mut response).await.unwrap();
1228 assert_eq!(response.status, zx::sys::ZX_OK);
1229
1230 assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1231
1232 std::mem::drop(proxy);
1233 }
1234 );
1235 }
1236
1237 #[fuchsia::test]
1238 async fn test_barriers_not_supported() {
1239 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1240 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1241
1242 futures::join!(
1243 async move {
1244 let block_server = BlockServer::new(
1245 BLOCK_SIZE,
1246 Arc::new(MockInterface {
1247 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1248 write_hook: Some(Box::new(move |_device_block_offset| {
1249 Box::pin(async move { Ok(()) })
1250 })),
1251 ..MockInterface::default()
1252 }),
1253 );
1254 block_server.handle_requests(stream).await.unwrap();
1255 },
1256 async move {
1257 let (session_proxy, server) = fidl::endpoints::create_proxy();
1258
1259 proxy.open_session(server).unwrap();
1260
1261 let vmo_id = session_proxy
1262 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1263 .await
1264 .unwrap()
1265 .unwrap();
1266 assert_ne!(vmo_id.id, 0);
1267
1268 let mut fifo =
1269 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1270 let (mut reader, mut writer) = fifo.async_io();
1271
1272 writer
1275 .write_entries(&BlockFifoRequest {
1276 command: BlockFifoCommand {
1277 opcode: BlockOpcode::Write.into_primitive(),
1278 flags: BlockIoFlag::PRE_BARRIER.bits(),
1279 ..Default::default()
1280 },
1281 vmoid: vmo_id.id,
1282 dev_offset: 0,
1283 length: MAX_TRANSFER_BLOCKS + 1,
1284 vmo_offset: 6,
1285 ..Default::default()
1286 })
1287 .await
1288 .unwrap();
1289
1290 let mut response = BlockFifoResponse::default();
1291 reader.read_entries(&mut response).await.unwrap();
1292 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1293
1294 std::mem::drop(proxy);
1295 }
1296 );
1297 }
1298
1299 #[fuchsia::test]
1300 async fn test_barriers_ordering() {
1301 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1302 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1303 let barrier_called = Arc::new(AtomicBool::new(false));
1304
1305 futures::join!(
1306 async move {
1307 let barrier_called_clone = barrier_called.clone();
1308 let block_server = BlockServer::new(
1309 BLOCK_SIZE,
1310 Arc::new(MockInterface {
1311 barrier_hook: Some(Box::new(move || {
1312 barrier_called.store(true, Ordering::Relaxed);
1313 Ok(())
1314 })),
1315 write_hook: Some(Box::new(move |device_block_offset| {
1316 let barrier_called = barrier_called_clone.clone();
1317 Box::pin(async move {
1318 if device_block_offset % 2 == 0 {
1320 fasync::Timer::new(fasync::MonotonicInstant::after(
1321 zx::MonotonicDuration::from_millis(200),
1322 ))
1323 .await;
1324 }
1325 assert!(barrier_called.load(Ordering::Relaxed));
1326 Ok(())
1327 })
1328 })),
1329 ..MockInterface::default()
1330 }),
1331 );
1332 block_server.handle_requests(stream).await.unwrap();
1333 },
1334 async move {
1335 let (session_proxy, server) = fidl::endpoints::create_proxy();
1336
1337 proxy.open_session(server).unwrap();
1338
1339 let vmo_id = session_proxy
1340 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1341 .await
1342 .unwrap()
1343 .unwrap();
1344 assert_ne!(vmo_id.id, 0);
1345
1346 let mut fifo =
1347 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1348 let (mut reader, mut writer) = fifo.async_io();
1349
1350 writer
1351 .write_entries(&BlockFifoRequest {
1352 command: BlockFifoCommand {
1353 opcode: BlockOpcode::Write.into_primitive(),
1354 flags: BlockIoFlag::PRE_BARRIER.bits(),
1355 ..Default::default()
1356 },
1357 vmoid: vmo_id.id,
1358 dev_offset: 0,
1359 length: 5,
1360 vmo_offset: 6,
1361 ..Default::default()
1362 })
1363 .await
1364 .unwrap();
1365
1366 for i in 0..10 {
1367 writer
1368 .write_entries(&BlockFifoRequest {
1369 command: BlockFifoCommand {
1370 opcode: BlockOpcode::Write.into_primitive(),
1371 ..Default::default()
1372 },
1373 vmoid: vmo_id.id,
1374 dev_offset: i + 1,
1375 length: 5,
1376 vmo_offset: 6,
1377 ..Default::default()
1378 })
1379 .await
1380 .unwrap();
1381 }
1382 for _ in 0..11 {
1383 let mut response = BlockFifoResponse::default();
1384 reader.read_entries(&mut response).await.unwrap();
1385 assert_eq!(response.status, zx::sys::ZX_OK);
1386 }
1387
1388 std::mem::drop(proxy);
1389 }
1390 );
1391 }
1392
1393 #[fuchsia::test]
1394 async fn test_info() {
1395 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1396
1397 futures::join!(
1398 async {
1399 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1400 block_server.handle_requests(stream).await.unwrap();
1401 },
1402 async {
1403 let expected_info = test_device_info();
1404 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1405 info
1406 } else {
1407 unreachable!()
1408 };
1409
1410 let block_info = proxy.get_info().await.unwrap().unwrap();
1411 assert_eq!(
1412 block_info.block_count,
1413 partition_info.block_range.as_ref().unwrap().end
1414 - partition_info.block_range.as_ref().unwrap().start
1415 );
1416 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1417
1418 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1419
1420 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1421 assert_eq!(status, zx::sys::ZX_OK);
1422 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1423
1424 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1425 assert_eq!(status, zx::sys::ZX_OK);
1426 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1427
1428 let (status, name) = proxy.get_name().await.unwrap();
1429 assert_eq!(status, zx::sys::ZX_OK);
1430 assert_eq!(name.as_ref(), Some(&partition_info.name));
1431
1432 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1433 assert_eq!(metadata.name, name);
1434 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1435 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1436 assert_eq!(
1437 metadata.start_block_offset,
1438 Some(partition_info.block_range.as_ref().unwrap().start)
1439 );
1440 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1441 assert_eq!(metadata.flags, Some(partition_info.flags));
1442
1443 std::mem::drop(proxy);
1444 }
1445 );
1446 }
1447
1448 #[fuchsia::test]
1449 async fn test_attach_vmo() {
1450 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1451
1452 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1453 let koid = vmo.get_koid().unwrap();
1454
1455 futures::join!(
1456 async {
1457 let block_server = BlockServer::new(
1458 BLOCK_SIZE,
1459 Arc::new(MockInterface {
1460 read_hook: Some(Box::new(move |_, _, vmo, _| {
1461 assert_eq!(vmo.get_koid().unwrap(), koid);
1462 Box::pin(async { Ok(()) })
1463 })),
1464 ..MockInterface::default()
1465 }),
1466 );
1467 block_server.handle_requests(stream).await.unwrap();
1468 },
1469 async move {
1470 let (session_proxy, server) = fidl::endpoints::create_proxy();
1471
1472 proxy.open_session(server).unwrap();
1473
1474 let vmo_id = session_proxy
1475 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1476 .await
1477 .unwrap()
1478 .unwrap();
1479 assert_ne!(vmo_id.id, 0);
1480
1481 let mut fifo =
1482 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1483 let (mut reader, mut writer) = fifo.async_io();
1484
1485 let mut count = 1;
1487 loop {
1488 match session_proxy
1489 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1490 .await
1491 .unwrap()
1492 {
1493 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1494 Err(e) => {
1495 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1496 break;
1497 }
1498 }
1499
1500 if count % 10 == 0 {
1502 writer
1503 .write_entries(&BlockFifoRequest {
1504 command: BlockFifoCommand {
1505 opcode: BlockOpcode::Read.into_primitive(),
1506 ..Default::default()
1507 },
1508 vmoid: vmo_id.id,
1509 length: 1,
1510 ..Default::default()
1511 })
1512 .await
1513 .unwrap();
1514
1515 let mut response = BlockFifoResponse::default();
1516 reader.read_entries(&mut response).await.unwrap();
1517 assert_eq!(response.status, zx::sys::ZX_OK);
1518 }
1519
1520 count += 1;
1521 }
1522
1523 assert_eq!(count, u16::MAX as u64);
1524
1525 writer
1527 .write_entries(&BlockFifoRequest {
1528 command: BlockFifoCommand {
1529 opcode: BlockOpcode::CloseVmo.into_primitive(),
1530 ..Default::default()
1531 },
1532 vmoid: vmo_id.id,
1533 ..Default::default()
1534 })
1535 .await
1536 .unwrap();
1537
1538 let mut response = BlockFifoResponse::default();
1539 reader.read_entries(&mut response).await.unwrap();
1540 assert_eq!(response.status, zx::sys::ZX_OK);
1541
1542 let new_vmo_id = session_proxy
1543 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1544 .await
1545 .unwrap()
1546 .unwrap();
1547 assert_eq!(new_vmo_id.id, vmo_id.id);
1549
1550 std::mem::drop(proxy);
1551 }
1552 );
1553 }
1554
1555 #[fuchsia::test]
1556 async fn test_close() {
1557 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1558
1559 let mut server = std::pin::pin!(
1560 async {
1561 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1562 block_server.handle_requests(stream).await.unwrap();
1563 }
1564 .fuse()
1565 );
1566
1567 let mut client = std::pin::pin!(
1568 async {
1569 let (session_proxy, server) = fidl::endpoints::create_proxy();
1570
1571 proxy.open_session(server).unwrap();
1572
1573 std::mem::drop(proxy);
1576
1577 session_proxy.close().await.unwrap().unwrap();
1578
1579 let _: () = std::future::pending().await;
1581 }
1582 .fuse()
1583 );
1584
1585 futures::select!(
1586 _ = server => {}
1587 _ = client => unreachable!(),
1588 );
1589 }
1590
1591 #[derive(Default)]
1592 struct IoMockInterface {
1593 do_checks: bool,
1594 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1595 return_errors: bool,
1596 }
1597
1598 #[derive(Debug)]
1599 enum ExpectedOp {
1600 Read(u64, u32, u64),
1601 Write(u64, u32, u64),
1602 Trim(u64, u32),
1603 Flush,
1604 }
1605
1606 impl super::async_interface::Interface for IoMockInterface {
1607 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1608 Ok(())
1609 }
1610
1611 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1612 Ok(Cow::Owned(test_device_info()))
1613 }
1614
1615 async fn read(
1616 &self,
1617 device_block_offset: u64,
1618 block_count: u32,
1619 _vmo: &Arc<zx::Vmo>,
1620 vmo_offset: u64,
1621 _opts: ReadOptions,
1622 _trace_flow_id: TraceFlowId,
1623 ) -> Result<(), zx::Status> {
1624 if self.return_errors {
1625 Err(zx::Status::INTERNAL)
1626 } else {
1627 if self.do_checks {
1628 assert_matches!(
1629 self.expected_op.lock().take(),
1630 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1631 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1632 "Read {device_block_offset} {block_count} {vmo_offset}"
1633 );
1634 }
1635 Ok(())
1636 }
1637 }
1638
1639 async fn write(
1640 &self,
1641 device_block_offset: u64,
1642 block_count: u32,
1643 _vmo: &Arc<zx::Vmo>,
1644 vmo_offset: u64,
1645 _write_opts: WriteOptions,
1646 _trace_flow_id: TraceFlowId,
1647 ) -> Result<(), zx::Status> {
1648 if self.return_errors {
1649 Err(zx::Status::NOT_SUPPORTED)
1650 } else {
1651 if self.do_checks {
1652 assert_matches!(
1653 self.expected_op.lock().take(),
1654 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1655 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1656 "Write {device_block_offset} {block_count} {vmo_offset}"
1657 );
1658 }
1659 Ok(())
1660 }
1661 }
1662
1663 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1664 if self.return_errors {
1665 Err(zx::Status::NO_RESOURCES)
1666 } else {
1667 if self.do_checks {
1668 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1669 }
1670 Ok(())
1671 }
1672 }
1673
1674 fn barrier(&self) -> Result<(), zx::Status> {
1675 unreachable!()
1676 }
1677
1678 async fn trim(
1679 &self,
1680 device_block_offset: u64,
1681 block_count: u32,
1682 _trace_flow_id: TraceFlowId,
1683 ) -> Result<(), zx::Status> {
1684 if self.return_errors {
1685 Err(zx::Status::NO_MEMORY)
1686 } else {
1687 if self.do_checks {
1688 assert_matches!(
1689 self.expected_op.lock().take(),
1690 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1691 block_count == b,
1692 "Trim {device_block_offset} {block_count}"
1693 );
1694 }
1695 Ok(())
1696 }
1697 }
1698 }
1699
1700 #[fuchsia::test]
1701 async fn test_io() {
1702 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1703
1704 let expected_op = Arc::new(Mutex::new(None));
1705 let expected_op_clone = expected_op.clone();
1706
1707 let server = async {
1708 let block_server = BlockServer::new(
1709 BLOCK_SIZE,
1710 Arc::new(IoMockInterface {
1711 return_errors: false,
1712 do_checks: true,
1713 expected_op: expected_op_clone,
1714 }),
1715 );
1716 block_server.handle_requests(stream).await.unwrap();
1717 };
1718
1719 let client = async move {
1720 let (session_proxy, server) = fidl::endpoints::create_proxy();
1721
1722 proxy.open_session(server).unwrap();
1723
1724 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1725 let vmo_id = session_proxy
1726 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1727 .await
1728 .unwrap()
1729 .unwrap();
1730
1731 let mut fifo =
1732 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1733 let (mut reader, mut writer) = fifo.async_io();
1734
1735 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1737 writer
1738 .write_entries(&BlockFifoRequest {
1739 command: BlockFifoCommand {
1740 opcode: BlockOpcode::Read.into_primitive(),
1741 ..Default::default()
1742 },
1743 vmoid: vmo_id.id,
1744 dev_offset: 1,
1745 length: 2,
1746 vmo_offset: 3,
1747 ..Default::default()
1748 })
1749 .await
1750 .unwrap();
1751
1752 let mut response = BlockFifoResponse::default();
1753 reader.read_entries(&mut response).await.unwrap();
1754 assert_eq!(response.status, zx::sys::ZX_OK);
1755
1756 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1758 writer
1759 .write_entries(&BlockFifoRequest {
1760 command: BlockFifoCommand {
1761 opcode: BlockOpcode::Write.into_primitive(),
1762 ..Default::default()
1763 },
1764 vmoid: vmo_id.id,
1765 dev_offset: 4,
1766 length: 5,
1767 vmo_offset: 6,
1768 ..Default::default()
1769 })
1770 .await
1771 .unwrap();
1772
1773 let mut response = BlockFifoResponse::default();
1774 reader.read_entries(&mut response).await.unwrap();
1775 assert_eq!(response.status, zx::sys::ZX_OK);
1776
1777 *expected_op.lock() = Some(ExpectedOp::Flush);
1779 writer
1780 .write_entries(&BlockFifoRequest {
1781 command: BlockFifoCommand {
1782 opcode: BlockOpcode::Flush.into_primitive(),
1783 ..Default::default()
1784 },
1785 ..Default::default()
1786 })
1787 .await
1788 .unwrap();
1789
1790 reader.read_entries(&mut response).await.unwrap();
1791 assert_eq!(response.status, zx::sys::ZX_OK);
1792
1793 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1795 writer
1796 .write_entries(&BlockFifoRequest {
1797 command: BlockFifoCommand {
1798 opcode: BlockOpcode::Trim.into_primitive(),
1799 ..Default::default()
1800 },
1801 dev_offset: 7,
1802 length: 8,
1803 ..Default::default()
1804 })
1805 .await
1806 .unwrap();
1807
1808 reader.read_entries(&mut response).await.unwrap();
1809 assert_eq!(response.status, zx::sys::ZX_OK);
1810
1811 std::mem::drop(proxy);
1812 };
1813
1814 futures::join!(server, client);
1815 }
1816
1817 #[fuchsia::test]
1818 async fn test_io_errors() {
1819 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1820
1821 futures::join!(
1822 async {
1823 let block_server = BlockServer::new(
1824 BLOCK_SIZE,
1825 Arc::new(IoMockInterface {
1826 return_errors: true,
1827 do_checks: false,
1828 expected_op: Arc::new(Mutex::new(None)),
1829 }),
1830 );
1831 block_server.handle_requests(stream).await.unwrap();
1832 },
1833 async move {
1834 let (session_proxy, server) = fidl::endpoints::create_proxy();
1835
1836 proxy.open_session(server).unwrap();
1837
1838 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1839 let vmo_id = session_proxy
1840 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1841 .await
1842 .unwrap()
1843 .unwrap();
1844
1845 let mut fifo =
1846 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1847 let (mut reader, mut writer) = fifo.async_io();
1848
1849 writer
1851 .write_entries(&BlockFifoRequest {
1852 command: BlockFifoCommand {
1853 opcode: BlockOpcode::Read.into_primitive(),
1854 ..Default::default()
1855 },
1856 vmoid: vmo_id.id,
1857 length: 1,
1858 reqid: 1,
1859 ..Default::default()
1860 })
1861 .await
1862 .unwrap();
1863
1864 let mut response = BlockFifoResponse::default();
1865 reader.read_entries(&mut response).await.unwrap();
1866 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1867
1868 writer
1870 .write_entries(&BlockFifoRequest {
1871 command: BlockFifoCommand {
1872 opcode: BlockOpcode::Write.into_primitive(),
1873 ..Default::default()
1874 },
1875 vmoid: vmo_id.id,
1876 length: 1,
1877 reqid: 2,
1878 ..Default::default()
1879 })
1880 .await
1881 .unwrap();
1882
1883 reader.read_entries(&mut response).await.unwrap();
1884 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1885
1886 writer
1888 .write_entries(&BlockFifoRequest {
1889 command: BlockFifoCommand {
1890 opcode: BlockOpcode::Flush.into_primitive(),
1891 ..Default::default()
1892 },
1893 reqid: 3,
1894 ..Default::default()
1895 })
1896 .await
1897 .unwrap();
1898
1899 reader.read_entries(&mut response).await.unwrap();
1900 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1901
1902 writer
1904 .write_entries(&BlockFifoRequest {
1905 command: BlockFifoCommand {
1906 opcode: BlockOpcode::Trim.into_primitive(),
1907 ..Default::default()
1908 },
1909 reqid: 4,
1910 length: 1,
1911 ..Default::default()
1912 })
1913 .await
1914 .unwrap();
1915
1916 reader.read_entries(&mut response).await.unwrap();
1917 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1918
1919 std::mem::drop(proxy);
1920 }
1921 );
1922 }
1923
1924 #[fuchsia::test]
1925 async fn test_invalid_args() {
1926 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1927
1928 futures::join!(
1929 async {
1930 let block_server = BlockServer::new(
1931 BLOCK_SIZE,
1932 Arc::new(IoMockInterface {
1933 return_errors: false,
1934 do_checks: false,
1935 expected_op: Arc::new(Mutex::new(None)),
1936 }),
1937 );
1938 block_server.handle_requests(stream).await.unwrap();
1939 },
1940 async move {
1941 let (session_proxy, server) = fidl::endpoints::create_proxy();
1942
1943 proxy.open_session(server).unwrap();
1944
1945 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1946 let vmo_id = session_proxy
1947 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1948 .await
1949 .unwrap()
1950 .unwrap();
1951
1952 let mut fifo =
1953 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1954
1955 async fn test(
1956 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1957 request: BlockFifoRequest,
1958 ) -> Result<(), zx::Status> {
1959 let (mut reader, mut writer) = fifo.async_io();
1960 writer.write_entries(&request).await.unwrap();
1961 let mut response = BlockFifoResponse::default();
1962 reader.read_entries(&mut response).await.unwrap();
1963 zx::Status::ok(response.status)
1964 }
1965
1966 let good_read_request = || BlockFifoRequest {
1969 command: BlockFifoCommand {
1970 opcode: BlockOpcode::Read.into_primitive(),
1971 ..Default::default()
1972 },
1973 length: 1,
1974 vmoid: vmo_id.id,
1975 ..Default::default()
1976 };
1977
1978 assert_eq!(
1979 test(
1980 &mut fifo,
1981 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1982 )
1983 .await,
1984 Err(zx::Status::IO)
1985 );
1986
1987 assert_eq!(
1988 test(
1989 &mut fifo,
1990 BlockFifoRequest {
1991 vmo_offset: 0xffff_ffff_ffff_ffff,
1992 ..good_read_request()
1993 }
1994 )
1995 .await,
1996 Err(zx::Status::OUT_OF_RANGE)
1997 );
1998
1999 assert_eq!(
2000 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2001 Err(zx::Status::INVALID_ARGS)
2002 );
2003
2004 let good_write_request = || BlockFifoRequest {
2007 command: BlockFifoCommand {
2008 opcode: BlockOpcode::Write.into_primitive(),
2009 ..Default::default()
2010 },
2011 length: 1,
2012 vmoid: vmo_id.id,
2013 ..Default::default()
2014 };
2015
2016 assert_eq!(
2017 test(
2018 &mut fifo,
2019 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2020 )
2021 .await,
2022 Err(zx::Status::IO)
2023 );
2024
2025 assert_eq!(
2026 test(
2027 &mut fifo,
2028 BlockFifoRequest {
2029 vmo_offset: 0xffff_ffff_ffff_ffff,
2030 ..good_write_request()
2031 }
2032 )
2033 .await,
2034 Err(zx::Status::OUT_OF_RANGE)
2035 );
2036
2037 assert_eq!(
2038 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2039 Err(zx::Status::INVALID_ARGS)
2040 );
2041
2042 assert_eq!(
2045 test(
2046 &mut fifo,
2047 BlockFifoRequest {
2048 command: BlockFifoCommand {
2049 opcode: BlockOpcode::CloseVmo.into_primitive(),
2050 ..Default::default()
2051 },
2052 vmoid: vmo_id.id + 1,
2053 ..Default::default()
2054 }
2055 )
2056 .await,
2057 Err(zx::Status::IO)
2058 );
2059
2060 std::mem::drop(proxy);
2061 }
2062 );
2063 }
2064
2065 #[fuchsia::test]
2066 async fn test_concurrent_requests() {
2067 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2068
2069 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2070 let waiting_readers_clone = waiting_readers.clone();
2071
2072 futures::join!(
2073 async move {
2074 let block_server = BlockServer::new(
2075 BLOCK_SIZE,
2076 Arc::new(MockInterface {
2077 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2078 let (tx, rx) = oneshot::channel();
2079 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2080 Box::pin(async move {
2081 let _ = rx.await;
2082 Ok(())
2083 })
2084 })),
2085 ..MockInterface::default()
2086 }),
2087 );
2088 block_server.handle_requests(stream).await.unwrap();
2089 },
2090 async move {
2091 let (session_proxy, server) = fidl::endpoints::create_proxy();
2092
2093 proxy.open_session(server).unwrap();
2094
2095 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2096 let vmo_id = session_proxy
2097 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2098 .await
2099 .unwrap()
2100 .unwrap();
2101
2102 let mut fifo =
2103 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2104 let (mut reader, mut writer) = fifo.async_io();
2105
2106 writer
2107 .write_entries(&BlockFifoRequest {
2108 command: BlockFifoCommand {
2109 opcode: BlockOpcode::Read.into_primitive(),
2110 ..Default::default()
2111 },
2112 reqid: 1,
2113 dev_offset: 1, vmoid: vmo_id.id,
2115 length: 1,
2116 ..Default::default()
2117 })
2118 .await
2119 .unwrap();
2120
2121 writer
2122 .write_entries(&BlockFifoRequest {
2123 command: BlockFifoCommand {
2124 opcode: BlockOpcode::Read.into_primitive(),
2125 ..Default::default()
2126 },
2127 reqid: 2,
2128 dev_offset: 2,
2129 vmoid: vmo_id.id,
2130 length: 1,
2131 ..Default::default()
2132 })
2133 .await
2134 .unwrap();
2135
2136 poll_fn(|cx: &mut Context<'_>| {
2138 if waiting_readers.lock().len() == 2 {
2139 Poll::Ready(())
2140 } else {
2141 cx.waker().wake_by_ref();
2143 Poll::Pending
2144 }
2145 })
2146 .await;
2147
2148 let mut response = BlockFifoResponse::default();
2149 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2150
2151 let (id, tx) = waiting_readers.lock().pop().unwrap();
2152 tx.send(()).unwrap();
2153
2154 reader.read_entries(&mut response).await.unwrap();
2155 assert_eq!(response.status, zx::sys::ZX_OK);
2156 assert_eq!(response.reqid, id);
2157
2158 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2159
2160 let (id, tx) = waiting_readers.lock().pop().unwrap();
2161 tx.send(()).unwrap();
2162
2163 reader.read_entries(&mut response).await.unwrap();
2164 assert_eq!(response.status, zx::sys::ZX_OK);
2165 assert_eq!(response.reqid, id);
2166 }
2167 );
2168 }
2169
2170 #[fuchsia::test]
2171 async fn test_groups() {
2172 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2173
2174 futures::join!(
2175 async move {
2176 let block_server = BlockServer::new(
2177 BLOCK_SIZE,
2178 Arc::new(MockInterface {
2179 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2180 ..MockInterface::default()
2181 }),
2182 );
2183 block_server.handle_requests(stream).await.unwrap();
2184 },
2185 async move {
2186 let (session_proxy, server) = fidl::endpoints::create_proxy();
2187
2188 proxy.open_session(server).unwrap();
2189
2190 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2191 let vmo_id = session_proxy
2192 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2193 .await
2194 .unwrap()
2195 .unwrap();
2196
2197 let mut fifo =
2198 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2199 let (mut reader, mut writer) = fifo.async_io();
2200
2201 writer
2202 .write_entries(&BlockFifoRequest {
2203 command: BlockFifoCommand {
2204 opcode: BlockOpcode::Read.into_primitive(),
2205 flags: BlockIoFlag::GROUP_ITEM.bits(),
2206 ..Default::default()
2207 },
2208 group: 1,
2209 vmoid: vmo_id.id,
2210 length: 1,
2211 ..Default::default()
2212 })
2213 .await
2214 .unwrap();
2215
2216 writer
2217 .write_entries(&BlockFifoRequest {
2218 command: BlockFifoCommand {
2219 opcode: BlockOpcode::Read.into_primitive(),
2220 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2221 ..Default::default()
2222 },
2223 reqid: 2,
2224 group: 1,
2225 vmoid: vmo_id.id,
2226 length: 1,
2227 ..Default::default()
2228 })
2229 .await
2230 .unwrap();
2231
2232 let mut response = BlockFifoResponse::default();
2233 reader.read_entries(&mut response).await.unwrap();
2234 assert_eq!(response.status, zx::sys::ZX_OK);
2235 assert_eq!(response.reqid, 2);
2236 assert_eq!(response.group, 1);
2237 }
2238 );
2239 }
2240
2241 #[fuchsia::test]
2242 async fn test_group_error() {
2243 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2244
2245 let counter = Arc::new(AtomicU64::new(0));
2246 let counter_clone = counter.clone();
2247
2248 futures::join!(
2249 async move {
2250 let block_server = BlockServer::new(
2251 BLOCK_SIZE,
2252 Arc::new(MockInterface {
2253 read_hook: Some(Box::new(move |_, _, _, _| {
2254 counter_clone.fetch_add(1, Ordering::Relaxed);
2255 Box::pin(async { Err(zx::Status::BAD_STATE) })
2256 })),
2257 ..MockInterface::default()
2258 }),
2259 );
2260 block_server.handle_requests(stream).await.unwrap();
2261 },
2262 async move {
2263 let (session_proxy, server) = fidl::endpoints::create_proxy();
2264
2265 proxy.open_session(server).unwrap();
2266
2267 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2268 let vmo_id = session_proxy
2269 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2270 .await
2271 .unwrap()
2272 .unwrap();
2273
2274 let mut fifo =
2275 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2276 let (mut reader, mut writer) = fifo.async_io();
2277
2278 writer
2279 .write_entries(&BlockFifoRequest {
2280 command: BlockFifoCommand {
2281 opcode: BlockOpcode::Read.into_primitive(),
2282 flags: BlockIoFlag::GROUP_ITEM.bits(),
2283 ..Default::default()
2284 },
2285 group: 1,
2286 vmoid: vmo_id.id,
2287 length: 1,
2288 ..Default::default()
2289 })
2290 .await
2291 .unwrap();
2292
2293 poll_fn(|cx: &mut Context<'_>| {
2295 if counter.load(Ordering::Relaxed) == 1 {
2296 Poll::Ready(())
2297 } else {
2298 cx.waker().wake_by_ref();
2300 Poll::Pending
2301 }
2302 })
2303 .await;
2304
2305 let mut response = BlockFifoResponse::default();
2306 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2307
2308 writer
2309 .write_entries(&BlockFifoRequest {
2310 command: BlockFifoCommand {
2311 opcode: BlockOpcode::Read.into_primitive(),
2312 flags: BlockIoFlag::GROUP_ITEM.bits(),
2313 ..Default::default()
2314 },
2315 group: 1,
2316 vmoid: vmo_id.id,
2317 length: 1,
2318 ..Default::default()
2319 })
2320 .await
2321 .unwrap();
2322
2323 writer
2324 .write_entries(&BlockFifoRequest {
2325 command: BlockFifoCommand {
2326 opcode: BlockOpcode::Read.into_primitive(),
2327 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2328 ..Default::default()
2329 },
2330 reqid: 2,
2331 group: 1,
2332 vmoid: vmo_id.id,
2333 length: 1,
2334 ..Default::default()
2335 })
2336 .await
2337 .unwrap();
2338
2339 reader.read_entries(&mut response).await.unwrap();
2340 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2341 assert_eq!(response.reqid, 2);
2342 assert_eq!(response.group, 1);
2343
2344 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2345
2346 assert_eq!(counter.load(Ordering::Relaxed), 1);
2348 }
2349 );
2350 }
2351
2352 #[fuchsia::test]
2353 async fn test_group_with_two_lasts() {
2354 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2355
2356 let (tx, rx) = oneshot::channel();
2357
2358 futures::join!(
2359 async move {
2360 let rx = Mutex::new(Some(rx));
2361 let block_server = BlockServer::new(
2362 BLOCK_SIZE,
2363 Arc::new(MockInterface {
2364 read_hook: Some(Box::new(move |_, _, _, _| {
2365 let rx = rx.lock().take().unwrap();
2366 Box::pin(async {
2367 let _ = rx.await;
2368 Ok(())
2369 })
2370 })),
2371 ..MockInterface::default()
2372 }),
2373 );
2374 block_server.handle_requests(stream).await.unwrap();
2375 },
2376 async move {
2377 let (session_proxy, server) = fidl::endpoints::create_proxy();
2378
2379 proxy.open_session(server).unwrap();
2380
2381 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2382 let vmo_id = session_proxy
2383 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2384 .await
2385 .unwrap()
2386 .unwrap();
2387
2388 let mut fifo =
2389 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2390 let (mut reader, mut writer) = fifo.async_io();
2391
2392 writer
2393 .write_entries(&BlockFifoRequest {
2394 command: BlockFifoCommand {
2395 opcode: BlockOpcode::Read.into_primitive(),
2396 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2397 ..Default::default()
2398 },
2399 reqid: 1,
2400 group: 1,
2401 vmoid: vmo_id.id,
2402 length: 1,
2403 ..Default::default()
2404 })
2405 .await
2406 .unwrap();
2407
2408 writer
2409 .write_entries(&BlockFifoRequest {
2410 command: BlockFifoCommand {
2411 opcode: BlockOpcode::Read.into_primitive(),
2412 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2413 ..Default::default()
2414 },
2415 reqid: 2,
2416 group: 1,
2417 vmoid: vmo_id.id,
2418 length: 1,
2419 ..Default::default()
2420 })
2421 .await
2422 .unwrap();
2423
2424 writer
2426 .write_entries(&BlockFifoRequest {
2427 command: BlockFifoCommand {
2428 opcode: BlockOpcode::CloseVmo.into_primitive(),
2429 ..Default::default()
2430 },
2431 reqid: 3,
2432 vmoid: vmo_id.id,
2433 ..Default::default()
2434 })
2435 .await
2436 .unwrap();
2437
2438 let mut response = BlockFifoResponse::default();
2440 reader.read_entries(&mut response).await.unwrap();
2441 assert_eq!(response.status, zx::sys::ZX_OK);
2442 assert_eq!(response.reqid, 3);
2443
2444 tx.send(()).unwrap();
2446
2447 let mut response = BlockFifoResponse::default();
2450 reader.read_entries(&mut response).await.unwrap();
2451 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2452 assert_eq!(response.reqid, 1);
2453 assert_eq!(response.group, 1);
2454 }
2455 );
2456 }
2457
2458 #[fuchsia::test(allow_stalls = false)]
2459 async fn test_requests_dont_block_sessions() {
2460 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2461
2462 let (tx, rx) = oneshot::channel();
2463
2464 fasync::Task::local(async move {
2465 let rx = Mutex::new(Some(rx));
2466 let block_server = BlockServer::new(
2467 BLOCK_SIZE,
2468 Arc::new(MockInterface {
2469 read_hook: Some(Box::new(move |_, _, _, _| {
2470 let rx = rx.lock().take().unwrap();
2471 Box::pin(async {
2472 let _ = rx.await;
2473 Ok(())
2474 })
2475 })),
2476 ..MockInterface::default()
2477 }),
2478 );
2479 block_server.handle_requests(stream).await.unwrap();
2480 })
2481 .detach();
2482
2483 let mut fut = pin!(async {
2484 let (session_proxy, server) = fidl::endpoints::create_proxy();
2485
2486 proxy.open_session(server).unwrap();
2487
2488 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2489 let vmo_id = session_proxy
2490 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2491 .await
2492 .unwrap()
2493 .unwrap();
2494
2495 let mut fifo =
2496 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2497 let (mut reader, mut writer) = fifo.async_io();
2498
2499 writer
2500 .write_entries(&BlockFifoRequest {
2501 command: BlockFifoCommand {
2502 opcode: BlockOpcode::Read.into_primitive(),
2503 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2504 ..Default::default()
2505 },
2506 reqid: 1,
2507 group: 1,
2508 vmoid: vmo_id.id,
2509 length: 1,
2510 ..Default::default()
2511 })
2512 .await
2513 .unwrap();
2514
2515 let mut response = BlockFifoResponse::default();
2516 reader.read_entries(&mut response).await.unwrap();
2517 assert_eq!(response.status, zx::sys::ZX_OK);
2518 });
2519
2520 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2522
2523 let mut fut2 = pin!(proxy.get_volume_info());
2524
2525 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2527
2528 let _ = tx.send(());
2531
2532 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2533 }
2534
2535 #[fuchsia::test]
2536 async fn test_request_flow_control() {
2537 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2538
2539 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2542 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2543 let event_clone = event.clone();
2544 futures::join!(
2545 async move {
2546 let block_server = BlockServer::new(
2547 BLOCK_SIZE,
2548 Arc::new(MockInterface {
2549 read_hook: Some(Box::new(move |_, _, _, _| {
2550 let event_clone = event_clone.clone();
2551 Box::pin(async move {
2552 if !event_clone.1.load(Ordering::SeqCst) {
2553 event_clone.0.listen().await;
2554 }
2555 Ok(())
2556 })
2557 })),
2558 ..MockInterface::default()
2559 }),
2560 );
2561 block_server.handle_requests(stream).await.unwrap();
2562 },
2563 async move {
2564 let (session_proxy, server) = fidl::endpoints::create_proxy();
2565
2566 proxy.open_session(server).unwrap();
2567
2568 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2569 let vmo_id = session_proxy
2570 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2571 .await
2572 .unwrap()
2573 .unwrap();
2574
2575 let mut fifo =
2576 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2577 let (mut reader, mut writer) = fifo.async_io();
2578
2579 for i in 0..MAX_REQUESTS {
2580 writer
2581 .write_entries(&BlockFifoRequest {
2582 command: BlockFifoCommand {
2583 opcode: BlockOpcode::Read.into_primitive(),
2584 ..Default::default()
2585 },
2586 reqid: (i + 1) as u32,
2587 dev_offset: i,
2588 vmoid: vmo_id.id,
2589 length: 1,
2590 ..Default::default()
2591 })
2592 .await
2593 .unwrap();
2594 }
2595 assert!(
2596 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2597 command: BlockFifoCommand {
2598 opcode: BlockOpcode::Read.into_primitive(),
2599 ..Default::default()
2600 },
2601 reqid: u32::MAX,
2602 dev_offset: MAX_REQUESTS,
2603 vmoid: vmo_id.id,
2604 length: 1,
2605 ..Default::default()
2606 })))
2607 .is_pending()
2608 );
2609 event.1.store(true, Ordering::SeqCst);
2611 event.0.notify(usize::MAX);
2612 let mut finished_reqids = vec![];
2614 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2615 let mut response = BlockFifoResponse::default();
2616 reader.read_entries(&mut response).await.unwrap();
2617 assert_eq!(response.status, zx::sys::ZX_OK);
2618 finished_reqids.push(response.reqid);
2619 writer
2620 .write_entries(&BlockFifoRequest {
2621 command: BlockFifoCommand {
2622 opcode: BlockOpcode::Read.into_primitive(),
2623 ..Default::default()
2624 },
2625 reqid: (i + 1) as u32,
2626 dev_offset: i,
2627 vmoid: vmo_id.id,
2628 length: 1,
2629 ..Default::default()
2630 })
2631 .await
2632 .unwrap();
2633 }
2634 let mut response = BlockFifoResponse::default();
2635 for _ in 0..MAX_REQUESTS {
2636 reader.read_entries(&mut response).await.unwrap();
2637 assert_eq!(response.status, zx::sys::ZX_OK);
2638 finished_reqids.push(response.reqid);
2639 }
2640 finished_reqids.sort();
2643 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2644 let mut i = 1;
2645 for reqid in finished_reqids {
2646 assert_eq!(reqid, i);
2647 i += 1;
2648 }
2649 }
2650 );
2651 }
2652
2653 #[fuchsia::test]
2654 async fn test_passthrough_io_with_fixed_map() {
2655 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2656
2657 let expected_op = Arc::new(Mutex::new(None));
2658 let expected_op_clone = expected_op.clone();
2659 futures::join!(
2660 async {
2661 let block_server = BlockServer::new(
2662 BLOCK_SIZE,
2663 Arc::new(IoMockInterface {
2664 return_errors: false,
2665 do_checks: true,
2666 expected_op: expected_op_clone,
2667 }),
2668 );
2669 block_server.handle_requests(stream).await.unwrap();
2670 },
2671 async move {
2672 let (session_proxy, server) = fidl::endpoints::create_proxy();
2673
2674 let mapping = fblock::BlockOffsetMapping {
2675 source_block_offset: 0,
2676 target_block_offset: 10,
2677 length: 20,
2678 };
2679 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2680
2681 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2682 let vmo_id = session_proxy
2683 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2684 .await
2685 .unwrap()
2686 .unwrap();
2687
2688 let mut fifo =
2689 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2690 let (mut reader, mut writer) = fifo.async_io();
2691
2692 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2694 writer
2695 .write_entries(&BlockFifoRequest {
2696 command: BlockFifoCommand {
2697 opcode: BlockOpcode::Read.into_primitive(),
2698 ..Default::default()
2699 },
2700 vmoid: vmo_id.id,
2701 dev_offset: 1,
2702 length: 2,
2703 vmo_offset: 3,
2704 ..Default::default()
2705 })
2706 .await
2707 .unwrap();
2708
2709 let mut response = BlockFifoResponse::default();
2710 reader.read_entries(&mut response).await.unwrap();
2711 assert_eq!(response.status, zx::sys::ZX_OK);
2712
2713 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2715 writer
2716 .write_entries(&BlockFifoRequest {
2717 command: BlockFifoCommand {
2718 opcode: BlockOpcode::Write.into_primitive(),
2719 ..Default::default()
2720 },
2721 vmoid: vmo_id.id,
2722 dev_offset: 4,
2723 length: 5,
2724 vmo_offset: 6,
2725 ..Default::default()
2726 })
2727 .await
2728 .unwrap();
2729
2730 reader.read_entries(&mut response).await.unwrap();
2731 assert_eq!(response.status, zx::sys::ZX_OK);
2732
2733 *expected_op.lock() = Some(ExpectedOp::Flush);
2735 writer
2736 .write_entries(&BlockFifoRequest {
2737 command: BlockFifoCommand {
2738 opcode: BlockOpcode::Flush.into_primitive(),
2739 ..Default::default()
2740 },
2741 ..Default::default()
2742 })
2743 .await
2744 .unwrap();
2745
2746 reader.read_entries(&mut response).await.unwrap();
2747 assert_eq!(response.status, zx::sys::ZX_OK);
2748
2749 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2751 writer
2752 .write_entries(&BlockFifoRequest {
2753 command: BlockFifoCommand {
2754 opcode: BlockOpcode::Trim.into_primitive(),
2755 ..Default::default()
2756 },
2757 dev_offset: 7,
2758 length: 3,
2759 ..Default::default()
2760 })
2761 .await
2762 .unwrap();
2763
2764 reader.read_entries(&mut response).await.unwrap();
2765 assert_eq!(response.status, zx::sys::ZX_OK);
2766
2767 *expected_op.lock() = None;
2769 writer
2770 .write_entries(&BlockFifoRequest {
2771 command: BlockFifoCommand {
2772 opcode: BlockOpcode::Read.into_primitive(),
2773 ..Default::default()
2774 },
2775 vmoid: vmo_id.id,
2776 dev_offset: 19,
2777 length: 2,
2778 vmo_offset: 3,
2779 ..Default::default()
2780 })
2781 .await
2782 .unwrap();
2783
2784 reader.read_entries(&mut response).await.unwrap();
2785 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2786
2787 std::mem::drop(proxy);
2788 }
2789 );
2790 }
2791
2792 #[fuchsia::test]
2793 fn operation_map() {
2794 fn expect_map_result(
2795 mut operation: Operation,
2796 mapping: Option<BlockOffsetMapping>,
2797 max_blocks: Option<NonZero<u32>>,
2798 expected_operations: Vec<Operation>,
2799 ) {
2800 let mut ops = vec![];
2801 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2802 ops.push(operation);
2803 operation = remainder;
2804 }
2805 ops.push(operation);
2806 assert_eq!(ops, expected_operations);
2807 }
2808
2809 expect_map_result(
2811 Operation::Read {
2812 device_block_offset: 10,
2813 block_count: 200,
2814 _unused: 0,
2815 vmo_offset: 0,
2816 options: ReadOptions::default(),
2817 },
2818 None,
2819 None,
2820 vec![Operation::Read {
2821 device_block_offset: 10,
2822 block_count: 200,
2823 _unused: 0,
2824 vmo_offset: 0,
2825 options: ReadOptions::default(),
2826 }],
2827 );
2828
2829 expect_map_result(
2831 Operation::Read {
2832 device_block_offset: 10,
2833 block_count: 200,
2834 _unused: 0,
2835 vmo_offset: 0,
2836 options: ReadOptions::default(),
2837 },
2838 None,
2839 NonZero::new(120),
2840 vec![
2841 Operation::Read {
2842 device_block_offset: 10,
2843 block_count: 120,
2844 _unused: 0,
2845 vmo_offset: 0,
2846 options: ReadOptions::default(),
2847 },
2848 Operation::Read {
2849 device_block_offset: 130,
2850 block_count: 80,
2851 _unused: 0,
2852 vmo_offset: 120 * 512,
2853 options: ReadOptions::default(),
2854 },
2855 ],
2856 );
2857 expect_map_result(
2858 Operation::Write {
2859 device_block_offset: 10,
2860 block_count: 200,
2861 _unused: 0,
2862 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2863 vmo_offset: 0,
2864 },
2865 None,
2866 NonZero::new(120),
2867 vec![
2868 Operation::Write {
2869 device_block_offset: 10,
2870 block_count: 120,
2871 _unused: 0,
2872 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2873 vmo_offset: 0,
2874 },
2875 Operation::Write {
2876 device_block_offset: 130,
2877 block_count: 80,
2878 _unused: 0,
2879 options: WriteOptions::default(),
2880 vmo_offset: 120 * 512,
2881 },
2882 ],
2883 );
2884 expect_map_result(
2885 Operation::Trim { device_block_offset: 10, block_count: 200 },
2886 None,
2887 NonZero::new(120),
2888 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2889 );
2890
2891 expect_map_result(
2893 Operation::Read {
2894 device_block_offset: 10,
2895 block_count: 200,
2896 _unused: 0,
2897 vmo_offset: 0,
2898 options: ReadOptions::default(),
2899 },
2900 Some(BlockOffsetMapping {
2901 source_block_offset: 10,
2902 target_block_offset: 100,
2903 length: 200,
2904 }),
2905 NonZero::new(120),
2906 vec![
2907 Operation::Read {
2908 device_block_offset: 100,
2909 block_count: 120,
2910 _unused: 0,
2911 vmo_offset: 0,
2912 options: ReadOptions::default(),
2913 },
2914 Operation::Read {
2915 device_block_offset: 220,
2916 block_count: 80,
2917 _unused: 0,
2918 vmo_offset: 120 * 512,
2919 options: ReadOptions::default(),
2920 },
2921 ],
2922 );
2923 expect_map_result(
2924 Operation::Write {
2925 device_block_offset: 10,
2926 block_count: 200,
2927 _unused: 0,
2928 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2929 vmo_offset: 0,
2930 },
2931 Some(BlockOffsetMapping {
2932 source_block_offset: 10,
2933 target_block_offset: 100,
2934 length: 200,
2935 }),
2936 NonZero::new(120),
2937 vec![
2938 Operation::Write {
2939 device_block_offset: 100,
2940 block_count: 120,
2941 _unused: 0,
2942 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2943 vmo_offset: 0,
2944 },
2945 Operation::Write {
2946 device_block_offset: 220,
2947 block_count: 80,
2948 _unused: 0,
2949 options: WriteOptions::default(),
2950 vmo_offset: 120 * 512,
2951 },
2952 ],
2953 );
2954 expect_map_result(
2955 Operation::Trim { device_block_offset: 10, block_count: 200 },
2956 Some(BlockOffsetMapping {
2957 source_block_offset: 10,
2958 target_block_offset: 100,
2959 length: 200,
2960 }),
2961 NonZero::new(120),
2962 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2963 );
2964 }
2965}