1use super::{
6 ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
7 Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use futures::future::{Fuse, FusedFuture, join};
12use futures::stream::FuturesUnordered;
13use futures::{FutureExt, StreamExt, select_biased};
14use std::borrow::Cow;
15use std::collections::VecDeque;
16use std::future::{Future, poll_fn};
17use std::mem::MaybeUninit;
18use std::pin::pin;
19use std::sync::{Arc, OnceLock};
20use std::task::{Poll, ready};
21use storage_device::buffer::Buffer;
22use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
23use {
24 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
25 fuchsia_async as fasync,
26};
27
28pub trait Interface: Send + Sync + Unpin + 'static {
29 fn open_session(
44 &self,
45 session_manager: Arc<SessionManager<Self>>,
46 stream: fblock::SessionRequestStream,
47 offset_map: OffsetMap,
48 block_size: u32,
49 ) -> impl Future<Output = Result<(), Error>> + Send {
50 session_manager.serve_session(stream, offset_map, block_size)
52 }
53
54 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
58 async { Ok(()) }
59 }
60
61 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
63
64 fn get_info(&self) -> Cow<'_, DeviceInfo>;
66
67 fn read(
69 &self,
70 device_block_offset: u64,
71 block_count: u32,
72 vmo: &Arc<zx::Vmo>,
73 vmo_offset: u64, opts: ReadOptions,
75 trace_flow_id: TraceFlowId,
76 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
77
78 fn write(
81 &self,
82 device_block_offset: u64,
83 block_count: u32,
84 vmo: &Arc<zx::Vmo>,
85 vmo_offset: u64, opts: WriteOptions,
87 trace_flow_id: TraceFlowId,
88 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
89
90 fn barrier(&self) -> Result<(), zx::Status>;
97
98 fn flush(
100 &self,
101 trace_flow_id: TraceFlowId,
102 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
103
104 fn trim(
106 &self,
107 device_block_offset: u64,
108 block_count: u32,
109 trace_flow_id: TraceFlowId,
110 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
111
112 fn get_volume_info(
114 &self,
115 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
116 {
117 async { Err(zx::Status::NOT_SUPPORTED) }
118 }
119
120 fn query_slices(
122 &self,
123 _start_slices: &[u64],
124 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
125 async { Err(zx::Status::NOT_SUPPORTED) }
126 }
127
128 fn extend(
130 &self,
131 _start_slice: u64,
132 _slice_count: u64,
133 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
134 async { Err(zx::Status::NOT_SUPPORTED) }
135 }
136
137 fn shrink(
139 &self,
140 _start_slice: u64,
141 _slice_count: u64,
142 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
143 async { Err(zx::Status::NOT_SUPPORTED) }
144 }
145}
146
/// Wraps an `fblock::SessionProxy` so that incoming session requests can be forwarded to it.
pub struct PassthroughSession(fblock::SessionProxy);
149
150impl PassthroughSession {
151 pub fn new(proxy: fblock::SessionProxy) -> Self {
152 Self(proxy)
153 }
154
155 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
156 match request {
157 fblock::SessionRequest::GetFifo { responder } => {
158 responder.send(self.0.get_fifo().await?)?;
159 }
160 fblock::SessionRequest::AttachVmo { vmo, responder } => {
161 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
162 }
163 fblock::SessionRequest::Close { responder } => {
164 responder.send(self.0.close().await?)?;
165 }
166 }
167 Ok(())
168 }
169
170 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
172 while let Some(Ok(request)) = stream.next().await {
173 if let Err(error) = self.handle_request(request).await {
174 log::warn!(error:?; "FIDL error");
175 }
176 }
177 Ok(())
178 }
179}
180
/// Serves block sessions on top of an [`Interface`].
///
/// One manager is shared (via `Arc`) by every session started through `serve_session`.
pub struct SessionManager<I: Interface + ?Sized> {
    // The backend that decoded requests are dispatched to.
    interface: Arc<I>,
    // Book-keeping for in-flight requests.  The `usize` identifies the owning session; the FIFO
    // loop passes the address of its `Session` for this.
    active_requests: ActiveRequests<usize>,

    // Allocator for decompression buffers, created on the first decompressed read.
    //
    // NOTE: keep this field declared after `active_requests`.  Fields are dropped in declaration
    // order, and active requests can hold a `Buffer` whose borrow of this allocator has been
    // erased to `'static`, so the allocator has to be dropped last.
    buffer_allocator: OnceLock<BufferAllocator>,
}
189
190impl<I: Interface + ?Sized> Drop for SessionManager<I> {
191 fn drop(&mut self) {
192 if let Some(allocator) = self.buffer_allocator.get() {
193 self.interface.on_detach_vmo(allocator.buffer_source().vmo());
194 }
195 }
196}
197
198impl<I: Interface + ?Sized> SessionManager<I> {
199 pub fn new(interface: Arc<I>) -> Self {
200 Self {
201 interface,
202 active_requests: ActiveRequests::default(),
203 buffer_allocator: OnceLock::new(),
204 }
205 }
206
207 pub fn interface(&self) -> &I {
208 self.interface.as_ref()
209 }
210
211 pub async fn serve_session(
213 self: Arc<Self>,
214 stream: fblock::SessionRequestStream,
215 offset_map: OffsetMap,
216 block_size: u32,
217 ) -> Result<(), Error> {
218 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
219 let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
220 let mut stream = stream.fuse();
221 let scope = fasync::Scope::new();
222 let helper = session.helper.clone();
223 let mut fifo_task = scope
224 .spawn(async move {
225 if let Err(status) = session.run_fifo(fifo).await {
226 if status != zx::Status::PEER_CLOSED {
227 log::error!(status:?; "FIFO error");
228 }
229 }
230 })
231 .fuse();
232
233 scopeguard::defer! {
235 for (_, (vmo, _)) in helper.take_vmos() {
236 self.interface.on_detach_vmo(&vmo);
237 }
238 }
239
240 loop {
241 futures::select! {
242 maybe_req = stream.next() => {
243 if let Some(req) = maybe_req {
244 helper.handle_request(req?).await?;
245 } else {
246 break;
247 }
248 }
249 _ = fifo_task => break,
250 }
251 }
252
253 Ok(())
254 }
255}
256
/// State for a single session served by a [`SessionManager`].
struct Session<I: Interface + ?Sized> {
    // The backend that this session's requests are dispatched to.
    interface: Arc<I>,
    // Per-session helper; it also gives access to the shared session manager.
    helper: Arc<SessionHelper<SessionManager<I>>>,
}
261
262impl<I: Interface + ?Sized> Session<I> {
/// Drives the session's FIFO: receives requests, decodes and maps them, dispatches them to the
/// interface, and writes the responses back.
///
/// Everything happens inside a single event loop.  Requests are processed concurrently (one
/// future per request in `active_request_futures`), whereas mapping is serialised through a
/// single `map_future`, with further requests waiting in `pending_mappings`.
///
/// The loop has no `break`, so this only ever returns with the error produced by reading from
/// or writing to the FIFO.  On exit, the active requests tagged with this session are dropped.
async fn run_fifo(
    &self,
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
) -> Result<(), zx::Status> {
    // The address of `self` is the session identifier used to tag active requests (see the call
    // to `decode_fifo_request` below); drop whatever is still tagged with it when we exit.
    scopeguard::defer! {
        self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
    }

    let mut fifo = fasync::Fifo::from_fifo(fifo);
    let (mut reader, mut writer) = fifo.async_io();
    // Receive buffer for raw requests; only the entries reported by a read are decoded.
    let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
    let active_requests = &self.helper.session_manager.active_requests;
    // One future per request currently being processed by the interface.
    let mut active_request_futures = FuturesUnordered::new();
    // Responses waiting to be written to the FIFO.
    let mut responses = Vec::new();

    // At most one request is mapped at a time; others wait their turn in `pending_mappings`.
    let mut map_future = pin!(Fuse::terminated());
    let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

    loop {
        let new_requests = {
            // Flow control: only read as many requests as there is room for once requests
            // being processed and responses awaiting transmission are accounted for.
            let pending_requests = active_request_futures.len() + responses.len();
            let count = requests.len().saturating_sub(pending_requests);
            // With no room left, use a terminated future so that this arm is never selected.
            let mut receive_requests = pin!(if count == 0 {
                Fuse::terminated()
            } else {
                reader.read_entries(&mut requests[..count]).fuse()
            });
            // Likewise, only try to write when there is something to send.
            let mut send_responses = pin!(if responses.is_empty() {
                Fuse::terminated()
            } else {
                poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                    match ready!(writer.try_write(cx, &responses[..])) {
                        Ok(written) => {
                            // A write may be partial; keep whatever was not sent.
                            responses.drain(..written);
                            Poll::Ready(Ok(()))
                        }
                        Err(status) => Poll::Ready(Err(status)),
                    }
                })
                .fuse()
            });

            // `select_biased!` polls its arms in the order written, so sending responses and
            // advancing requests already in hand take priority over receiving new ones.
            // Every arm other than the receive arm evaluates to 0 new requests.
            select_biased!(
                res = send_responses => {
                    res?;
                    0
                },
                response = active_request_futures.select_next_some() => {
                    responses.extend(response);
                    0
                }
                result = map_future => {
                    match result {
                        Ok((request, remainder, commit_decompression_buffers)) => {
                            // The mapped request can now be processed.
                            active_request_futures.push(self.process_fifo_request(
                                request,
                                commit_decompression_buffers
                            ));
                            // Mapping may leave a remainder, which needs mapping in turn.
                            if let Some(remainder) = remainder {
                                map_future.set(
                                    self.map_request_or_get_response(remainder).fuse()
                                );
                            }
                        }
                        // Mapping failed; the error carries the response to send, if any.
                        Err(response) => responses.extend(response),
                    }
                    // If there was no remainder, start on the next queued request.
                    if map_future.is_terminated() {
                        if let Some(request) = pending_mappings.pop_front() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        }
                    }
                    0
                }
                count = receive_requests => {
                    count?
                }
            )
        };

        // Note that nothing in the remainder of the loop body awaits.
        // NOTE(review): presumably this is deliberate, so that the futures polled by the
        // `select_biased!` above are not left unpolled while new requests are decoded — confirm
        // before introducing an `await` here.
        for request in &mut requests[..new_requests] {
            // SAFETY (assumed — TODO confirm): `new_requests` is the count returned by
            // `read_entries`, which is relied upon to have initialised that many leading
            // entries of `requests`.
            match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                request.assume_init_mut()
            }) {
                // Closing a VMO is handled right here; it never reaches the interface as an
                // operation.
                Ok(DecodedRequest {
                    operation: Operation::CloseVmo, vmo, request_id, ..
                }) => {
                    if let Some(vmo) = vmo {
                        self.interface.on_detach_vmo(vmo.as_ref());
                    }
                    responses.extend(
                        active_requests
                            .complete_and_take_response(request_id, zx::Status::OK)
                            .map(|(_, response)| response),
                    );
                }
                Ok(mut request) => {
                    // Issue any barrier the request asks for before it is mapped.  If the
                    // barrier fails, complete the request with that status straight away.
                    if let Err(status) = self.maybe_issue_barrier(&mut request) {
                        let response = self
                            .helper
                            .session_manager
                            .active_requests
                            .complete_and_take_response(request.request_id, status)
                            .map(|(_, r)| r);
                        responses.extend(response);
                    } else if map_future.is_terminated() {
                        // Nothing is being mapped, so start mapping immediately.
                        map_future.set(self.map_request_or_get_response(request).fuse());
                    } else {
                        pending_mappings.push_back(request);
                    }
                }
                // Decoding failed; send the response if decoding produced one.
                Err(None) => {}
                Err(Some(response)) => responses.push(response),
            }
        }
    }
}
402
403 fn maybe_issue_barrier(&self, request: &mut DecodedRequest) -> Result<(), zx::Status> {
404 if let Operation::Write {
405 device_block_offset: _,
406 block_count: _,
407 _unused,
408 options,
409 vmo_offset: _,
410 ..
411 } = &mut request.operation
412 {
413 if options.flags.contains(WriteFlags::PRE_BARRIER) {
414 self.interface.barrier()?;
415 options.flags &= !WriteFlags::PRE_BARRIER;
416 }
417 }
418 Ok(())
419 }
420
421 async fn map_request_or_get_response(
422 &self,
423 request: DecodedRequest,
424 ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
425 let request_id = request.request_id;
426 self.map_request(request).await.map_err(|status| {
427 self.helper
428 .session_manager
429 .active_requests
430 .complete_and_take_response(request_id, status)
431 .map(|(_, r)| r)
432 })
433 }
434
435 async fn map_request(
438 &self,
439 mut request: DecodedRequest,
440 ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
441 let mut active_requests;
442 let active_request;
443 let mut commit_decompression_buffers = false;
444 match request.operation {
446 Operation::StartDecompressedRead {
447 required_buffer_size,
448 device_block_offset,
449 block_count,
450 options,
451 } => {
452 let allocator = match self.helper.session_manager.buffer_allocator.get() {
453 Some(a) => a,
454 None => {
455 let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
458 self.interface.on_attach_vmo(&source.vmo()).await?;
459 let allocator = BufferAllocator::new(
460 std::cmp::max(
461 self.helper.block_size as usize,
462 zx::system_get_page_size() as usize,
463 ),
464 source,
465 );
466 self.helper.session_manager.buffer_allocator.set(allocator).unwrap();
467 self.helper.session_manager.buffer_allocator.get().unwrap()
468 }
469 };
470
471 if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
472 return Err(zx::Status::OUT_OF_RANGE);
473 }
474
475 let buffer = allocator.allocate_buffer(required_buffer_size).await;
476 let vmo_offset = buffer.range().start as u64;
477
478 unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
482 unsafe { std::mem::transmute(buffer) }
483 }
484
485 active_requests = self.helper.session_manager.active_requests.0.lock();
486 active_request = &mut active_requests.requests[request.request_id.0];
487
488 active_request.decompression_info.as_mut().unwrap().buffer =
491 Some(unsafe { remove_lifetime(buffer) });
492
493 request.operation = Operation::Read {
494 device_block_offset,
495 block_count,
496 _unused: 0,
497 vmo_offset,
498 options,
499 };
500 request.vmo = Some(allocator.buffer_source().vmo().clone());
501
502 commit_decompression_buffers = true;
503 }
504 Operation::ContinueDecompressedRead {
505 offset,
506 device_block_offset,
507 block_count,
508 options,
509 } => {
510 active_requests = self.helper.session_manager.active_requests.0.lock();
511 active_request = &mut active_requests.requests[request.request_id.0];
512
513 let buffer =
514 active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();
515
516 if offset >= buffer.len() as u64
518 || buffer.len() as u64 - offset
519 < block_count as u64 * self.helper.block_size as u64
520 {
521 return Err(zx::Status::OUT_OF_RANGE);
522 }
523
524 request.operation = Operation::Read {
525 device_block_offset,
526 block_count,
527 _unused: 0,
528 vmo_offset: buffer.range().start as u64 + offset,
529 options,
530 };
531
532 let allocator = self.helper.session_manager.buffer_allocator.get().unwrap();
533 request.vmo = Some(allocator.buffer_source().vmo().clone());
534 }
535 _ => {
536 active_requests = self.helper.session_manager.active_requests.0.lock();
537 active_request = &mut active_requests.requests[request.request_id.0];
538 }
539 }
540
541 self.helper
542 .map_request(request, active_request)
543 .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
544 }
545
546 async fn process_fifo_request(
548 &self,
549 DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
550 commit_decompression_buffers: bool,
551 ) -> Option<BlockFifoResponse> {
552 let result = match operation {
553 Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
554 join(
555 self.interface.read(
556 device_block_offset,
557 block_count,
558 vmo.as_ref().unwrap(),
559 vmo_offset,
560 options,
561 trace_flow_id,
562 ),
563 async {
564 if commit_decompression_buffers {
565 let (target_slice, buffer_slice, buffer_range) = {
566 let active_request =
567 self.helper.session_manager.active_requests.request(request_id);
568 let info = active_request.decompression_info.as_ref().unwrap();
569 (
570 info.uncompressed_slice(),
571 self.helper
572 .session_manager
573 .buffer_allocator
574 .get()
575 .unwrap()
576 .buffer_source()
577 .slice(),
578 info.buffer.as_ref().unwrap().range(),
579 )
580 };
581 let vmar = fuchsia_runtime::vmar_root_self();
582 let addr = target_slice.addr();
584 let unaligned = addr % zx::system_get_page_size() as usize;
585 if let Err(error) = vmar.op_range(
586 zx::VmarOp::COMMIT,
587 addr - unaligned,
588 target_slice.len() + unaligned,
589 ) {
590 log::warn!(error:?; "Unable to commit target range");
591 }
592 if let Err(error) = vmar.op_range(
594 zx::VmarOp::PREFETCH,
595 buffer_slice.addr() + buffer_range.start,
596 buffer_range.len(),
597 ) {
598 log::warn!(
599 error:?,
600 buffer_range:?;
601 "Unable to prefetch source range",
602 );
603 }
604 }
605 },
606 )
607 .await
608 .0
609 }
610 Operation::Write { device_block_offset, block_count, _unused, vmo_offset, options } => {
611 self.interface
612 .write(
613 device_block_offset,
614 block_count,
615 vmo.as_ref().unwrap(),
616 vmo_offset,
617 options,
618 trace_flow_id,
619 )
620 .await
621 }
622 Operation::Flush => self.interface.flush(trace_flow_id).await,
623 Operation::Trim { device_block_offset, block_count } => {
624 self.interface.trim(device_block_offset, block_count, trace_flow_id).await
625 }
626 Operation::CloseVmo
627 | Operation::StartDecompressedRead { .. }
628 | Operation::ContinueDecompressedRead { .. } => {
629 unreachable!()
631 }
632 };
633 self.helper
634 .session_manager
635 .active_requests
636 .complete_and_take_response(request_id, result.into())
637 .map(|(_, r)| r)
638 }
639}
640
641impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
642 const SUPPORTS_DECOMPRESSION: bool = true;
643
644 type Session = usize;
646
647 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
648 self.interface.on_attach_vmo(vmo).await
649 }
650
651 async fn open_session(
652 self: Arc<Self>,
653 stream: fblock::SessionRequestStream,
654 offset_map: OffsetMap,
655 block_size: u32,
656 ) -> Result<(), Error> {
657 self.interface.clone().open_session(self, stream, offset_map, block_size).await
658 }
659
660 fn get_info(&self) -> Cow<'_, DeviceInfo> {
661 self.interface.get_info()
662 }
663
664 async fn get_volume_info(
665 &self,
666 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
667 self.interface.get_volume_info().await
668 }
669
670 async fn query_slices(
671 &self,
672 start_slices: &[u64],
673 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
674 self.interface.query_slices(start_slices).await
675 }
676
677 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
678 self.interface.extend(start_slice, slice_count).await
679 }
680
681 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
682 self.interface.shrink(start_slice, slice_count).await
683 }
684
685 fn active_requests(&self) -> &ActiveRequests<Self::Session> {
686 return &self.active_requests;
687 }
688}
689
690impl<I: Interface> IntoSessionManager for Arc<I> {
691 type SM = SessionManager<I>;
692
693 fn into_session_manager(self) -> Arc<Self::SM> {
694 Arc::new(SessionManager {
695 interface: self,
696 active_requests: ActiveRequests::default(),
697 buffer_allocator: OnceLock::new(),
698 })
699 }
700}