1use crate::bin::Bin;
5use anyhow::{anyhow, Error};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use zx::HandleBased;
19use {
20 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34 Block(BlockInfo),
35 Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39 fn block_count(&self) -> Option<u64> {
40 match self {
41 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42 Self::Partition(PartitionInfo { block_range, .. }) => {
43 block_range.as_ref().map(|range| range.end - range.start)
44 }
45 }
46 }
47
48 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49 match self {
50 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52 max_transfer_blocks.clone()
53 }
54 }
55 }
56
57 fn max_transfer_size(&self, block_size: u32) -> u32 {
58 if let Some(max_blocks) = self.max_transfer_blocks() {
59 max_blocks.get() * block_size
60 } else {
61 MAX_TRANSFER_UNBOUNDED
62 }
63 }
64}
65
66#[derive(Clone)]
68pub struct BlockInfo {
69 pub device_flags: fblock::Flag,
70 pub block_count: u64,
71 pub max_transfer_blocks: Option<NonZero<u32>>,
72}
73
74#[derive(Clone)]
76pub struct PartitionInfo {
77 pub device_flags: fblock::Flag,
79 pub max_transfer_blocks: Option<NonZero<u32>>,
80 pub block_range: Option<Range<u64>>,
84 pub type_guid: [u8; 16],
85 pub instance_guid: [u8; 16],
86 pub name: String,
87 pub flags: u64,
88}
89
90struct ActiveRequest<S> {
93 session: S,
94 group_or_request: GroupOrRequest,
95 trace_flow_id: TraceFlowId,
96 vmo_bin_key: Option<usize>,
97 status: zx::Status,
98 count: u32,
99 req_id: Option<u32>,
100}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105 fn default() -> Self {
106 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107 }
108}
109
110impl<S> ActiveRequests<S> {
111 fn complete_and_take_response(
112 &self,
113 request_id: RequestId,
114 status: zx::Status,
115 ) -> Option<(S, BlockFifoResponse)> {
116 self.0.lock().complete_and_take_response(request_id, status)
117 }
118}
119
120struct ActiveRequestsInner<S> {
121 requests: Slab<ActiveRequest<S>>,
122 vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125impl<S> ActiveRequestsInner<S> {
127 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129 let group = &mut self.requests[request_id.0];
130
131 group.count = group.count.checked_sub(1).unwrap();
132 if status != zx::Status::OK && group.status == zx::Status::OK {
133 group.status = status
134 }
135
136 fuchsia_trace::duration!(
137 c"storage",
138 c"block_server::finish_transaction",
139 "request_id" => request_id.0,
140 "group_completed" => group.count == 0,
141 "status" => status.into_raw());
142 if let Some(trace_flow_id) = group.trace_flow_id {
143 fuchsia_trace::flow_step!(
144 c"storage",
145 c"block_server::finish_request",
146 trace_flow_id.get().into()
147 );
148 }
149 }
150
151 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153 let group = &self.requests[request_id.0];
154 match group.req_id {
155 Some(reqid) if group.count == 0 => {
156 let group = self.requests.remove(request_id.0);
157 if let Some(vmo_bin_key) = group.vmo_bin_key {
158 self.vmo_bin.release(vmo_bin_key);
159 }
160 Some((
161 group.session,
162 BlockFifoResponse {
163 status: group.status.into_raw(),
164 reqid,
165 group: group.group_or_request.group_id().unwrap_or(0),
166 ..Default::default()
167 },
168 ))
169 }
170 _ => None,
171 }
172 }
173
174 fn complete_and_take_response(
176 &mut self,
177 request_id: RequestId,
178 status: zx::Status,
179 ) -> Option<(S, BlockFifoResponse)> {
180 self.complete(request_id, status);
181 self.take_response(request_id)
182 }
183}
184
185pub struct BlockServer<SM> {
188 block_size: u32,
189 session_manager: Arc<SM>,
190}
191
192#[derive(Debug)]
194pub struct BlockOffsetMapping {
195 source_block_offset: u64,
196 target_block_offset: u64,
197 length: u64,
198}
199
200impl BlockOffsetMapping {
201 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
202 blocks.0 >= self.source_block_offset
203 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
204 }
205}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208 type Error = zx::Status;
209
210 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213 Ok(Self {
214 source_block_offset: wire.source_block_offset,
215 target_block_offset: wire.target_block_offset,
216 length: wire.length,
217 })
218 }
219}
220
221pub struct OffsetMap {
227 mapping: Option<BlockOffsetMapping>,
228 max_transfer_blocks: Option<NonZero<u32>>,
229}
230
231impl OffsetMap {
232 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
234 Self { mapping: Some(mapping), max_transfer_blocks }
235 }
236
237 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
239 Self { mapping: None, max_transfer_blocks }
240 }
241
242 pub fn is_empty(&self) -> bool {
243 self.mapping.is_none()
244 }
245
246 fn mapping(&self) -> Option<&BlockOffsetMapping> {
247 self.mapping.as_ref()
248 }
249
250 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
251 self.max_transfer_blocks
252 }
253}
254
255pub trait SessionManager: 'static {
258 type Session;
259
260 fn on_attach_vmo(
261 self: Arc<Self>,
262 vmo: &Arc<zx::Vmo>,
263 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
264
265 fn open_session(
270 self: Arc<Self>,
271 stream: fblock::SessionRequestStream,
272 offset_map: OffsetMap,
273 block_size: u32,
274 ) -> impl Future<Output = Result<(), Error>> + Send;
275
276 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
278
279 fn get_volume_info(
281 &self,
282 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
283 {
284 async { Err(zx::Status::NOT_SUPPORTED) }
285 }
286
287 fn query_slices(
289 &self,
290 _start_slices: &[u64],
291 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
292 async { Err(zx::Status::NOT_SUPPORTED) }
293 }
294
295 fn extend(
297 &self,
298 _start_slice: u64,
299 _slice_count: u64,
300 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
301 async { Err(zx::Status::NOT_SUPPORTED) }
302 }
303
304 fn shrink(
306 &self,
307 _start_slice: u64,
308 _slice_count: u64,
309 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
310 async { Err(zx::Status::NOT_SUPPORTED) }
311 }
312
313 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
315}
316
317pub trait IntoSessionManager {
318 type SM: SessionManager;
319
320 fn into_session_manager(self) -> Arc<Self::SM>;
321}
322
323impl<SM: SessionManager> BlockServer<SM> {
324 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
325 Self { block_size, session_manager: session_manager.into_session_manager() }
326 }
327
328 pub async fn handle_requests(
330 &self,
331 mut requests: fvolume::VolumeRequestStream,
332 ) -> Result<(), Error> {
333 let scope = fasync::Scope::new();
334 while let Some(request) = requests.try_next().await.unwrap() {
335 if let Some(session) = self.handle_request(request).await? {
336 scope.spawn(session.map(|_| ()));
337 }
338 }
339 scope.await;
340 Ok(())
341 }
342
343 async fn handle_request(
346 &self,
347 request: fvolume::VolumeRequest,
348 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
349 match request {
350 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
351 Ok(info) => {
352 let max_transfer_size = info.max_transfer_size(self.block_size);
353 let (block_count, flags) = match info.as_ref() {
354 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
355 (*block_count, *device_flags)
356 }
357 DeviceInfo::Partition(partition_info) => {
358 let block_count = if let Some(range) =
359 partition_info.block_range.as_ref()
360 {
361 range.end - range.start
362 } else {
363 let volume_info = self.session_manager.get_volume_info().await?;
364 volume_info.0.slice_size * volume_info.1.partition_slice_count
365 / self.block_size as u64
366 };
367 (block_count, partition_info.device_flags)
368 }
369 };
370 responder.send(Ok(&fblock::BlockInfo {
371 block_count,
372 block_size: self.block_size,
373 max_transfer_size,
374 flags,
375 }))?;
376 }
377 Err(status) => responder.send(Err(status.into_raw()))?,
378 },
379 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
380 match self.device_info().await {
381 Ok(info) => {
382 return Ok(Some(self.session_manager.clone().open_session(
383 session.into_stream(),
384 OffsetMap::empty(info.max_transfer_blocks()),
385 self.block_size,
386 )));
387 }
388 Err(status) => session.close_with_epitaph(status)?,
389 }
390 }
391 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
392 session,
393 offset_map,
394 initial_mappings,
395 control_handle: _,
396 } => match self.device_info().await {
397 Ok(info) => {
398 if offset_map.is_some()
399 || initial_mappings.as_ref().is_none_or(|m| m.len() != 1)
400 {
401 session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
405 return Ok(None);
406 }
407 let initial_mapping: BlockOffsetMapping =
408 match initial_mappings.unwrap().pop().unwrap().try_into() {
409 Ok(m) => m,
410 Err(status) => {
411 session.close_with_epitaph(status)?;
412 return Ok(None);
413 }
414 };
415 if let Some(max) = info.block_count() {
416 if initial_mapping.target_block_offset + initial_mapping.length > max {
417 log::warn!(
418 "Invalid mapping for session: {initial_mapping:?} (max {max})"
419 );
420 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
421 return Ok(None);
422 }
423 }
424 return Ok(Some(self.session_manager.clone().open_session(
425 session.into_stream(),
426 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
427 self.block_size,
428 )));
429 }
430 Err(status) => session.close_with_epitaph(status)?,
431 },
432 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
433 Ok(info) => {
434 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
435 let mut guid =
436 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
437 guid.value.copy_from_slice(&partition_info.type_guid);
438 responder.send(zx::sys::ZX_OK, Some(&guid))?;
439 } else {
440 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
441 }
442 }
443 Err(status) => {
444 responder.send(status.into_raw(), None)?;
445 }
446 },
447 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
448 match self.device_info().await {
449 Ok(info) => {
450 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
451 let mut guid =
452 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
453 guid.value.copy_from_slice(&partition_info.instance_guid);
454 responder.send(zx::sys::ZX_OK, Some(&guid))?;
455 } else {
456 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
457 }
458 }
459 Err(status) => {
460 responder.send(status.into_raw(), None)?;
461 }
462 }
463 }
464 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
465 Ok(info) => {
466 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
467 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
468 } else {
469 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
470 }
471 }
472 Err(status) => {
473 responder.send(status.into_raw(), None)?;
474 }
475 },
476 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
477 Ok(info) => {
478 if let DeviceInfo::Partition(info) = info.as_ref() {
479 let mut type_guid =
480 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
481 type_guid.value.copy_from_slice(&info.type_guid);
482 let mut instance_guid =
483 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
484 instance_guid.value.copy_from_slice(&info.instance_guid);
485 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
486 name: Some(info.name.clone()),
487 type_guid: Some(type_guid),
488 instance_guid: Some(instance_guid),
489 start_block_offset: info.block_range.as_ref().map(|range| range.start),
490 num_blocks: info
491 .block_range
492 .as_ref()
493 .map(|range| range.end - range.start),
494 flags: Some(info.flags),
495 ..Default::default()
496 }))?;
497 }
498 }
499 Err(status) => responder.send(Err(status.into_raw()))?,
500 },
501 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
502 match self.session_manager.query_slices(&start_slices).await {
503 Ok(mut results) => {
504 let results_len = results.len();
505 assert!(results_len <= 16);
506 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
507 responder.send(
508 zx::sys::ZX_OK,
509 &results.try_into().unwrap(),
510 results_len as u64,
511 )?;
512 }
513 Err(s) => {
514 responder.send(
515 s.into_raw(),
516 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
517 0,
518 )?;
519 }
520 }
521 }
522 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
523 match self.session_manager.get_volume_info().await {
524 Ok((manager_info, volume_info)) => {
525 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
526 }
527 Err(s) => responder.send(s.into_raw(), None, None)?,
528 }
529 }
530 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
531 responder.send(
532 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
533 .into_raw(),
534 )?;
535 }
536 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
537 responder.send(
538 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
539 .into_raw(),
540 )?;
541 }
542 fvolume::VolumeRequest::Destroy { responder, .. } => {
543 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
544 }
545 }
546 Ok(None)
547 }
548
549 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
550 self.session_manager.get_info().await
551 }
552}
553
554struct SessionHelper<SM: SessionManager> {
555 session_manager: Arc<SM>,
556 offset_map: OffsetMap,
557 block_size: u32,
558 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
559 vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
560}
561
562impl<SM: SessionManager> SessionHelper<SM> {
563 fn new(
564 session_manager: Arc<SM>,
565 offset_map: OffsetMap,
566 block_size: u32,
567 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
568 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
569 Ok((
570 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
571 fifo,
572 ))
573 }
574
575 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
576 match request {
577 fblock::SessionRequest::GetFifo { responder } => {
578 let rights = zx::Rights::TRANSFER
579 | zx::Rights::READ
580 | zx::Rights::WRITE
581 | zx::Rights::SIGNAL
582 | zx::Rights::WAIT;
583 match self.peer_fifo.duplicate_handle(rights) {
584 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
585 Err(s) => responder.send(Err(s.into_raw()))?,
586 }
587 Ok(())
588 }
589 fblock::SessionRequest::AttachVmo { vmo, responder } => {
590 let vmo = Arc::new(vmo);
591 let vmo_id = {
592 let mut vmos = self.vmos.lock();
593 if vmos.len() == u16::MAX as usize {
594 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
595 return Ok(());
596 } else {
597 let vmo_id = match vmos.last_entry() {
598 None => 1,
599 Some(o) => {
600 o.key().checked_add(1).unwrap_or_else(|| {
601 let mut vmo_id = 1;
602 for (&id, _) in &*vmos {
604 if id > vmo_id {
605 break;
606 }
607 vmo_id = id + 1;
608 }
609 vmo_id
610 })
611 }
612 };
613 vmos.insert(vmo_id, vmo.clone());
614 vmo_id
615 }
616 };
617 self.session_manager.clone().on_attach_vmo(&vmo).await?;
618 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
619 Ok(())
620 }
621 fblock::SessionRequest::Close { responder } => {
622 responder.send(Ok(()))?;
623 Err(anyhow!("Closed"))
624 }
625 }
626 }
627
628 fn decode_fifo_request(
630 &self,
631 session: SM::Session,
632 request: &BlockFifoRequest,
633 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
634 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
635 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
636 .ok_or(zx::Status::INVALID_ARGS)
637 .and_then(|code| {
638 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
639 if request.length == 0 {
640 return Err(zx::Status::INVALID_ARGS);
641 }
642 if request.dev_offset.checked_add(request.length as u64).is_none()
644 || (code != BlockOpcode::Trim
645 && (request.length as u64 * self.block_size as u64)
646 .checked_add(request.vmo_offset)
647 .is_none())
648 {
649 return Err(zx::Status::OUT_OF_RANGE);
650 }
651 }
652 Ok(match code {
653 BlockOpcode::Read => Operation::Read {
654 device_block_offset: request.dev_offset,
655 block_count: request.length,
656 _unused: 0,
657 vmo_offset: request
658 .vmo_offset
659 .checked_mul(self.block_size as u64)
660 .ok_or(zx::Status::OUT_OF_RANGE)?,
661 },
662 BlockOpcode::Write => {
663 let mut options = WriteOptions::empty();
664 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
665 options |= WriteOptions::FORCE_ACCESS;
666 }
667 if flags.contains(BlockIoFlag::PRE_BARRIER) {
668 options |= WriteOptions::PRE_BARRIER;
669 }
670 Operation::Write {
671 device_block_offset: request.dev_offset,
672 block_count: request.length,
673 options: options,
674 vmo_offset: request
675 .vmo_offset
676 .checked_mul(self.block_size as u64)
677 .ok_or(zx::Status::OUT_OF_RANGE)?,
678 }
679 }
680 BlockOpcode::Flush => Operation::Flush,
681 BlockOpcode::Trim => Operation::Trim {
682 device_block_offset: request.dev_offset,
683 block_count: request.length,
684 },
685 BlockOpcode::CloseVmo => Operation::CloseVmo,
686 })
687 });
688
689 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
690 GroupOrRequest::Group(request.group)
691 } else {
692 GroupOrRequest::Request(request.reqid)
693 };
694
695 let mut active_requests = self.session_manager.active_requests().0.lock();
696 let mut request_id = None;
697
698 if group_or_request.is_group() {
708 for (key, group) in &mut active_requests.requests {
712 if group.group_or_request == group_or_request {
713 if group.req_id.is_some() {
714 if group.status == zx::Status::OK {
716 group.status = zx::Status::INVALID_ARGS;
717 }
718 return Err(None);
720 }
721 if flags.contains(BlockIoFlag::GROUP_LAST) {
722 group.req_id = Some(request.reqid);
723 if group.status != zx::Status::OK {
726 operation = Err(group.status);
727 }
728 } else if group.status != zx::Status::OK {
729 return Err(None);
732 }
733 request_id = Some(RequestId(key));
734 group.count += 1;
735 break;
736 }
737 }
738 }
739
740 let mut retain_vmo = false;
741 let vmo = match &operation {
742 Ok(Operation::Read { .. } | Operation::Write { .. }) => {
743 self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
744 retain_vmo = true;
745 Ok(Some(vmo))
746 })
747 }
748 Ok(Operation::CloseVmo) => {
749 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
750 active_requests.vmo_bin.add(vmo.clone());
751 Ok(Some(vmo))
752 })
753 }
754 _ => Ok(None),
755 }
756 .unwrap_or_else(|e| {
757 operation = Err(e);
758 None
759 });
760
761 let trace_flow_id = NonZero::new(request.trace_flow_id);
762 let request_id = request_id.unwrap_or_else(|| {
763 let vmo_bin_key =
764 if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
765 RequestId(active_requests.requests.insert(ActiveRequest {
766 session,
767 group_or_request,
768 trace_flow_id,
769 vmo_bin_key,
770 status: zx::Status::OK,
771 count: 1,
772 req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
773 || flags.contains(BlockIoFlag::GROUP_LAST)
774 {
775 Some(request.reqid)
776 } else {
777 None
778 },
779 }))
780 });
781
782 Ok(DecodedRequest {
783 request_id,
784 trace_flow_id,
785 operation: operation.map_err(|status| {
786 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
787 })?,
788 vmo,
789 })
790 }
791
792 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
793 std::mem::take(&mut *self.vmos.lock())
794 }
795
796 fn map_request(
798 &self,
799 mut request: DecodedRequest,
800 ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
801 let mut active_requests = self.session_manager.active_requests().0.lock();
802 let active_request = &mut active_requests.requests[request.request_id.0];
803 if active_request.status != zx::Status::OK {
804 return Err(active_requests
805 .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
806 .map(|(_, r)| r));
807 }
808 let mapping = self.offset_map.mapping();
809 match (mapping, request.operation.blocks()) {
810 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
811 return Err(active_requests
812 .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
813 .map(|(_, r)| r));
814 }
815 _ => {}
816 }
817 let remainder = request.operation.map(
818 self.offset_map.mapping(),
819 self.offset_map.max_transfer_blocks(),
820 self.block_size,
821 );
822 if remainder.is_some() {
823 active_request.count += 1;
824 }
825 static CACHE: AtomicU64 = AtomicU64::new(0);
826 if let Some(context) =
827 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
828 {
829 use fuchsia_trace::ArgValue;
830 let trace_args = [
831 ArgValue::of("request_id", request.request_id.0),
832 ArgValue::of("opcode", request.operation.trace_label()),
833 ];
834 let _scope = fuchsia_trace::duration(
835 c"storage",
836 c"block_server::start_transaction",
837 &trace_args,
838 );
839 if let Some(trace_flow_id) = active_request.trace_flow_id {
840 fuchsia_trace::flow_step(
841 &context,
842 c"block_server::start_transaction",
843 trace_flow_id.get().into(),
844 &[],
845 );
846 }
847 }
848 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
849 Ok((request, remainder))
850 }
851
852 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
853 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
854 }
855}
856
857#[repr(transparent)]
858#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
859pub struct RequestId(usize);
860
861#[derive(Clone, Debug)]
862struct DecodedRequest {
863 request_id: RequestId,
864 trace_flow_id: TraceFlowId,
865 operation: Operation,
866 vmo: Option<Arc<zx::Vmo>>,
867}
868
869pub type WriteOptions = block_protocol::WriteOptions;
871
872#[repr(C)]
873#[derive(Clone, Debug, PartialEq, Eq)]
874pub enum Operation {
875 Read {
880 device_block_offset: u64,
881 block_count: u32,
882 _unused: u32,
883 vmo_offset: u64,
884 },
885 Write {
886 device_block_offset: u64,
887 block_count: u32,
888 options: WriteOptions,
889 vmo_offset: u64,
890 },
891 Flush,
892 Trim {
893 device_block_offset: u64,
894 block_count: u32,
895 },
896 CloseVmo,
898}
899
900impl Operation {
901 fn trace_label(&self) -> &'static str {
902 match self {
903 Operation::Read { .. } => "read",
904 Operation::Write { .. } => "write",
905 Operation::Flush { .. } => "flush",
906 Operation::Trim { .. } => "trim",
907 Operation::CloseVmo { .. } => "close_vmo",
908 }
909 }
910
911 fn blocks(&self) -> Option<(u64, u32)> {
913 match self {
914 Operation::Read { device_block_offset, block_count, .. }
915 | Operation::Write { device_block_offset, block_count, .. }
916 | Operation::Trim { device_block_offset, block_count, .. } => {
917 Some((*device_block_offset, *block_count))
918 }
919 _ => None,
920 }
921 }
922
923 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
925 match self {
926 Operation::Read { device_block_offset, block_count, .. }
927 | Operation::Write { device_block_offset, block_count, .. }
928 | Operation::Trim { device_block_offset, block_count, .. } => {
929 Some((device_block_offset, block_count))
930 }
931 _ => None,
932 }
933 }
934
935 fn map(
938 &mut self,
939 mapping: Option<&BlockOffsetMapping>,
940 max_blocks: Option<NonZero<u32>>,
941 block_size: u32,
942 ) -> Option<Self> {
943 let mut max = match self {
944 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
945 _ => None,
946 };
947 let (offset, length) = self.blocks_mut()?;
948 let orig_offset = *offset;
949 if let Some(mapping) = mapping {
950 let delta = *offset - mapping.source_block_offset;
951 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
952 *offset = mapping.target_block_offset + delta;
953 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
954 max = match max {
955 None => Some(mapping_max),
956 Some(m) => Some(std::cmp::min(m, mapping_max)),
957 };
958 };
959 if let Some(max) = max {
960 if *length as u64 > max {
961 let rem = (*length as u64 - max) as u32;
962 *length = max as u32;
963 return Some(match self {
964 Operation::Read {
965 device_block_offset: _,
966 block_count: _,
967 vmo_offset,
968 _unused,
969 } => Operation::Read {
970 device_block_offset: orig_offset + max,
971 block_count: rem,
972 vmo_offset: *vmo_offset + max * block_size as u64,
973 _unused: *_unused,
974 },
975 Operation::Write {
976 device_block_offset: _,
977 block_count: _,
978 vmo_offset,
979 options,
980 } => {
981 let options = *options & !WriteOptions::PRE_BARRIER;
983 Operation::Write {
984 device_block_offset: orig_offset + max,
985 block_count: rem,
986 vmo_offset: *vmo_offset + max * block_size as u64,
987 options,
988 }
989 }
990 Operation::Trim { device_block_offset: _, block_count: _ } => {
991 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
992 }
993 _ => unreachable!(),
994 });
995 }
996 }
997 None
998 }
999}
1000
1001#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1002pub enum GroupOrRequest {
1003 Group(u16),
1004 Request(u32),
1005}
1006
1007impl GroupOrRequest {
1008 fn is_group(&self) -> bool {
1009 matches!(self, Self::Group(_))
1010 }
1011
1012 fn group_id(&self) -> Option<u16> {
1013 match self {
1014 Self::Group(id) => Some(*id),
1015 Self::Request(_) => None,
1016 }
1017 }
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022 use super::{
1023 BlockOffsetMapping, BlockServer, DeviceInfo, Operation, PartitionInfo, TraceFlowId,
1024 FIFO_MAX_REQUESTS,
1025 };
1026 use assert_matches::assert_matches;
1027 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
1028 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1029 use fuchsia_sync::Mutex;
1030 use futures::channel::oneshot;
1031 use futures::future::BoxFuture;
1032 use futures::FutureExt as _;
1033 use std::borrow::Cow;
1034 use std::future::poll_fn;
1035 use std::num::NonZero;
1036 use std::pin::pin;
1037 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1038 use std::sync::Arc;
1039 use std::task::{Context, Poll};
1040 use zx::{AsHandleRef as _, HandleBased as _};
1041 use {
1042 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1043 fuchsia_async as fasync,
1044 };
1045
1046 #[derive(Default)]
1047 struct MockInterface {
1048 read_hook: Option<
1049 Box<
1050 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1051 + Send
1052 + Sync,
1053 >,
1054 >,
1055 write_hook:
1056 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1057 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1058 }
1059
1060 impl super::async_interface::Interface for MockInterface {
1061 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1062 Ok(())
1063 }
1064
1065 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1066 Ok(Cow::Owned(test_device_info()))
1067 }
1068
1069 async fn read(
1070 &self,
1071 device_block_offset: u64,
1072 block_count: u32,
1073 vmo: &Arc<zx::Vmo>,
1074 vmo_offset: u64,
1075 _trace_flow_id: TraceFlowId,
1076 ) -> Result<(), zx::Status> {
1077 if let Some(read_hook) = &self.read_hook {
1078 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1079 } else {
1080 unimplemented!();
1081 }
1082 }
1083
1084 async fn write(
1085 &self,
1086 device_block_offset: u64,
1087 _block_count: u32,
1088 _vmo: &Arc<zx::Vmo>,
1089 _vmo_offset: u64,
1090 _opts: WriteOptions,
1091 _trace_flow_id: TraceFlowId,
1092 ) -> Result<(), zx::Status> {
1093 if let Some(write_hook) = &self.write_hook {
1094 write_hook(device_block_offset).await
1095 } else {
1096 unimplemented!();
1097 }
1098 }
1099
1100 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1101 unreachable!();
1102 }
1103
1104 fn barrier(&self) -> Result<(), zx::Status> {
1105 if let Some(barrier_hook) = &self.barrier_hook {
1106 barrier_hook()
1107 } else {
1108 unimplemented!();
1109 }
1110 }
1111
1112 async fn trim(
1113 &self,
1114 _device_block_offset: u64,
1115 _block_count: u32,
1116 _trace_flow_id: TraceFlowId,
1117 ) -> Result<(), zx::Status> {
1118 unreachable!();
1119 }
1120
1121 async fn get_volume_info(
1122 &self,
1123 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1124 let () = std::future::pending().await;
1126 unreachable!();
1127 }
1128 }
1129
1130 const BLOCK_SIZE: u32 = 512;
1131 const MAX_TRANSFER_BLOCKS: u32 = 10;
1132
1133 fn test_device_info() -> DeviceInfo {
1134 DeviceInfo::Partition(PartitionInfo {
1135 device_flags: fblock::Flag::READONLY,
1136 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1137 block_range: Some(0..100),
1138 type_guid: [1; 16],
1139 instance_guid: [2; 16],
1140 name: "foo".to_string(),
1141 flags: 0xabcd,
1142 })
1143 }
1144
1145 #[fuchsia::test]
1146 async fn test_split_request_only_issues_single_barrier() {
1147 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1148 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1149 let barrier_called = Arc::new(AtomicU32::new(0));
1150 let barrier_called_clone = barrier_called.clone();
1151
1152 futures::join!(
1153 async move {
1154 let block_server = BlockServer::new(
1155 BLOCK_SIZE,
1156 Arc::new(MockInterface {
1157 barrier_hook: Some(Box::new(move || {
1158 barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1159 Ok(())
1160 })),
1161 write_hook: Some(Box::new(move |_device_block_offset| {
1162 Box::pin(async move { Ok(()) })
1163 })),
1164 ..MockInterface::default()
1165 }),
1166 );
1167 block_server.handle_requests(stream).await.unwrap();
1168 },
1169 async move {
1170 let (session_proxy, server) = fidl::endpoints::create_proxy();
1171
1172 proxy.open_session(server).unwrap();
1173
1174 let vmo_id = session_proxy
1175 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1176 .await
1177 .unwrap()
1178 .unwrap();
1179 assert_ne!(vmo_id.id, 0);
1180
1181 let mut fifo =
1182 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1183 let (mut reader, mut writer) = fifo.async_io();
1184 writer
1187 .write_entries(&BlockFifoRequest {
1188 command: BlockFifoCommand {
1189 opcode: BlockOpcode::Write.into_primitive(),
1190 flags: BlockIoFlag::PRE_BARRIER.bits(),
1191 ..Default::default()
1192 },
1193 vmoid: vmo_id.id,
1194 dev_offset: 0,
1195 length: MAX_TRANSFER_BLOCKS + 1,
1196 vmo_offset: 6,
1197 ..Default::default()
1198 })
1199 .await
1200 .unwrap();
1201
1202 let mut response = BlockFifoResponse::default();
1203 reader.read_entries(&mut response).await.unwrap();
1204 assert_eq!(response.status, zx::sys::ZX_OK);
1205
1206 assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1207
1208 std::mem::drop(proxy);
1209 }
1210 );
1211 }
1212
1213 #[fuchsia::test]
1214 async fn test_barriers_not_supported() {
1215 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1216 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1217
1218 futures::join!(
1219 async move {
1220 let block_server = BlockServer::new(
1221 BLOCK_SIZE,
1222 Arc::new(MockInterface {
1223 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1224 write_hook: Some(Box::new(move |_device_block_offset| {
1225 Box::pin(async move { Ok(()) })
1226 })),
1227 ..MockInterface::default()
1228 }),
1229 );
1230 block_server.handle_requests(stream).await.unwrap();
1231 },
1232 async move {
1233 let (session_proxy, server) = fidl::endpoints::create_proxy();
1234
1235 proxy.open_session(server).unwrap();
1236
1237 let vmo_id = session_proxy
1238 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1239 .await
1240 .unwrap()
1241 .unwrap();
1242 assert_ne!(vmo_id.id, 0);
1243
1244 let mut fifo =
1245 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1246 let (mut reader, mut writer) = fifo.async_io();
1247
1248 writer
1251 .write_entries(&BlockFifoRequest {
1252 command: BlockFifoCommand {
1253 opcode: BlockOpcode::Write.into_primitive(),
1254 flags: BlockIoFlag::PRE_BARRIER.bits(),
1255 ..Default::default()
1256 },
1257 vmoid: vmo_id.id,
1258 dev_offset: 0,
1259 length: MAX_TRANSFER_BLOCKS + 1,
1260 vmo_offset: 6,
1261 ..Default::default()
1262 })
1263 .await
1264 .unwrap();
1265
1266 let mut response = BlockFifoResponse::default();
1267 reader.read_entries(&mut response).await.unwrap();
1268 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1269
1270 std::mem::drop(proxy);
1271 }
1272 );
1273 }
1274
1275 #[fuchsia::test]
1276 async fn test_barriers_ordering() {
1277 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1278 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1279 let barrier_called = Arc::new(AtomicBool::new(false));
1280
1281 futures::join!(
1282 async move {
1283 let barrier_called_clone = barrier_called.clone();
1284 let block_server = BlockServer::new(
1285 BLOCK_SIZE,
1286 Arc::new(MockInterface {
1287 barrier_hook: Some(Box::new(move || {
1288 barrier_called.store(true, Ordering::Relaxed);
1289 Ok(())
1290 })),
1291 write_hook: Some(Box::new(move |device_block_offset| {
1292 let barrier_called = barrier_called_clone.clone();
1293 Box::pin(async move {
1294 if device_block_offset % 2 == 0 {
1296 fasync::Timer::new(fasync::MonotonicInstant::after(
1297 zx::MonotonicDuration::from_millis(200),
1298 ))
1299 .await;
1300 }
1301 assert!(barrier_called.load(Ordering::Relaxed));
1302 Ok(())
1303 })
1304 })),
1305 ..MockInterface::default()
1306 }),
1307 );
1308 block_server.handle_requests(stream).await.unwrap();
1309 },
1310 async move {
1311 let (session_proxy, server) = fidl::endpoints::create_proxy();
1312
1313 proxy.open_session(server).unwrap();
1314
1315 let vmo_id = session_proxy
1316 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1317 .await
1318 .unwrap()
1319 .unwrap();
1320 assert_ne!(vmo_id.id, 0);
1321
1322 let mut fifo =
1323 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1324 let (mut reader, mut writer) = fifo.async_io();
1325
1326 writer
1327 .write_entries(&BlockFifoRequest {
1328 command: BlockFifoCommand {
1329 opcode: BlockOpcode::Write.into_primitive(),
1330 flags: BlockIoFlag::PRE_BARRIER.bits(),
1331 ..Default::default()
1332 },
1333 vmoid: vmo_id.id,
1334 dev_offset: 0,
1335 length: 5,
1336 vmo_offset: 6,
1337 ..Default::default()
1338 })
1339 .await
1340 .unwrap();
1341
1342 for i in 0..10 {
1343 writer
1344 .write_entries(&BlockFifoRequest {
1345 command: BlockFifoCommand {
1346 opcode: BlockOpcode::Write.into_primitive(),
1347 ..Default::default()
1348 },
1349 vmoid: vmo_id.id,
1350 dev_offset: i + 1,
1351 length: 5,
1352 vmo_offset: 6,
1353 ..Default::default()
1354 })
1355 .await
1356 .unwrap();
1357 }
1358 for _ in 0..11 {
1359 let mut response = BlockFifoResponse::default();
1360 reader.read_entries(&mut response).await.unwrap();
1361 assert_eq!(response.status, zx::sys::ZX_OK);
1362 }
1363
1364 std::mem::drop(proxy);
1365 }
1366 );
1367 }
1368
1369 #[fuchsia::test]
1370 async fn test_info() {
1371 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1372
1373 futures::join!(
1374 async {
1375 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1376 block_server.handle_requests(stream).await.unwrap();
1377 },
1378 async {
1379 let expected_info = test_device_info();
1380 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1381 info
1382 } else {
1383 unreachable!()
1384 };
1385
1386 let block_info = proxy.get_info().await.unwrap().unwrap();
1387 assert_eq!(
1388 block_info.block_count,
1389 partition_info.block_range.as_ref().unwrap().end
1390 - partition_info.block_range.as_ref().unwrap().start
1391 );
1392 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1393
1394 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1395
1396 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1397 assert_eq!(status, zx::sys::ZX_OK);
1398 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1399
1400 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1401 assert_eq!(status, zx::sys::ZX_OK);
1402 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1403
1404 let (status, name) = proxy.get_name().await.unwrap();
1405 assert_eq!(status, zx::sys::ZX_OK);
1406 assert_eq!(name.as_ref(), Some(&partition_info.name));
1407
1408 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1409 assert_eq!(metadata.name, name);
1410 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1411 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1412 assert_eq!(
1413 metadata.start_block_offset,
1414 Some(partition_info.block_range.as_ref().unwrap().start)
1415 );
1416 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1417 assert_eq!(metadata.flags, Some(partition_info.flags));
1418
1419 std::mem::drop(proxy);
1420 }
1421 );
1422 }
1423
1424 #[fuchsia::test]
1425 async fn test_attach_vmo() {
1426 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1427
1428 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1429 let koid = vmo.get_koid().unwrap();
1430
1431 futures::join!(
1432 async {
1433 let block_server = BlockServer::new(
1434 BLOCK_SIZE,
1435 Arc::new(MockInterface {
1436 read_hook: Some(Box::new(move |_, _, vmo, _| {
1437 assert_eq!(vmo.get_koid().unwrap(), koid);
1438 Box::pin(async { Ok(()) })
1439 })),
1440 ..MockInterface::default()
1441 }),
1442 );
1443 block_server.handle_requests(stream).await.unwrap();
1444 },
1445 async move {
1446 let (session_proxy, server) = fidl::endpoints::create_proxy();
1447
1448 proxy.open_session(server).unwrap();
1449
1450 let vmo_id = session_proxy
1451 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1452 .await
1453 .unwrap()
1454 .unwrap();
1455 assert_ne!(vmo_id.id, 0);
1456
1457 let mut fifo =
1458 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1459 let (mut reader, mut writer) = fifo.async_io();
1460
1461 let mut count = 1;
1463 loop {
1464 match session_proxy
1465 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1466 .await
1467 .unwrap()
1468 {
1469 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1470 Err(e) => {
1471 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1472 break;
1473 }
1474 }
1475
1476 if count % 10 == 0 {
1478 writer
1479 .write_entries(&BlockFifoRequest {
1480 command: BlockFifoCommand {
1481 opcode: BlockOpcode::Read.into_primitive(),
1482 ..Default::default()
1483 },
1484 vmoid: vmo_id.id,
1485 length: 1,
1486 ..Default::default()
1487 })
1488 .await
1489 .unwrap();
1490
1491 let mut response = BlockFifoResponse::default();
1492 reader.read_entries(&mut response).await.unwrap();
1493 assert_eq!(response.status, zx::sys::ZX_OK);
1494 }
1495
1496 count += 1;
1497 }
1498
1499 assert_eq!(count, u16::MAX as u64);
1500
1501 writer
1503 .write_entries(&BlockFifoRequest {
1504 command: BlockFifoCommand {
1505 opcode: BlockOpcode::CloseVmo.into_primitive(),
1506 ..Default::default()
1507 },
1508 vmoid: vmo_id.id,
1509 ..Default::default()
1510 })
1511 .await
1512 .unwrap();
1513
1514 let mut response = BlockFifoResponse::default();
1515 reader.read_entries(&mut response).await.unwrap();
1516 assert_eq!(response.status, zx::sys::ZX_OK);
1517
1518 let new_vmo_id = session_proxy
1519 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1520 .await
1521 .unwrap()
1522 .unwrap();
1523 assert_eq!(new_vmo_id.id, vmo_id.id);
1525
1526 std::mem::drop(proxy);
1527 }
1528 );
1529 }
1530
1531 #[fuchsia::test]
1532 async fn test_close() {
1533 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1534
1535 let mut server = std::pin::pin!(async {
1536 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1537 block_server.handle_requests(stream).await.unwrap();
1538 }
1539 .fuse());
1540
1541 let mut client = std::pin::pin!(async {
1542 let (session_proxy, server) = fidl::endpoints::create_proxy();
1543
1544 proxy.open_session(server).unwrap();
1545
1546 std::mem::drop(proxy);
1549
1550 session_proxy.close().await.unwrap().unwrap();
1551
1552 let _: () = std::future::pending().await;
1554 }
1555 .fuse());
1556
1557 futures::select!(
1558 _ = server => {}
1559 _ = client => unreachable!(),
1560 );
1561 }
1562
1563 #[derive(Default)]
1564 struct IoMockInterface {
1565 do_checks: bool,
1566 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1567 return_errors: bool,
1568 }
1569
1570 #[derive(Debug)]
1571 enum ExpectedOp {
1572 Read(u64, u32, u64),
1573 Write(u64, u32, u64),
1574 Trim(u64, u32),
1575 Flush,
1576 }
1577
1578 impl super::async_interface::Interface for IoMockInterface {
1579 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1580 Ok(())
1581 }
1582
1583 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1584 Ok(Cow::Owned(test_device_info()))
1585 }
1586
1587 async fn read(
1588 &self,
1589 device_block_offset: u64,
1590 block_count: u32,
1591 _vmo: &Arc<zx::Vmo>,
1592 vmo_offset: u64,
1593 _trace_flow_id: TraceFlowId,
1594 ) -> Result<(), zx::Status> {
1595 if self.return_errors {
1596 Err(zx::Status::INTERNAL)
1597 } else {
1598 if self.do_checks {
1599 assert_matches!(
1600 self.expected_op.lock().take(),
1601 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1602 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1603 "Read {device_block_offset} {block_count} {vmo_offset}"
1604 );
1605 }
1606 Ok(())
1607 }
1608 }
1609
1610 async fn write(
1611 &self,
1612 device_block_offset: u64,
1613 block_count: u32,
1614 _vmo: &Arc<zx::Vmo>,
1615 vmo_offset: u64,
1616 _opts: WriteOptions,
1617 _trace_flow_id: TraceFlowId,
1618 ) -> Result<(), zx::Status> {
1619 if self.return_errors {
1620 Err(zx::Status::NOT_SUPPORTED)
1621 } else {
1622 if self.do_checks {
1623 assert_matches!(
1624 self.expected_op.lock().take(),
1625 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1626 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1627 "Write {device_block_offset} {block_count} {vmo_offset}"
1628 );
1629 }
1630 Ok(())
1631 }
1632 }
1633
1634 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1635 if self.return_errors {
1636 Err(zx::Status::NO_RESOURCES)
1637 } else {
1638 if self.do_checks {
1639 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1640 }
1641 Ok(())
1642 }
1643 }
1644
1645 fn barrier(&self) -> Result<(), zx::Status> {
1646 unreachable!()
1647 }
1648
1649 async fn trim(
1650 &self,
1651 device_block_offset: u64,
1652 block_count: u32,
1653 _trace_flow_id: TraceFlowId,
1654 ) -> Result<(), zx::Status> {
1655 if self.return_errors {
1656 Err(zx::Status::NO_MEMORY)
1657 } else {
1658 if self.do_checks {
1659 assert_matches!(
1660 self.expected_op.lock().take(),
1661 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1662 block_count == b,
1663 "Trim {device_block_offset} {block_count}"
1664 );
1665 }
1666 Ok(())
1667 }
1668 }
1669 }
1670
1671 #[fuchsia::test]
1672 async fn test_io() {
1673 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1674
1675 let expected_op = Arc::new(Mutex::new(None));
1676 let expected_op_clone = expected_op.clone();
1677
1678 let server = async {
1679 let block_server = BlockServer::new(
1680 BLOCK_SIZE,
1681 Arc::new(IoMockInterface {
1682 return_errors: false,
1683 do_checks: true,
1684 expected_op: expected_op_clone,
1685 }),
1686 );
1687 block_server.handle_requests(stream).await.unwrap();
1688 };
1689
1690 let client = async move {
1691 let (session_proxy, server) = fidl::endpoints::create_proxy();
1692
1693 proxy.open_session(server).unwrap();
1694
1695 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1696 let vmo_id = session_proxy
1697 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1698 .await
1699 .unwrap()
1700 .unwrap();
1701
1702 let mut fifo =
1703 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1704 let (mut reader, mut writer) = fifo.async_io();
1705
1706 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1708 writer
1709 .write_entries(&BlockFifoRequest {
1710 command: BlockFifoCommand {
1711 opcode: BlockOpcode::Read.into_primitive(),
1712 ..Default::default()
1713 },
1714 vmoid: vmo_id.id,
1715 dev_offset: 1,
1716 length: 2,
1717 vmo_offset: 3,
1718 ..Default::default()
1719 })
1720 .await
1721 .unwrap();
1722
1723 let mut response = BlockFifoResponse::default();
1724 reader.read_entries(&mut response).await.unwrap();
1725 assert_eq!(response.status, zx::sys::ZX_OK);
1726
1727 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1729 writer
1730 .write_entries(&BlockFifoRequest {
1731 command: BlockFifoCommand {
1732 opcode: BlockOpcode::Write.into_primitive(),
1733 ..Default::default()
1734 },
1735 vmoid: vmo_id.id,
1736 dev_offset: 4,
1737 length: 5,
1738 vmo_offset: 6,
1739 ..Default::default()
1740 })
1741 .await
1742 .unwrap();
1743
1744 let mut response = BlockFifoResponse::default();
1745 reader.read_entries(&mut response).await.unwrap();
1746 assert_eq!(response.status, zx::sys::ZX_OK);
1747
1748 *expected_op.lock() = Some(ExpectedOp::Flush);
1750 writer
1751 .write_entries(&BlockFifoRequest {
1752 command: BlockFifoCommand {
1753 opcode: BlockOpcode::Flush.into_primitive(),
1754 ..Default::default()
1755 },
1756 ..Default::default()
1757 })
1758 .await
1759 .unwrap();
1760
1761 reader.read_entries(&mut response).await.unwrap();
1762 assert_eq!(response.status, zx::sys::ZX_OK);
1763
1764 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1766 writer
1767 .write_entries(&BlockFifoRequest {
1768 command: BlockFifoCommand {
1769 opcode: BlockOpcode::Trim.into_primitive(),
1770 ..Default::default()
1771 },
1772 dev_offset: 7,
1773 length: 8,
1774 ..Default::default()
1775 })
1776 .await
1777 .unwrap();
1778
1779 reader.read_entries(&mut response).await.unwrap();
1780 assert_eq!(response.status, zx::sys::ZX_OK);
1781
1782 std::mem::drop(proxy);
1783 };
1784
1785 futures::join!(server, client);
1786 }
1787
1788 #[fuchsia::test]
1789 async fn test_io_errors() {
1790 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1791
1792 futures::join!(
1793 async {
1794 let block_server = BlockServer::new(
1795 BLOCK_SIZE,
1796 Arc::new(IoMockInterface {
1797 return_errors: true,
1798 do_checks: false,
1799 expected_op: Arc::new(Mutex::new(None)),
1800 }),
1801 );
1802 block_server.handle_requests(stream).await.unwrap();
1803 },
1804 async move {
1805 let (session_proxy, server) = fidl::endpoints::create_proxy();
1806
1807 proxy.open_session(server).unwrap();
1808
1809 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1810 let vmo_id = session_proxy
1811 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1812 .await
1813 .unwrap()
1814 .unwrap();
1815
1816 let mut fifo =
1817 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1818 let (mut reader, mut writer) = fifo.async_io();
1819
1820 writer
1822 .write_entries(&BlockFifoRequest {
1823 command: BlockFifoCommand {
1824 opcode: BlockOpcode::Read.into_primitive(),
1825 ..Default::default()
1826 },
1827 vmoid: vmo_id.id,
1828 length: 1,
1829 reqid: 1,
1830 ..Default::default()
1831 })
1832 .await
1833 .unwrap();
1834
1835 let mut response = BlockFifoResponse::default();
1836 reader.read_entries(&mut response).await.unwrap();
1837 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1838
1839 writer
1841 .write_entries(&BlockFifoRequest {
1842 command: BlockFifoCommand {
1843 opcode: BlockOpcode::Write.into_primitive(),
1844 ..Default::default()
1845 },
1846 vmoid: vmo_id.id,
1847 length: 1,
1848 reqid: 2,
1849 ..Default::default()
1850 })
1851 .await
1852 .unwrap();
1853
1854 reader.read_entries(&mut response).await.unwrap();
1855 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1856
1857 writer
1859 .write_entries(&BlockFifoRequest {
1860 command: BlockFifoCommand {
1861 opcode: BlockOpcode::Flush.into_primitive(),
1862 ..Default::default()
1863 },
1864 reqid: 3,
1865 ..Default::default()
1866 })
1867 .await
1868 .unwrap();
1869
1870 reader.read_entries(&mut response).await.unwrap();
1871 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1872
1873 writer
1875 .write_entries(&BlockFifoRequest {
1876 command: BlockFifoCommand {
1877 opcode: BlockOpcode::Trim.into_primitive(),
1878 ..Default::default()
1879 },
1880 reqid: 4,
1881 length: 1,
1882 ..Default::default()
1883 })
1884 .await
1885 .unwrap();
1886
1887 reader.read_entries(&mut response).await.unwrap();
1888 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1889
1890 std::mem::drop(proxy);
1891 }
1892 );
1893 }
1894
1895 #[fuchsia::test]
1896 async fn test_invalid_args() {
1897 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1898
1899 futures::join!(
1900 async {
1901 let block_server = BlockServer::new(
1902 BLOCK_SIZE,
1903 Arc::new(IoMockInterface {
1904 return_errors: false,
1905 do_checks: false,
1906 expected_op: Arc::new(Mutex::new(None)),
1907 }),
1908 );
1909 block_server.handle_requests(stream).await.unwrap();
1910 },
1911 async move {
1912 let (session_proxy, server) = fidl::endpoints::create_proxy();
1913
1914 proxy.open_session(server).unwrap();
1915
1916 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1917 let vmo_id = session_proxy
1918 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1919 .await
1920 .unwrap()
1921 .unwrap();
1922
1923 let mut fifo =
1924 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1925
1926 async fn test(
1927 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1928 request: BlockFifoRequest,
1929 ) -> Result<(), zx::Status> {
1930 let (mut reader, mut writer) = fifo.async_io();
1931 writer.write_entries(&request).await.unwrap();
1932 let mut response = BlockFifoResponse::default();
1933 reader.read_entries(&mut response).await.unwrap();
1934 zx::Status::ok(response.status)
1935 }
1936
1937 let good_read_request = || BlockFifoRequest {
1940 command: BlockFifoCommand {
1941 opcode: BlockOpcode::Read.into_primitive(),
1942 ..Default::default()
1943 },
1944 length: 1,
1945 vmoid: vmo_id.id,
1946 ..Default::default()
1947 };
1948
1949 assert_eq!(
1950 test(
1951 &mut fifo,
1952 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1953 )
1954 .await,
1955 Err(zx::Status::IO)
1956 );
1957
1958 assert_eq!(
1959 test(
1960 &mut fifo,
1961 BlockFifoRequest {
1962 vmo_offset: 0xffff_ffff_ffff_ffff,
1963 ..good_read_request()
1964 }
1965 )
1966 .await,
1967 Err(zx::Status::OUT_OF_RANGE)
1968 );
1969
1970 assert_eq!(
1971 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1972 Err(zx::Status::INVALID_ARGS)
1973 );
1974
1975 let good_write_request = || BlockFifoRequest {
1978 command: BlockFifoCommand {
1979 opcode: BlockOpcode::Write.into_primitive(),
1980 ..Default::default()
1981 },
1982 length: 1,
1983 vmoid: vmo_id.id,
1984 ..Default::default()
1985 };
1986
1987 assert_eq!(
1988 test(
1989 &mut fifo,
1990 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1991 )
1992 .await,
1993 Err(zx::Status::IO)
1994 );
1995
1996 assert_eq!(
1997 test(
1998 &mut fifo,
1999 BlockFifoRequest {
2000 vmo_offset: 0xffff_ffff_ffff_ffff,
2001 ..good_write_request()
2002 }
2003 )
2004 .await,
2005 Err(zx::Status::OUT_OF_RANGE)
2006 );
2007
2008 assert_eq!(
2009 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2010 Err(zx::Status::INVALID_ARGS)
2011 );
2012
2013 assert_eq!(
2016 test(
2017 &mut fifo,
2018 BlockFifoRequest {
2019 command: BlockFifoCommand {
2020 opcode: BlockOpcode::CloseVmo.into_primitive(),
2021 ..Default::default()
2022 },
2023 vmoid: vmo_id.id + 1,
2024 ..Default::default()
2025 }
2026 )
2027 .await,
2028 Err(zx::Status::IO)
2029 );
2030
2031 std::mem::drop(proxy);
2032 }
2033 );
2034 }
2035
2036 #[fuchsia::test]
2037 async fn test_concurrent_requests() {
2038 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2039
2040 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2041 let waiting_readers_clone = waiting_readers.clone();
2042
2043 futures::join!(
2044 async move {
2045 let block_server = BlockServer::new(
2046 BLOCK_SIZE,
2047 Arc::new(MockInterface {
2048 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2049 let (tx, rx) = oneshot::channel();
2050 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2051 Box::pin(async move {
2052 let _ = rx.await;
2053 Ok(())
2054 })
2055 })),
2056 ..MockInterface::default()
2057 }),
2058 );
2059 block_server.handle_requests(stream).await.unwrap();
2060 },
2061 async move {
2062 let (session_proxy, server) = fidl::endpoints::create_proxy();
2063
2064 proxy.open_session(server).unwrap();
2065
2066 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2067 let vmo_id = session_proxy
2068 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2069 .await
2070 .unwrap()
2071 .unwrap();
2072
2073 let mut fifo =
2074 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2075 let (mut reader, mut writer) = fifo.async_io();
2076
2077 writer
2078 .write_entries(&BlockFifoRequest {
2079 command: BlockFifoCommand {
2080 opcode: BlockOpcode::Read.into_primitive(),
2081 ..Default::default()
2082 },
2083 reqid: 1,
2084 dev_offset: 1, vmoid: vmo_id.id,
2086 length: 1,
2087 ..Default::default()
2088 })
2089 .await
2090 .unwrap();
2091
2092 writer
2093 .write_entries(&BlockFifoRequest {
2094 command: BlockFifoCommand {
2095 opcode: BlockOpcode::Read.into_primitive(),
2096 ..Default::default()
2097 },
2098 reqid: 2,
2099 dev_offset: 2,
2100 vmoid: vmo_id.id,
2101 length: 1,
2102 ..Default::default()
2103 })
2104 .await
2105 .unwrap();
2106
2107 poll_fn(|cx: &mut Context<'_>| {
2109 if waiting_readers.lock().len() == 2 {
2110 Poll::Ready(())
2111 } else {
2112 cx.waker().wake_by_ref();
2114 Poll::Pending
2115 }
2116 })
2117 .await;
2118
2119 let mut response = BlockFifoResponse::default();
2120 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2121
2122 let (id, tx) = waiting_readers.lock().pop().unwrap();
2123 tx.send(()).unwrap();
2124
2125 reader.read_entries(&mut response).await.unwrap();
2126 assert_eq!(response.status, zx::sys::ZX_OK);
2127 assert_eq!(response.reqid, id);
2128
2129 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2130
2131 let (id, tx) = waiting_readers.lock().pop().unwrap();
2132 tx.send(()).unwrap();
2133
2134 reader.read_entries(&mut response).await.unwrap();
2135 assert_eq!(response.status, zx::sys::ZX_OK);
2136 assert_eq!(response.reqid, id);
2137 }
2138 );
2139 }
2140
2141 #[fuchsia::test]
2142 async fn test_groups() {
2143 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2144
2145 futures::join!(
2146 async move {
2147 let block_server = BlockServer::new(
2148 BLOCK_SIZE,
2149 Arc::new(MockInterface {
2150 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2151 ..MockInterface::default()
2152 }),
2153 );
2154 block_server.handle_requests(stream).await.unwrap();
2155 },
2156 async move {
2157 let (session_proxy, server) = fidl::endpoints::create_proxy();
2158
2159 proxy.open_session(server).unwrap();
2160
2161 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2162 let vmo_id = session_proxy
2163 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2164 .await
2165 .unwrap()
2166 .unwrap();
2167
2168 let mut fifo =
2169 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2170 let (mut reader, mut writer) = fifo.async_io();
2171
2172 writer
2173 .write_entries(&BlockFifoRequest {
2174 command: BlockFifoCommand {
2175 opcode: BlockOpcode::Read.into_primitive(),
2176 flags: BlockIoFlag::GROUP_ITEM.bits(),
2177 ..Default::default()
2178 },
2179 group: 1,
2180 vmoid: vmo_id.id,
2181 length: 1,
2182 ..Default::default()
2183 })
2184 .await
2185 .unwrap();
2186
2187 writer
2188 .write_entries(&BlockFifoRequest {
2189 command: BlockFifoCommand {
2190 opcode: BlockOpcode::Read.into_primitive(),
2191 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2192 ..Default::default()
2193 },
2194 reqid: 2,
2195 group: 1,
2196 vmoid: vmo_id.id,
2197 length: 1,
2198 ..Default::default()
2199 })
2200 .await
2201 .unwrap();
2202
2203 let mut response = BlockFifoResponse::default();
2204 reader.read_entries(&mut response).await.unwrap();
2205 assert_eq!(response.status, zx::sys::ZX_OK);
2206 assert_eq!(response.reqid, 2);
2207 assert_eq!(response.group, 1);
2208 }
2209 );
2210 }
2211
2212 #[fuchsia::test]
2213 async fn test_group_error() {
2214 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2215
2216 let counter = Arc::new(AtomicU64::new(0));
2217 let counter_clone = counter.clone();
2218
2219 futures::join!(
2220 async move {
2221 let block_server = BlockServer::new(
2222 BLOCK_SIZE,
2223 Arc::new(MockInterface {
2224 read_hook: Some(Box::new(move |_, _, _, _| {
2225 counter_clone.fetch_add(1, Ordering::Relaxed);
2226 Box::pin(async { Err(zx::Status::BAD_STATE) })
2227 })),
2228 ..MockInterface::default()
2229 }),
2230 );
2231 block_server.handle_requests(stream).await.unwrap();
2232 },
2233 async move {
2234 let (session_proxy, server) = fidl::endpoints::create_proxy();
2235
2236 proxy.open_session(server).unwrap();
2237
2238 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2239 let vmo_id = session_proxy
2240 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2241 .await
2242 .unwrap()
2243 .unwrap();
2244
2245 let mut fifo =
2246 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2247 let (mut reader, mut writer) = fifo.async_io();
2248
2249 writer
2250 .write_entries(&BlockFifoRequest {
2251 command: BlockFifoCommand {
2252 opcode: BlockOpcode::Read.into_primitive(),
2253 flags: BlockIoFlag::GROUP_ITEM.bits(),
2254 ..Default::default()
2255 },
2256 group: 1,
2257 vmoid: vmo_id.id,
2258 length: 1,
2259 ..Default::default()
2260 })
2261 .await
2262 .unwrap();
2263
2264 poll_fn(|cx: &mut Context<'_>| {
2266 if counter.load(Ordering::Relaxed) == 1 {
2267 Poll::Ready(())
2268 } else {
2269 cx.waker().wake_by_ref();
2271 Poll::Pending
2272 }
2273 })
2274 .await;
2275
2276 let mut response = BlockFifoResponse::default();
2277 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2278
2279 writer
2280 .write_entries(&BlockFifoRequest {
2281 command: BlockFifoCommand {
2282 opcode: BlockOpcode::Read.into_primitive(),
2283 flags: BlockIoFlag::GROUP_ITEM.bits(),
2284 ..Default::default()
2285 },
2286 group: 1,
2287 vmoid: vmo_id.id,
2288 length: 1,
2289 ..Default::default()
2290 })
2291 .await
2292 .unwrap();
2293
2294 writer
2295 .write_entries(&BlockFifoRequest {
2296 command: BlockFifoCommand {
2297 opcode: BlockOpcode::Read.into_primitive(),
2298 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2299 ..Default::default()
2300 },
2301 reqid: 2,
2302 group: 1,
2303 vmoid: vmo_id.id,
2304 length: 1,
2305 ..Default::default()
2306 })
2307 .await
2308 .unwrap();
2309
2310 reader.read_entries(&mut response).await.unwrap();
2311 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2312 assert_eq!(response.reqid, 2);
2313 assert_eq!(response.group, 1);
2314
2315 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2316
2317 assert_eq!(counter.load(Ordering::Relaxed), 1);
2319 }
2320 );
2321 }
2322
2323 #[fuchsia::test]
2324 async fn test_group_with_two_lasts() {
2325 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2326
2327 let (tx, rx) = oneshot::channel();
2328
2329 futures::join!(
2330 async move {
2331 let rx = Mutex::new(Some(rx));
2332 let block_server = BlockServer::new(
2333 BLOCK_SIZE,
2334 Arc::new(MockInterface {
2335 read_hook: Some(Box::new(move |_, _, _, _| {
2336 let rx = rx.lock().take().unwrap();
2337 Box::pin(async {
2338 let _ = rx.await;
2339 Ok(())
2340 })
2341 })),
2342 ..MockInterface::default()
2343 }),
2344 );
2345 block_server.handle_requests(stream).await.unwrap();
2346 },
2347 async move {
2348 let (session_proxy, server) = fidl::endpoints::create_proxy();
2349
2350 proxy.open_session(server).unwrap();
2351
2352 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2353 let vmo_id = session_proxy
2354 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2355 .await
2356 .unwrap()
2357 .unwrap();
2358
2359 let mut fifo =
2360 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2361 let (mut reader, mut writer) = fifo.async_io();
2362
2363 writer
2364 .write_entries(&BlockFifoRequest {
2365 command: BlockFifoCommand {
2366 opcode: BlockOpcode::Read.into_primitive(),
2367 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2368 ..Default::default()
2369 },
2370 reqid: 1,
2371 group: 1,
2372 vmoid: vmo_id.id,
2373 length: 1,
2374 ..Default::default()
2375 })
2376 .await
2377 .unwrap();
2378
2379 writer
2380 .write_entries(&BlockFifoRequest {
2381 command: BlockFifoCommand {
2382 opcode: BlockOpcode::Read.into_primitive(),
2383 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2384 ..Default::default()
2385 },
2386 reqid: 2,
2387 group: 1,
2388 vmoid: vmo_id.id,
2389 length: 1,
2390 ..Default::default()
2391 })
2392 .await
2393 .unwrap();
2394
2395 writer
2397 .write_entries(&BlockFifoRequest {
2398 command: BlockFifoCommand {
2399 opcode: BlockOpcode::CloseVmo.into_primitive(),
2400 ..Default::default()
2401 },
2402 reqid: 3,
2403 vmoid: vmo_id.id,
2404 ..Default::default()
2405 })
2406 .await
2407 .unwrap();
2408
2409 let mut response = BlockFifoResponse::default();
2411 reader.read_entries(&mut response).await.unwrap();
2412 assert_eq!(response.status, zx::sys::ZX_OK);
2413 assert_eq!(response.reqid, 3);
2414
2415 tx.send(()).unwrap();
2417
2418 let mut response = BlockFifoResponse::default();
2421 reader.read_entries(&mut response).await.unwrap();
2422 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2423 assert_eq!(response.reqid, 1);
2424 assert_eq!(response.group, 1);
2425 }
2426 );
2427 }
2428
2429 #[fuchsia::test(allow_stalls = false)]
2430 async fn test_requests_dont_block_sessions() {
2431 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2432
2433 let (tx, rx) = oneshot::channel();
2434
2435 fasync::Task::local(async move {
2436 let rx = Mutex::new(Some(rx));
2437 let block_server = BlockServer::new(
2438 BLOCK_SIZE,
2439 Arc::new(MockInterface {
2440 read_hook: Some(Box::new(move |_, _, _, _| {
2441 let rx = rx.lock().take().unwrap();
2442 Box::pin(async {
2443 let _ = rx.await;
2444 Ok(())
2445 })
2446 })),
2447 ..MockInterface::default()
2448 }),
2449 );
2450 block_server.handle_requests(stream).await.unwrap();
2451 })
2452 .detach();
2453
2454 let mut fut = pin!(async {
2455 let (session_proxy, server) = fidl::endpoints::create_proxy();
2456
2457 proxy.open_session(server).unwrap();
2458
2459 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2460 let vmo_id = session_proxy
2461 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2462 .await
2463 .unwrap()
2464 .unwrap();
2465
2466 let mut fifo =
2467 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2468 let (mut reader, mut writer) = fifo.async_io();
2469
2470 writer
2471 .write_entries(&BlockFifoRequest {
2472 command: BlockFifoCommand {
2473 opcode: BlockOpcode::Read.into_primitive(),
2474 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2475 ..Default::default()
2476 },
2477 reqid: 1,
2478 group: 1,
2479 vmoid: vmo_id.id,
2480 length: 1,
2481 ..Default::default()
2482 })
2483 .await
2484 .unwrap();
2485
2486 let mut response = BlockFifoResponse::default();
2487 reader.read_entries(&mut response).await.unwrap();
2488 assert_eq!(response.status, zx::sys::ZX_OK);
2489 });
2490
2491 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2493
2494 let mut fut2 = pin!(proxy.get_volume_info());
2495
2496 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2498
2499 let _ = tx.send(());
2502
2503 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2504 }
2505
2506 #[fuchsia::test]
2507 async fn test_request_flow_control() {
2508 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2509
2510 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2513 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2514 let event_clone = event.clone();
2515 futures::join!(
2516 async move {
2517 let block_server = BlockServer::new(
2518 BLOCK_SIZE,
2519 Arc::new(MockInterface {
2520 read_hook: Some(Box::new(move |_, _, _, _| {
2521 let event_clone = event_clone.clone();
2522 Box::pin(async move {
2523 if !event_clone.1.load(Ordering::SeqCst) {
2524 event_clone.0.listen().await;
2525 }
2526 Ok(())
2527 })
2528 })),
2529 ..MockInterface::default()
2530 }),
2531 );
2532 block_server.handle_requests(stream).await.unwrap();
2533 },
2534 async move {
2535 let (session_proxy, server) = fidl::endpoints::create_proxy();
2536
2537 proxy.open_session(server).unwrap();
2538
2539 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2540 let vmo_id = session_proxy
2541 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2542 .await
2543 .unwrap()
2544 .unwrap();
2545
2546 let mut fifo =
2547 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2548 let (mut reader, mut writer) = fifo.async_io();
2549
2550 for i in 0..MAX_REQUESTS {
2551 writer
2552 .write_entries(&BlockFifoRequest {
2553 command: BlockFifoCommand {
2554 opcode: BlockOpcode::Read.into_primitive(),
2555 ..Default::default()
2556 },
2557 reqid: (i + 1) as u32,
2558 dev_offset: i,
2559 vmoid: vmo_id.id,
2560 length: 1,
2561 ..Default::default()
2562 })
2563 .await
2564 .unwrap();
2565 }
2566 assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2567 command: BlockFifoCommand {
2568 opcode: BlockOpcode::Read.into_primitive(),
2569 ..Default::default()
2570 },
2571 reqid: u32::MAX,
2572 dev_offset: MAX_REQUESTS,
2573 vmoid: vmo_id.id,
2574 length: 1,
2575 ..Default::default()
2576 })))
2577 .is_pending());
2578 event.1.store(true, Ordering::SeqCst);
2580 event.0.notify(usize::MAX);
2581 let mut finished_reqids = vec![];
2583 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2584 let mut response = BlockFifoResponse::default();
2585 reader.read_entries(&mut response).await.unwrap();
2586 assert_eq!(response.status, zx::sys::ZX_OK);
2587 finished_reqids.push(response.reqid);
2588 writer
2589 .write_entries(&BlockFifoRequest {
2590 command: BlockFifoCommand {
2591 opcode: BlockOpcode::Read.into_primitive(),
2592 ..Default::default()
2593 },
2594 reqid: (i + 1) as u32,
2595 dev_offset: i,
2596 vmoid: vmo_id.id,
2597 length: 1,
2598 ..Default::default()
2599 })
2600 .await
2601 .unwrap();
2602 }
2603 let mut response = BlockFifoResponse::default();
2604 for _ in 0..MAX_REQUESTS {
2605 reader.read_entries(&mut response).await.unwrap();
2606 assert_eq!(response.status, zx::sys::ZX_OK);
2607 finished_reqids.push(response.reqid);
2608 }
2609 finished_reqids.sort();
2612 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2613 let mut i = 1;
2614 for reqid in finished_reqids {
2615 assert_eq!(reqid, i);
2616 i += 1;
2617 }
2618 }
2619 );
2620 }
2621
2622 #[fuchsia::test]
2623 async fn test_passthrough_io_with_fixed_map() {
2624 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2625
2626 let expected_op = Arc::new(Mutex::new(None));
2627 let expected_op_clone = expected_op.clone();
2628 futures::join!(
2629 async {
2630 let block_server = BlockServer::new(
2631 BLOCK_SIZE,
2632 Arc::new(IoMockInterface {
2633 return_errors: false,
2634 do_checks: true,
2635 expected_op: expected_op_clone,
2636 }),
2637 );
2638 block_server.handle_requests(stream).await.unwrap();
2639 },
2640 async move {
2641 let (session_proxy, server) = fidl::endpoints::create_proxy();
2642
2643 let mappings = [fblock::BlockOffsetMapping {
2644 source_block_offset: 0,
2645 target_block_offset: 10,
2646 length: 20,
2647 }];
2648 proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2649
2650 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2651 let vmo_id = session_proxy
2652 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2653 .await
2654 .unwrap()
2655 .unwrap();
2656
2657 let mut fifo =
2658 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2659 let (mut reader, mut writer) = fifo.async_io();
2660
2661 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2663 writer
2664 .write_entries(&BlockFifoRequest {
2665 command: BlockFifoCommand {
2666 opcode: BlockOpcode::Read.into_primitive(),
2667 ..Default::default()
2668 },
2669 vmoid: vmo_id.id,
2670 dev_offset: 1,
2671 length: 2,
2672 vmo_offset: 3,
2673 ..Default::default()
2674 })
2675 .await
2676 .unwrap();
2677
2678 let mut response = BlockFifoResponse::default();
2679 reader.read_entries(&mut response).await.unwrap();
2680 assert_eq!(response.status, zx::sys::ZX_OK);
2681
2682 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2684 writer
2685 .write_entries(&BlockFifoRequest {
2686 command: BlockFifoCommand {
2687 opcode: BlockOpcode::Write.into_primitive(),
2688 ..Default::default()
2689 },
2690 vmoid: vmo_id.id,
2691 dev_offset: 4,
2692 length: 5,
2693 vmo_offset: 6,
2694 ..Default::default()
2695 })
2696 .await
2697 .unwrap();
2698
2699 reader.read_entries(&mut response).await.unwrap();
2700 assert_eq!(response.status, zx::sys::ZX_OK);
2701
2702 *expected_op.lock() = Some(ExpectedOp::Flush);
2704 writer
2705 .write_entries(&BlockFifoRequest {
2706 command: BlockFifoCommand {
2707 opcode: BlockOpcode::Flush.into_primitive(),
2708 ..Default::default()
2709 },
2710 ..Default::default()
2711 })
2712 .await
2713 .unwrap();
2714
2715 reader.read_entries(&mut response).await.unwrap();
2716 assert_eq!(response.status, zx::sys::ZX_OK);
2717
2718 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2720 writer
2721 .write_entries(&BlockFifoRequest {
2722 command: BlockFifoCommand {
2723 opcode: BlockOpcode::Trim.into_primitive(),
2724 ..Default::default()
2725 },
2726 dev_offset: 7,
2727 length: 3,
2728 ..Default::default()
2729 })
2730 .await
2731 .unwrap();
2732
2733 reader.read_entries(&mut response).await.unwrap();
2734 assert_eq!(response.status, zx::sys::ZX_OK);
2735
2736 *expected_op.lock() = None;
2738 writer
2739 .write_entries(&BlockFifoRequest {
2740 command: BlockFifoCommand {
2741 opcode: BlockOpcode::Read.into_primitive(),
2742 ..Default::default()
2743 },
2744 vmoid: vmo_id.id,
2745 dev_offset: 19,
2746 length: 2,
2747 vmo_offset: 3,
2748 ..Default::default()
2749 })
2750 .await
2751 .unwrap();
2752
2753 reader.read_entries(&mut response).await.unwrap();
2754 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2755
2756 std::mem::drop(proxy);
2757 }
2758 );
2759 }
2760
2761 #[fuchsia::test]
2762 fn operation_map() {
2763 fn expect_map_result(
2764 mut operation: Operation,
2765 mapping: Option<BlockOffsetMapping>,
2766 max_blocks: Option<NonZero<u32>>,
2767 expected_operations: Vec<Operation>,
2768 ) {
2769 let mut ops = vec![];
2770 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2771 ops.push(operation);
2772 operation = remainder;
2773 }
2774 ops.push(operation);
2775 assert_eq!(ops, expected_operations);
2776 }
2777
2778 expect_map_result(
2780 Operation::Read {
2781 device_block_offset: 10,
2782 block_count: 200,
2783 _unused: 0,
2784 vmo_offset: 0,
2785 },
2786 None,
2787 None,
2788 vec![Operation::Read {
2789 device_block_offset: 10,
2790 block_count: 200,
2791 _unused: 0,
2792 vmo_offset: 0,
2793 }],
2794 );
2795
2796 expect_map_result(
2798 Operation::Read {
2799 device_block_offset: 10,
2800 block_count: 200,
2801 _unused: 0,
2802 vmo_offset: 0,
2803 },
2804 None,
2805 NonZero::new(120),
2806 vec![
2807 Operation::Read {
2808 device_block_offset: 10,
2809 block_count: 120,
2810 _unused: 0,
2811 vmo_offset: 0,
2812 },
2813 Operation::Read {
2814 device_block_offset: 130,
2815 block_count: 80,
2816 _unused: 0,
2817 vmo_offset: 120 * 512,
2818 },
2819 ],
2820 );
2821 expect_map_result(
2822 Operation::Write {
2823 device_block_offset: 10,
2824 block_count: 200,
2825 options: WriteOptions::PRE_BARRIER,
2826 vmo_offset: 0,
2827 },
2828 None,
2829 NonZero::new(120),
2830 vec![
2831 Operation::Write {
2832 device_block_offset: 10,
2833 block_count: 120,
2834 options: WriteOptions::PRE_BARRIER,
2835 vmo_offset: 0,
2836 },
2837 Operation::Write {
2838 device_block_offset: 130,
2839 block_count: 80,
2840 options: WriteOptions::empty(),
2841 vmo_offset: 120 * 512,
2842 },
2843 ],
2844 );
2845 expect_map_result(
2846 Operation::Trim { device_block_offset: 10, block_count: 200 },
2847 None,
2848 NonZero::new(120),
2849 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2850 );
2851
2852 expect_map_result(
2854 Operation::Read {
2855 device_block_offset: 10,
2856 block_count: 200,
2857 _unused: 0,
2858 vmo_offset: 0,
2859 },
2860 Some(BlockOffsetMapping {
2861 source_block_offset: 10,
2862 target_block_offset: 100,
2863 length: 200,
2864 }),
2865 NonZero::new(120),
2866 vec![
2867 Operation::Read {
2868 device_block_offset: 100,
2869 block_count: 120,
2870 _unused: 0,
2871 vmo_offset: 0,
2872 },
2873 Operation::Read {
2874 device_block_offset: 220,
2875 block_count: 80,
2876 _unused: 0,
2877 vmo_offset: 120 * 512,
2878 },
2879 ],
2880 );
2881 expect_map_result(
2882 Operation::Write {
2883 device_block_offset: 10,
2884 block_count: 200,
2885 options: WriteOptions::PRE_BARRIER,
2886 vmo_offset: 0,
2887 },
2888 Some(BlockOffsetMapping {
2889 source_block_offset: 10,
2890 target_block_offset: 100,
2891 length: 200,
2892 }),
2893 NonZero::new(120),
2894 vec![
2895 Operation::Write {
2896 device_block_offset: 100,
2897 block_count: 120,
2898 options: WriteOptions::PRE_BARRIER,
2899 vmo_offset: 0,
2900 },
2901 Operation::Write {
2902 device_block_offset: 220,
2903 block_count: 80,
2904 options: WriteOptions::empty(),
2905 vmo_offset: 120 * 512,
2906 },
2907 ],
2908 );
2909 expect_map_result(
2910 Operation::Trim { device_block_offset: 10, block_count: 200 },
2911 Some(BlockOffsetMapping {
2912 source_block_offset: 10,
2913 target_block_offset: 100,
2914 length: 200,
2915 }),
2916 NonZero::new(120),
2917 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2918 );
2919 }
2920}