use super::{
    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
    Operation, SessionHelper, TraceFlowId,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
use fidl_fuchsia_storage_block::DeviceFlag;
use futures::future::{Fuse, FusedFuture, join};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, select_biased};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::future::{Future, poll_fn};
use std::mem::MaybeUninit;
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::task::{Poll, ready};
use storage_device::buffer::Buffer;
use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};

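/// Implemented by the backend of a block device; the [`SessionManager`] in this module dispatches
/// decoded block requests to these methods.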
pub trait Interface: Send + Sync + Unpin + 'static {
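    /// Called when a new session is opened.  The default implementation serves the session using
    /// [`SessionManager::serve_session`].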
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        session_manager.serve_session(stream, offset_map, block_size)
    }

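    /// Called when a VMO is attached to a session, before the VMO is used by any other method.
    /// The default implementation is a no-op.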
    fn on_attach_vmo(
        &self,
        _vmo: &zx::Vmo,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

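    /// Called when a VMO is detached, e.g. due to a `CloseVmo` request or because the session is
    /// going away.  The default implementation is a no-op.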
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

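    /// Returns information about the device.  The device flags are consulted to decide whether
    /// barriers and force-access (FUA) writes need to be emulated.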
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

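    /// Reads `block_count` blocks starting at `device_block_offset` into `vmo` at `vmo_offset`.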
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64,
        opts: ReadOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

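    /// Writes `block_count` blocks from `vmo` at `vmo_offset` to the device starting at
    /// `device_block_offset`.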
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64,
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

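    /// Flushes previously completed writes to persistent storage.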
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

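    /// Trims (discards) `block_count` blocks starting at `device_block_offset`.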
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

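    /// Returns volume information.  The default implementation returns `NOT_SUPPORTED`.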
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

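    /// Queries the allocation state of the given virtual slices.  The default implementation
    /// returns `NOT_SUPPORTED`.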
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

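    /// Extends the volume by `_slice_count` slices starting at `_start_slice`.  The default
    /// implementation returns `NOT_SUPPORTED`.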
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

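    /// Shrinks the volume by `_slice_count` slices starting at `_start_slice`.  The default
    /// implementation returns `NOT_SUPPORTED`.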
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}

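/// A session which forwards all requests to an inner [`fblock::SessionProxy`].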
pub struct PassthroughSession(fblock::SessionProxy);

impl PassthroughSession {
    pub fn new(proxy: fblock::SessionProxy) -> Self {
        Self(proxy)
    }

    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                responder.send(self.0.get_fifo().await?)?;
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(self.0.close().await?)?;
            }
        }
        Ok(())
    }

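    /// Serves `stream` until it terminates, forwarding each request to the inner proxy.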
    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
        while let Some(Ok(request)) = stream.next().await {
            if let Err(error) = self.handle_request(request).await {
                log::warn!(error:?; "FIDL error");
            }
        }
        Ok(())
    }
}

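/// A session manager that dispatches decoded block requests to an [`Interface`].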
pub struct SessionManager<I: Interface + ?Sized> {
    interface: Arc<I>,
    active_requests: ActiveRequests<usize>,

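    /// Lazily initialized when the first decompressed-read request arrives.  The backing VMO is
    /// registered with the interface via `on_attach_vmo` and detached when the manager is
    /// dropped.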
    buffer_allocator: OnceLock<BufferAllocator>,
}

impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    fn drop(&mut self) {
        if let Some(allocator) = self.buffer_allocator.get() {
            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
        }
    }
}

impl<I: Interface + ?Sized> SessionManager<I> {
    pub fn new(interface: Arc<I>) -> Self {
        Self {
            interface,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        }
    }

    pub fn interface(&self) -> &I {
        self.interface.as_ref()
    }

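    /// Serves a single session: the FIFO is processed on a separate task while FIDL requests are
    /// handled from `stream`; any VMOs still attached are detached when the session ends.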
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
        let mut stream = stream.fuse();
        let scope = fasync::Scope::new();
        let helper = session.helper.clone();
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = session.run_fifo(fifo).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        scopeguard::defer! {
            for (_, (vmo, _)) in helper.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper.handle_request(req?).await?;
                    } else {
                        break;
                    }
                }
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}

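/// Holds the state used to serve a single session's FIFO.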
struct Session<I: Interface + ?Sized> {
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
}

impl<I: Interface + ?Sized> Session<I> {
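    /// Runs the FIFO until the peer closes it: requests are read in batches, decoded and mapped,
    /// processed concurrently, and their responses are written back to the FIFO.  Any requests
    /// still active when this returns are dropped.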
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager.active_requests;
        let mut active_request_futures = FuturesUnordered::new();
        let mut responses = Vec::new();

        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

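        // Each iteration sends any pending responses, polls in-flight requests and the mapping
        // future, and reads new requests from the FIFO when there is capacity for them.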
        loop {
            let new_requests = {
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

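                // Bias towards draining responses and finishing in-flight work before accepting
                // new requests from the FIFO.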
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder, commit_decompression_buffers)) => {
                                active_request_futures.push(self.process_fifo_request(
                                    request,
                                    commit_decompression_buffers,
                                ));
                                if let Some(remainder) = remainder {
                                    map_future.set(
                                        self.map_request_or_get_response(remainder).fuse()
                                    );
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request_or_get_response(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

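            // Decode the requests we just read.  CloseVmo is handled inline; everything else is
            // mapped and then processed asynchronously.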
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    Ok(request) => {
                        if map_future.is_terminated() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }

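    /// Maps `request`, converting an error into the response (if any) that should be sent.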
    async fn map_request_or_get_response(
        &self,
        request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
        let request_id = request.request_id;
        self.map_request(request).await.map_err(|status| {
            self.helper
                .session_manager
                .active_requests
                .complete_and_take_response(request_id, status)
                .map(|(_, r)| r)
        })
    }

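    /// Prepares `request` before it is handed to the interface: write barriers are simulated with
    /// a flush when the device lacks `BARRIER_SUPPORT`, and decompressed-read operations are
    /// rewritten as plain reads into an internally allocated buffer.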
    async fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
        let mut active_requests;
        let active_request;
        let mut commit_decompression_buffers = false;
        let flags = self.interface.get_info().as_ref().device_flags();
        if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
        {
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    "storage",
                    "block_server::SimulatedBarrier",
                    "request_id" => request.request_id.0
                );
            }
            self.interface.flush(request.trace_flow_id).await?;
        }

        match request.operation {
            Operation::StartDecompressedRead {
                required_buffer_size,
                device_block_offset,
                block_count,
                options,
            } => {
                let allocator = match self.helper.session_manager.buffer_allocator.get() {
                    Some(a) => a,
                    None => {
                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
                        self.interface.on_attach_vmo(&source.vmo()).await?;
                        let allocator = BufferAllocator::new(
                            std::cmp::max(
                                self.helper.block_size as usize,
                                zx::system_get_page_size() as usize,
                            ),
                            source,
                        );
                        self.helper.session_manager.buffer_allocator.set(allocator).unwrap();
                        self.helper.session_manager.buffer_allocator.get().unwrap()
                    }
                };

                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                let buffer = allocator.allocate_buffer(required_buffer_size).await;
                let vmo_offset = buffer.range().start as u64;

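                // SAFETY: the buffer is stored in the active request, which is held in
                // `SessionManager::active_requests` and is dropped before `buffer_allocator`
                // (declared after it), so the buffer cannot outlive its allocator.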
                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
                    unsafe { std::mem::transmute(buffer) }
                }

                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                active_request.decompression_info.as_mut().unwrap().buffer =
                    Some(unsafe { remove_lifetime(buffer) });

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset,
                    options,
                };
                request.vmo = Some(allocator.buffer_source().vmo().clone());

                commit_decompression_buffers = true;
            }
            Operation::ContinueDecompressedRead {
                offset,
                device_block_offset,
                block_count,
                options,
            } => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                let buffer =
                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();

                if offset >= buffer.len() as u64
                    || buffer.len() as u64 - offset
                        < block_count as u64 * self.helper.block_size as u64
                {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset: buffer.range().start as u64 + offset,
                    options,
                };

                let allocator = self.helper.session_manager.buffer_allocator.get().unwrap();
                request.vmo = Some(allocator.buffer_source().vmo().clone());
            }
            _ => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];
            }
        }

        self.helper
            .map_request(request, active_request)
            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
    }

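    /// Dispatches a single mapped request to the interface and returns the response to send, if
    /// any.  Force-access (FUA) writes are emulated with a post-flush when the device does not
    /// support FUA natively.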
    async fn process_fifo_request(
        &self,
        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
        commit_decompression_buffers: bool,
    ) -> Option<BlockFifoResponse> {
        let mut needs_postflush = false;
        let result = match operation {
            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
                join(
                    self.interface.read(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    ),
                    async {
                        if commit_decompression_buffers {
                            let (target_slice, buffer_slice, buffer_range) = {
                                let active_request =
                                    self.helper.session_manager.active_requests.request(request_id);
                                let info = active_request.decompression_info.as_ref().unwrap();
                                (
                                    info.uncompressed_slice(),
                                    self.helper
                                        .session_manager
                                        .buffer_allocator
                                        .get()
                                        .unwrap()
                                        .buffer_source()
                                        .slice(),
                                    info.buffer.as_ref().unwrap().range(),
                                )
                            };
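                            // While the read is in flight, commit the destination mapping and
                            // prefetch the staged source range so those pages are resident when
                            // the data is copied.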
                            let vmar = fuchsia_runtime::vmar_root_self();
                            let addr = target_slice.addr();
                            let unaligned = addr % zx::system_get_page_size() as usize;
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::COMMIT,
                                addr - unaligned,
                                target_slice.len() + unaligned,
                            ) {
                                log::warn!(error:?; "Unable to commit target range");
                            }
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::PREFETCH,
                                buffer_slice.addr() + buffer_range.start,
                                buffer_range.len(),
                            ) {
                                log::warn!(
                                    error:?,
                                    buffer_range:?;
                                    "Unable to prefetch source range",
                                );
                            }
                        }
                    },
                )
                .await
                .0
            }
            Operation::Write {
                device_block_offset,
                block_count,
                _unused,
                vmo_offset,
                mut options,
            } => {
                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
                    let flags = self.interface.get_info().as_ref().device_flags();
                    if !flags.contains(DeviceFlag::FUA_SUPPORT) {
                        options.flags.remove(WriteFlags::FORCE_ACCESS);
                        needs_postflush = true;
                    }
                }
                self.interface
                    .write(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Flush => self.interface.flush(trace_flow_id).await,
            Operation::Trim { device_block_offset, block_count } => {
                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
            }
            Operation::CloseVmo
            | Operation::StartDecompressedRead { .. }
            | Operation::ContinueDecompressedRead { .. } => {
                unreachable!()
            }
        };
        let response = self
            .helper
            .session_manager
            .active_requests
            .complete_and_take_response(request_id, result.into())
            .map(|(_, r)| r);
        if let Some(mut response) = response {
            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
                if let Some(id) = trace_flow_id {
                    fuchsia_trace::async_instant!(
                        fuchsia_trace::Id::from(id.get()),
                        "storage",
                        "block_server::SimulatedFUA",
                        "request_id" => request_id.0
                    );
                }
                response.status =
                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
            }
            Some(response)
        } else {
            response
        }
    }
}

impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
    const SUPPORTS_DECOMPRESSION: bool = true;

    type Session = usize;

    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        self.interface.on_attach_vmo(vmo).await
    }

    async fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        self.interface.clone().open_session(self, stream, offset_map, block_size).await
    }

    fn get_info(&self) -> Cow<'_, DeviceInfo> {
        self.interface.get_info()
    }

    async fn get_volume_info(
        &self,
    ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
        self.interface.get_volume_info().await
    }

    async fn query_slices(
        &self,
        start_slices: &[u64],
    ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
        self.interface.query_slices(start_slices).await
    }

    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.extend(start_slice, slice_count).await
    }

    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.shrink(start_slice, slice_count).await
    }

    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
        &self.active_requests
    }
}

impl<I: Interface> IntoSessionManager for Arc<I> {
    type SM = SessionManager<I>;

    fn into_session_manager(self) -> Arc<Self::SM> {
        Arc::new(SessionManager {
            interface: self,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        })
    }
}