1use super::{
6 ActiveRequests, DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation,
7 SessionHelper, TraceFlowId, FIFO_MAX_REQUESTS,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
11use futures::future::{Fuse, FusedFuture};
12use futures::stream::FuturesUnordered;
13use futures::{select_biased, FutureExt, StreamExt};
14use std::borrow::Cow;
15use std::collections::VecDeque;
16use std::future::{poll_fn, Future};
17use std::mem::MaybeUninit;
18use std::pin::pin;
19use std::sync::Arc;
20use std::task::{ready, Poll};
21use {
22 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
23 fuchsia_async as fasync,
24};
25
/// Asynchronous callbacks that a block device implementation supplies to the session machinery
/// in this file.
///
/// `SessionManager` holds an `Arc` of the implementor and forwards FIDL-level calls
/// (`get_info`, `get_volume_info`, `query_slices`, `extend`, `shrink`) to it, while `Session`
/// dispatches decoded FIFO requests to `read`, `write`, `flush` and `trim`.
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Serves `stream` for a newly opened session.
    ///
    /// The default implementation hands `stream`, `offset_map` and `block_size` straight to
    /// `SessionManager::serve_session` on `session_manager`.  Implementors may override this to
    /// serve the session differently (this file also provides `PassthroughSession`, which
    /// forwards session requests to another proxy).
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Hook invoked (via `SessionManager::on_attach_vmo`) when a VMO is attached.
    ///
    /// The default implementation does nothing and resolves to `Ok(())`.
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Hook invoked when a VMO is detached.
    ///
    /// Within this file it is called for a `CloseVmo` FIFO request that carries a VMO, and for
    /// every VMO still held by the session helper when `serve_session` exits.  The default
    /// implementation is a no-op.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Returns the device information, either borrowed from the implementor or owned.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Handles an `Operation::Read` FIFO request.
    ///
    /// NOTE(review): `device_block_offset`/`block_count` are presumably expressed in blocks and
    /// `vmo_offset` in bytes -- confirm against the request decoder.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Handles an `Operation::Write` FIFO request.
    ///
    /// `opts` carries the write options of the request as forwarded by
    /// `Session::process_fifo_request`.
    ///
    /// NOTE(review): `device_block_offset`/`block_count` are presumably expressed in blocks and
    /// `vmo_offset` in bytes -- confirm against the request decoder.
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64,
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Issues a barrier.
    ///
    /// This is synchronous; `Session::maybe_issue_barrier` calls it before a write whose options
    /// contain `WriteOptions::PRE_BARRIER` is mapped and dispatched.
    fn barrier(&self) -> Result<(), zx::Status>;

    /// Handles an `Operation::Flush` FIFO request.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Handles an `Operation::Trim` FIFO request for the given block range.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns volume manager and volume information.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns slice ranges for the given starting slices.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Extends the volume by `_slice_count` slices starting at `_start_slice`.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Shrinks the volume by `_slice_count` slices starting at `_start_slice`.
    ///
    /// The default implementation fails with `zx::Status::NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
143
144pub struct PassthroughSession(fblock::SessionProxy);
146
147impl PassthroughSession {
148 pub fn new(proxy: fblock::SessionProxy) -> Self {
149 Self(proxy)
150 }
151
152 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
153 match request {
154 fblock::SessionRequest::GetFifo { responder } => {
155 responder.send(self.0.get_fifo().await?)?;
156 }
157 fblock::SessionRequest::AttachVmo { vmo, responder } => {
158 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
159 }
160 fblock::SessionRequest::Close { responder } => {
161 responder.send(self.0.close().await?)?;
162 }
163 }
164 Ok(())
165 }
166
167 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
169 while let Some(Ok(request)) = stream.next().await {
170 if let Err(error) = self.handle_request(request).await {
171 log::warn!(error:?; "FIDL error");
172 }
173 }
174 Ok(())
175 }
176}
177
178pub struct SessionManager<I: Interface + ?Sized> {
179 interface: Arc<I>,
180 active_requests: ActiveRequests<usize>,
181}
182
183impl<I: Interface + ?Sized> SessionManager<I> {
184 pub fn new(interface: Arc<I>) -> Self {
185 Self { interface, active_requests: ActiveRequests::default() }
186 }
187
188 pub fn interface(&self) -> &I {
189 self.interface.as_ref()
190 }
191
192 pub async fn serve_session(
194 self: Arc<Self>,
195 stream: fblock::SessionRequestStream,
196 offset_map: OffsetMap,
197 block_size: u32,
198 ) -> Result<(), Error> {
199 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
200 let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
201 let mut stream = stream.fuse();
202 let scope = fasync::Scope::new();
203 let helper = session.helper.clone();
204 let mut fifo_task = scope
205 .spawn(async move {
206 if let Err(status) = session.run_fifo(fifo).await {
207 if status != zx::Status::PEER_CLOSED {
208 log::error!(status:?; "FIFO error");
209 }
210 }
211 })
212 .fuse();
213
214 scopeguard::defer! {
216 for (_, vmo) in helper.take_vmos() {
217 self.interface.on_detach_vmo(&vmo);
218 }
219 }
220
221 loop {
222 futures::select! {
223 maybe_req = stream.next() => {
224 if let Some(req) = maybe_req {
225 helper.handle_request(req?).await?;
226 } else {
227 break;
228 }
229 }
230 _ = fifo_task => break,
231 }
232 }
233
234 Ok(())
235 }
236}
237
238struct Session<I: Interface + ?Sized> {
239 interface: Arc<I>,
240 helper: Arc<SessionHelper<SessionManager<I>>>,
241}
242
243impl<I: Interface + ?Sized> Session<I> {
244 async fn run_fifo(
246 &self,
247 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
248 ) -> Result<(), zx::Status> {
249 scopeguard::defer! {
250 self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
251 }
252
253 let mut fifo = fasync::Fifo::from_fifo(fifo);
263 let (mut reader, mut writer) = fifo.async_io();
264 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
265 let active_requests = &self.helper.session_manager.active_requests;
266 let mut active_request_futures = FuturesUnordered::new();
267 let mut responses = Vec::new();
268
269 let mut map_future = pin!(Fuse::terminated());
274 let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();
275
276 loop {
277 let new_requests = {
278 let pending_requests = active_request_futures.len() + responses.len();
281 let count = requests.len().saturating_sub(pending_requests);
282 let mut receive_requests = pin!(if count == 0 {
283 Fuse::terminated()
284 } else {
285 reader.read_entries(&mut requests[..count]).fuse()
286 });
287 let mut send_responses = pin!(if responses.is_empty() {
288 Fuse::terminated()
289 } else {
290 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
291 match ready!(writer.try_write(cx, &responses[..])) {
292 Ok(written) => {
293 responses.drain(..written);
294 Poll::Ready(Ok(()))
295 }
296 Err(status) => Poll::Ready(Err(status)),
297 }
298 })
299 .fuse()
300 });
301
302 select_biased!(
305 res = send_responses => {
306 res?;
307 0
308 },
309 response = active_request_futures.select_next_some() => {
310 responses.extend(response);
311 0
312 }
313 result = map_future => {
314 match result {
315 Ok((request, remainder)) => {
316 active_request_futures.push(self.process_fifo_request(request));
317 if let Some(remainder) = remainder {
318 map_future.set(self.map_request(remainder).fuse());
319 }
320 }
321 Err(response) => responses.extend(response),
322 }
323 if map_future.is_terminated() {
324 if let Some(request) = pending_mappings.pop_front() {
325 map_future.set(self.map_request(request).fuse());
326 }
327 }
328 0
329 }
330 count = receive_requests => {
331 count?
332 }
333 )
334 };
335
336 for request in &mut requests[..new_requests] {
339 match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
340 request.assume_init_mut()
341 }) {
342 Ok(DecodedRequest {
343 operation: Operation::CloseVmo, vmo, request_id, ..
344 }) => {
345 if let Some(vmo) = vmo {
346 self.interface.on_detach_vmo(vmo.as_ref());
347 }
348 responses.extend(
349 active_requests
350 .complete_and_take_response(request_id, zx::Status::OK)
351 .map(|(_, response)| response),
352 );
353 }
354 Ok(mut request) => {
355 if let Err(status) = self.maybe_issue_barrier(&mut request) {
359 let response = self
360 .helper
361 .session_manager
362 .active_requests
363 .complete_and_take_response(request.request_id, status)
364 .map(|(_, r)| r);
365 responses.extend(response);
366 } else if map_future.is_terminated() {
367 map_future.set(self.map_request(request).fuse());
368 } else {
369 pending_mappings.push_back(request);
370 }
371 }
372 Err(None) => {}
373 Err(Some(response)) => responses.push(response),
374 }
375 }
376 }
377 }
378
379 fn maybe_issue_barrier(&self, request: &mut DecodedRequest) -> Result<(), zx::Status> {
380 if let Operation::Write {
381 device_block_offset: _,
382 block_count: _,
383 mut options,
384 vmo_offset: _,
385 } = &request.operation
386 {
387 if options.contains(WriteOptions::PRE_BARRIER) {
388 self.interface.barrier()?;
389 options &= !WriteOptions::PRE_BARRIER;
390 }
391 }
392 Ok(())
393 }
394
395 async fn map_request(
397 &self,
398 request: DecodedRequest,
399 ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
400 self.helper.map_request(request)
401 }
402
403 async fn process_fifo_request(
405 &self,
406 DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
407 ) -> Option<BlockFifoResponse> {
408 let result = match operation {
409 Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
410 self.interface
411 .read(
412 device_block_offset,
413 block_count,
414 vmo.as_ref().unwrap(),
415 vmo_offset,
416 trace_flow_id,
417 )
418 .await
419 }
420 Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
421 self.interface
422 .write(
423 device_block_offset,
424 block_count,
425 vmo.as_ref().unwrap(),
426 vmo_offset,
427 options,
428 trace_flow_id,
429 )
430 .await
431 }
432 Operation::Flush => self.interface.flush(trace_flow_id).await,
433 Operation::Trim { device_block_offset, block_count } => {
434 self.interface.trim(device_block_offset, block_count, trace_flow_id).await
435 }
436 Operation::CloseVmo => {
437 unreachable!()
439 }
440 };
441 self.helper
442 .session_manager
443 .active_requests
444 .complete_and_take_response(request_id, result.into())
445 .map(|(_, r)| r)
446 }
447}
448
449impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
450 type Session = usize;
452
453 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
454 self.interface.on_attach_vmo(vmo).await
455 }
456
457 async fn open_session(
458 self: Arc<Self>,
459 stream: fblock::SessionRequestStream,
460 offset_map: OffsetMap,
461 block_size: u32,
462 ) -> Result<(), Error> {
463 self.interface.clone().open_session(self, stream, offset_map, block_size).await
464 }
465
466 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
467 self.interface.get_info().await
468 }
469
470 async fn get_volume_info(
471 &self,
472 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
473 self.interface.get_volume_info().await
474 }
475
476 async fn query_slices(
477 &self,
478 start_slices: &[u64],
479 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
480 self.interface.query_slices(start_slices).await
481 }
482
483 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
484 self.interface.extend(start_slice, slice_count).await
485 }
486
487 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
488 self.interface.shrink(start_slice, slice_count).await
489 }
490
491 fn active_requests(&self) -> &ActiveRequests<Self::Session> {
492 return &self.active_requests;
493 }
494}
495
496impl<I: Interface> IntoSessionManager for Arc<I> {
497 type SM = SessionManager<I>;
498
499 fn into_session_manager(self) -> Arc<Self::SM> {
500 Arc::new(SessionManager { interface: self, active_requests: ActiveRequests::default() })
501 }
502}