_block_server_c_rustc_static/
async_interface.rs1use super::{
6 DecodeResult, DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation,
7 SessionHelper, FIFO_MAX_REQUESTS,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
11use futures::future::Fuse;
12use futures::stream::FuturesUnordered;
13use futures::{select_biased, FutureExt, StreamExt};
14use std::borrow::Cow;
15use std::future::{poll_fn, Future};
16use std::mem::MaybeUninit;
17use std::num::NonZero;
18use std::pin::pin;
19use std::sync::Arc;
20use std::task::{ready, Poll};
21use {
22 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
23 fuchsia_async as fasync,
24};
25
26pub trait Interface: Send + Sync + Unpin + 'static {
27 fn open_session(
42 &self,
43 session_manager: Arc<SessionManager<Self>>,
44 stream: fblock::SessionRequestStream,
45 offset_map: OffsetMap,
46 block_size: u32,
47 ) -> impl Future<Output = Result<(), Error>> + Send {
48 session_manager.serve_session(stream, offset_map, block_size)
50 }
51
52 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
56 async { Ok(()) }
57 }
58
59 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
61
62 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
64
65 fn read(
67 &self,
68 device_block_offset: u64,
69 block_count: u32,
70 vmo: &Arc<zx::Vmo>,
71 vmo_offset: u64, trace_flow_id: Option<NonZero<u64>>,
73 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
74
75 fn write(
77 &self,
78 device_block_offset: u64,
79 block_count: u32,
80 vmo: &Arc<zx::Vmo>,
81 vmo_offset: u64, opts: WriteOptions,
83 trace_flow_id: Option<NonZero<u64>>,
84 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
85
86 fn flush(
88 &self,
89 trace_flow_id: Option<NonZero<u64>>,
90 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
91
92 fn trim(
94 &self,
95 device_block_offset: u64,
96 block_count: u32,
97 trace_flow_id: Option<NonZero<u64>>,
98 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
99
100 fn get_volume_info(
102 &self,
103 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
104 {
105 async { Err(zx::Status::NOT_SUPPORTED) }
106 }
107
108 fn query_slices(
110 &self,
111 _start_slices: &[u64],
112 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
113 async { Err(zx::Status::NOT_SUPPORTED) }
114 }
115
116 fn extend(
118 &self,
119 _start_slice: u64,
120 _slice_count: u64,
121 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
122 async { Err(zx::Status::NOT_SUPPORTED) }
123 }
124
125 fn shrink(
127 &self,
128 _start_slice: u64,
129 _slice_count: u64,
130 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
131 async { Err(zx::Status::NOT_SUPPORTED) }
132 }
133}
134
135pub struct PassthroughSession(fblock::SessionProxy);
137
138impl PassthroughSession {
139 pub fn new(proxy: fblock::SessionProxy) -> Self {
140 Self(proxy)
141 }
142
143 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
144 match request {
145 fblock::SessionRequest::GetFifo { responder } => {
146 responder.send(self.0.get_fifo().await?)?;
147 }
148 fblock::SessionRequest::AttachVmo { vmo, responder } => {
149 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
150 }
151 fblock::SessionRequest::Close { responder } => {
152 responder.send(self.0.close().await?)?;
153 }
154 }
155 Ok(())
156 }
157
158 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
160 while let Some(Ok(request)) = stream.next().await {
161 if let Err(error) = self.handle_request(request).await {
162 log::warn!(error:?; "FIDL error");
163 }
164 }
165 Ok(())
166 }
167}
168
169pub struct SessionManager<I: ?Sized> {
170 interface: Arc<I>,
171}
172
173impl<I: Interface + ?Sized> SessionManager<I> {
174 pub fn new(interface: Arc<I>) -> Self {
175 Self { interface }
176 }
177
178 pub async fn serve_session(
180 self: Arc<Self>,
181 stream: fblock::SessionRequestStream,
182 offset_map: OffsetMap,
183 block_size: u32,
184 ) -> Result<(), Error> {
185 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
186 let helper = Arc::new(helper);
187 let interface = self.interface.clone();
188
189 let mut stream = stream.fuse();
190
191 let scope = fasync::Scope::new();
192 let helper_clone = helper.clone();
193 let mut fifo_task = scope
194 .spawn(async move {
195 if let Err(status) = run_fifo(fifo, interface, helper).await {
196 if status != zx::Status::PEER_CLOSED {
197 log::error!(status:?; "FIFO error");
198 }
199 }
200 })
201 .fuse();
202
203 scopeguard::defer! {
205 for (_, vmo) in helper_clone.take_vmos() {
206 self.interface.on_detach_vmo(&vmo);
207 }
208 }
209
210 loop {
211 futures::select! {
212 maybe_req = stream.next() => {
213 if let Some(req) = maybe_req {
214 helper_clone.handle_request(req?).await?;
215 } else {
216 break;
217 }
218 }
219 _ = fifo_task => break,
220 }
221 }
222
223 Ok(())
224 }
225}
226
227async fn run_fifo<I: Interface + ?Sized>(
229 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
230 interface: Arc<I>,
231 helper: Arc<SessionHelper<SessionManager<I>>>,
232) -> Result<(), zx::Status> {
233 let mut fifo = fasync::Fifo::from_fifo(fifo);
242 let (mut reader, mut writer) = fifo.async_io();
243 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
244 let mut active_requests = FuturesUnordered::new();
245 let mut responses = vec![];
246
247 loop {
248 let new_requests = {
249 let pending_requests = active_requests.len() + responses.len();
253 let count = requests.len().saturating_sub(pending_requests);
254 let mut receive_requests = pin!(if count == 0 {
255 Fuse::terminated()
256 } else {
257 reader.read_entries(&mut requests[..count]).fuse()
258 });
259 let mut send_responses = pin!(if responses.is_empty() {
260 Fuse::terminated()
261 } else {
262 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
263 match ready!(writer.try_write(cx, &responses[..])) {
264 Ok(written) => {
265 responses.drain(..written);
266 Poll::Ready(Ok(()))
267 }
268 Err(status) => Poll::Ready(Err(status)),
269 }
270 })
271 .fuse()
272 });
273
274 select_biased!(
277 res = send_responses => {
278 res?;
279 0
280 },
281 response = active_requests.select_next_some() => {
282 responses.extend(response);
283 0
284 }
285 count = receive_requests => {
286 count?
287 }
288 )
289 };
290
291 let process_request =
292 async |interface: &Arc<I>,
293 helper: &Arc<SessionHelper<SessionManager<I>>>,
294 decoded_request: DecodedRequest| {
295 let tracking = decoded_request.request_tracking;
296 let status = process_fifo_request(interface.clone(), decoded_request).await.into();
297 helper.finish_fifo_request(tracking, status)
298 };
299 let mut i = 0;
302 let mut in_split = false;
303 while i < new_requests {
304 let request = &mut requests[i];
305 i += 1;
306 match helper.decode_fifo_request(unsafe { request.assume_init_mut() }, in_split) {
307 DecodeResult::Ok(decoded_request) => {
308 in_split = false;
309 if let Operation::CloseVmo = decoded_request.operation {
310 if let Some(vmo) = decoded_request.vmo {
311 interface.on_detach_vmo(vmo.as_ref());
312 }
313 responses.extend(
314 helper.finish_fifo_request(
315 decoded_request.request_tracking,
316 zx::Status::OK,
317 ),
318 );
319 } else {
320 active_requests.push(process_request(&interface, &helper, decoded_request));
321 }
322 }
323 DecodeResult::Split(decoded_request) => {
324 active_requests.push(process_request(&interface, &helper, decoded_request));
325 in_split = true;
327 i -= 1;
328 }
329 DecodeResult::InvalidRequest(tracking, status) => {
330 in_split = false;
331 responses.extend(helper.finish_fifo_request(tracking, status));
332 }
333 DecodeResult::IgnoreRequest => {
334 in_split = false;
335 }
336 }
337 }
338 }
339}
340
341impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
342 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
343 self.interface.on_attach_vmo(vmo).await
344 }
345
346 async fn open_session(
347 self: Arc<Self>,
348 stream: fblock::SessionRequestStream,
349 offset_map: OffsetMap,
350 block_size: u32,
351 ) -> Result<(), Error> {
352 self.interface.clone().open_session(self, stream, offset_map, block_size).await
353 }
354
355 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
356 self.interface.get_info().await
357 }
358
359 async fn get_volume_info(
360 &self,
361 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
362 self.interface.get_volume_info().await
363 }
364
365 async fn query_slices(
366 &self,
367 start_slices: &[u64],
368 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
369 self.interface.query_slices(start_slices).await
370 }
371
372 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
373 self.interface.extend(start_slice, slice_count).await
374 }
375
376 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
377 self.interface.shrink(start_slice, slice_count).await
378 }
379}
380
381impl<I: Interface> IntoSessionManager for Arc<I> {
382 type SM = SessionManager<I>;
383
384 fn into_session_manager(self) -> Arc<Self::SM> {
385 Arc::new(SessionManager { interface: self })
386 }
387}
388
389async fn process_fifo_request<I: Interface + ?Sized>(
391 interface: Arc<I>,
392 r: DecodedRequest,
393) -> Result<(), zx::Status> {
394 let trace_flow_id = r.request_tracking.trace_flow_id;
395 match r.operation {
396 Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
397 interface
398 .read(
399 device_block_offset,
400 block_count,
401 &r.vmo.as_ref().unwrap(),
402 vmo_offset,
403 trace_flow_id,
404 )
405 .await
406 }
407 Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
408 interface
409 .write(
410 device_block_offset,
411 block_count,
412 &r.vmo.as_ref().unwrap(),
413 vmo_offset,
414 options,
415 trace_flow_id,
416 )
417 .await
418 }
419 Operation::Flush => interface.flush(trace_flow_id).await,
420 Operation::Trim { device_block_offset, block_count } => {
421 interface.trim(device_block_offset, block_count, trace_flow_id).await
422 }
423 Operation::CloseVmo => {
424 unreachable!()
426 }
427 }
428}