1use super::{
6 ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
7 SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl::endpoints::RequestStream;
12use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
13use fuchsia_async::{self as fasync, EHandle};
14use fuchsia_sync::{Condvar, Mutex};
15use futures::TryStreamExt;
16use futures::stream::{AbortHandle, Abortable};
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{CStr, c_char, c_void};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
26pub struct SessionManager {
27 callbacks: Callbacks,
28 open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
29 active_requests: ActiveRequests<Arc<Session>>,
30 condvar: Condvar,
31 mbox: ExecutorMailbox,
32 info: super::DeviceInfo,
33}
34
35unsafe impl Send for SessionManager {}
36unsafe impl Sync for SessionManager {}
37
38impl SessionManager {
39 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
40 if let Some((session, response)) =
41 self.active_requests.complete_and_take_response(request_id, status)
42 {
43 session.send_response(response);
44 }
45 }
46
47 fn terminate(&self) {
48 {
49 #[allow(clippy::collection_is_never_read)]
52 let mut terminated_sessions = Vec::new();
53 for (_, session) in &*self.open_sessions.lock() {
54 if let Some(session) = session.upgrade() {
55 session.terminate();
56 terminated_sessions.push(session);
57 }
58 }
59 }
60 let mut guard = self.open_sessions.lock();
61 self.condvar.wait_while(&mut guard, |s| !s.is_empty());
62 }
63}
64
65impl super::SessionManager for SessionManager {
66 const SUPPORTS_DECOMPRESSION: bool = false;
67 type Session = Arc<Session>;
68
69 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
70 Ok(())
71 }
72
73 async fn open_session(
74 self: Arc<Self>,
75 mut stream: fblock::SessionRequestStream,
76 offset_map: OffsetMap,
77 block_size: u32,
78 ) -> Result<(), Error> {
79 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
80 let (abort_handle, registration) = AbortHandle::new_pair();
81 let session = Arc::new(Session {
82 manager: self.clone(),
83 helper,
84 fifo,
85 queue: Mutex::default(),
86 abort_handle,
87 });
88 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
89 unsafe {
90 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
91 }
92
93 let result = Abortable::new(
94 async {
95 while let Some(request) = stream.try_next().await? {
96 session.helper.handle_request(request).await?;
97 }
98 Ok(())
99 },
100 registration,
101 )
102 .await
103 .unwrap_or_else(|e| Err(e.into()));
104
105 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
106
107 result
108 }
109
110 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
111 Cow::Borrowed(&self.info)
112 }
113
114 fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
115 &self.active_requests
116 }
117}
118
119impl Drop for SessionManager {
120 fn drop(&mut self) {
121 self.terminate();
122 }
123}
124
125impl IntoSessionManager for Arc<SessionManager> {
126 type SM = SessionManager;
127
128 fn into_session_manager(self) -> Self {
129 self
130 }
131}
132
133#[repr(C)]
134pub struct Callbacks {
135 pub context: *mut c_void,
136 pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
137 pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
138 pub on_requests:
139 unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
140 pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
141}
142
143impl Callbacks {
144 #[allow(dead_code)]
145 fn log(&self, msg: &str) {
146 let msg = msg.as_bytes();
147 unsafe {
149 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
150 }
151 }
152}
153
154#[allow(dead_code)]
156pub struct UnownedVmo(zx::sys::zx_handle_t);
157
158#[repr(C)]
159pub struct Request {
160 pub request_id: RequestId,
161 pub operation: Operation,
162 pub trace_flow_id: TraceFlowId,
163 pub vmo: UnownedVmo,
164}
165
166unsafe impl Send for Callbacks {}
167unsafe impl Sync for Callbacks {}
168
169pub struct Session {
170 manager: Arc<SessionManager>,
171 helper: SessionHelper<SessionManager>,
172 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
173 queue: Mutex<SessionQueue>,
174 abort_handle: AbortHandle,
175}
176
177#[derive(Default)]
178struct SessionQueue {
179 responses: VecDeque<BlockFifoResponse>,
180}
181
182pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
183
184impl Session {
185 fn run(self: &Arc<Self>) {
186 self.fifo_loop();
187 self.abort_handle.abort();
188 self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
189 }
190
191 fn fifo_loop(self: &Arc<Self>) {
192 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
193
194 loop {
195 let is_queue_empty = {
197 let mut queue = self.queue.lock();
198 while !queue.responses.is_empty() {
199 let (front, _) = queue.responses.as_slices();
200 match self.fifo.write(front) {
201 Ok(count) => {
202 let full = count < front.len();
203 queue.responses.drain(..count);
204 if full {
205 break;
206 }
207 }
208 Err(zx::Status::SHOULD_WAIT) => break,
209 Err(_) => return,
210 }
211 }
212 queue.responses.is_empty()
213 };
214
215 match self.fifo.read_uninit(&mut requests) {
217 Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
218 Err(zx::Status::SHOULD_WAIT) => {
219 let mut signals =
220 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
221 if !is_queue_empty {
222 signals |= zx::Signals::OBJECT_WRITABLE;
223 }
224 let Ok(signals) =
225 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
226 else {
227 return;
228 };
229 if signals.contains(zx::Signals::USER_0) {
230 return;
231 }
232 if signals.contains(zx::Signals::USER_1) {
234 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
235 }
236 }
237 Err(_) => return,
238 }
239 }
240 }
241
242 fn handle_requests<'a>(
243 self: &Arc<Self>,
244 requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
245 ) {
246 let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
247 unsafe { MaybeUninit::uninit().assume_init() };
248 let mut count = 0;
249 for request in requests {
250 match self.helper.decode_fifo_request(self.clone(), request) {
251 Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
252 self.complete_request(request_id, zx::Status::OK);
253 }
254 Ok(mut request) => {
255 let request_id = request.request_id;
256 loop {
257 let map_result = self.helper.map_request(
258 request,
259 &mut self.helper.session_manager.active_requests.request(request_id),
260 );
261 match map_result {
262 Ok((
263 DecodedRequest { request_id, operation, trace_flow_id, vmo },
264 remainder,
265 )) => {
266 c_requests[count].write(Request {
270 request_id,
271 operation,
272 trace_flow_id,
273 vmo: UnownedVmo(
274 vmo.as_ref()
275 .map(|vmo| vmo.raw_handle())
276 .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
277 ),
278 });
279 count += 1;
280 if count == MAX_REQUESTS {
281 unsafe {
282 (self.manager.callbacks.on_requests)(
283 self.manager.callbacks.context,
284 c_requests[0].as_mut_ptr(),
285 count,
286 );
287 }
288 count = 0;
289 }
290 if let Some(r) = remainder {
291 request = r;
292 } else {
293 break;
294 }
295 }
296 Err(status) => {
297 self.complete_request(request_id, status);
298 break;
299 }
300 }
301 }
302 }
303 Err(None) => {}
304 Err(Some(response)) => self.send_response(response),
305 }
306 }
307 if count > 0 {
308 unsafe {
309 (self.manager.callbacks.on_requests)(
310 self.manager.callbacks.context,
311 c_requests[0].as_mut_ptr(),
312 count,
313 );
314 }
315 }
316 }
317
318 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
319 self.manager.complete_request(request_id, status);
320 }
321
322 fn send_response(&self, response: BlockFifoResponse) {
323 let mut queue = self.queue.lock();
324 if queue.responses.is_empty() {
325 match self.fifo.write_one(&response) {
326 Ok(()) => {
327 return;
328 }
329 Err(_) => {
330 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
332 }
333 }
334 }
335 queue.responses.push_back(response);
336 }
337
338 fn terminate(&self) {
339 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
340 self.abort_handle.abort();
341 }
342}
343
344impl Drop for Session {
345 fn drop(&mut self) {
346 let notify = {
347 let mut open_sessions = self.manager.open_sessions.lock();
348 open_sessions.remove(&(self as *const _ as usize));
349 open_sessions.is_empty()
350 };
351 if notify {
352 self.manager.condvar.notify_all();
353 }
354 }
355}
356
357pub struct BlockServer {
358 server: super::BlockServer<SessionManager>,
359 ehandle: EHandle,
360 abort_handle: AbortHandle,
361}
362
363struct ExecutorMailbox(Mutex<Mail>, Condvar);
364
365impl ExecutorMailbox {
366 fn post(&self, mail: Mail) -> Mail {
368 let old = std::mem::replace(&mut *self.0.lock(), mail);
369 self.1.notify_all();
370 old
371 }
372
373 fn new() -> Self {
374 Self(Mutex::default(), Condvar::new())
375 }
376}
377
378type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
379
380#[derive(Default)]
381enum Mail {
382 #[default]
383 None,
384 Initialized(EHandle, AbortHandle),
385 AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
386 Finished,
387}
388
389impl Drop for BlockServer {
390 fn drop(&mut self) {
391 self.abort_handle.abort();
392 let manager = &self.server.session_manager;
393 let mut mbox = manager.mbox.0.lock();
394 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
395 manager.terminate();
396 debug_assert!(Arc::strong_count(manager) > 0);
397 }
398}
399
400#[repr(C)]
401pub struct PartitionInfo {
402 pub device_flags: u32,
403 pub start_block: u64,
404 pub block_count: u64,
405 pub block_size: u32,
406 pub type_guid: [u8; 16],
407 pub instance_guid: [u8; 16],
408 pub name: *const c_char,
409 pub flags: u64,
410 pub max_transfer_size: u32,
411}
412
413#[allow(non_camel_case_types)]
415type zx_handle_t = zx::sys::zx_handle_t;
416
417#[allow(non_camel_case_types)]
419type zx_status_t = zx::sys::zx_status_t;
420
421impl PartitionInfo {
422 unsafe fn to_rust(&self) -> super::DeviceInfo {
426 super::DeviceInfo::Partition(super::PartitionInfo {
427 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
428 block_range: Some(self.start_block..self.start_block + self.block_count),
429 type_guid: self.type_guid,
430 instance_guid: self.instance_guid,
431 name: if self.name.is_null() {
432 "".to_string()
433 } else {
434 String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
435 },
436 flags: self.flags,
437 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
438 NonZero::new(self.max_transfer_size / self.block_size)
439 } else {
440 None
441 },
442 })
443 }
444}
445
446#[unsafe(no_mangle)]
450pub unsafe extern "C" fn block_server_new(
451 partition_info: &PartitionInfo,
452 callbacks: Callbacks,
453) -> *mut BlockServer {
454 let session_manager = Arc::new(SessionManager {
455 callbacks,
456 open_sessions: Mutex::default(),
457 active_requests: ActiveRequests::default(),
458 condvar: Condvar::new(),
459 mbox: ExecutorMailbox::new(),
460 info: unsafe { partition_info.to_rust() },
461 });
462
463 unsafe {
464 (session_manager.callbacks.start_thread)(
465 session_manager.callbacks.context,
466 Arc::into_raw(session_manager.clone()) as *const c_void,
467 );
468 }
469
470 let mbox = &session_manager.mbox;
471 let mail = {
472 let mut mail = mbox.0.lock();
473 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
474 std::mem::replace(&mut *mail, Mail::None)
475 };
476
477 let block_size = partition_info.block_size;
478 match mail {
479 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
480 server: super::BlockServer::new(block_size, session_manager),
481 ehandle,
482 abort_handle,
483 })),
484 Mail::Finished => std::ptr::null_mut(),
485 _ => unreachable!(),
486 }
487}
488
489#[unsafe(no_mangle)]
493pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
494 let session_manager = unsafe { &*(arg as *const SessionManager) };
495
496 let mut executor = fasync::LocalExecutor::default();
497 let (abort_handle, registration) = AbortHandle::new_pair();
498
499 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
500
501 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
502}
503
504#[unsafe(no_mangle)]
511pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
512 let mail = {
513 let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
514 debug_assert!(Arc::strong_count(&session_manager) > 0);
515 session_manager.mbox.post(Mail::Finished)
516 };
517
518 if let Mail::AsyncShutdown(server, callback, arg) = mail {
519 std::mem::drop(server);
520 unsafe {
522 callback(arg);
523 }
524 }
525}
526
527#[unsafe(no_mangle)]
531pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
532 let _ = unsafe { Box::from_raw(block_server) };
533}
534
535#[unsafe(no_mangle)]
539pub unsafe extern "C" fn block_server_delete_async(
540 block_server: *mut BlockServer,
541 callback: ShutdownCallback,
542 arg: *mut c_void,
543) {
544 let block_server = unsafe { Box::from_raw(block_server) };
545 let session_manager = block_server.server.session_manager.clone();
546 let abort_handle = block_server.abort_handle.clone();
547 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
548 abort_handle.abort();
549}
550
551#[unsafe(no_mangle)]
557pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
558 let block_server = unsafe { &*block_server };
559 let ehandle = &block_server.ehandle;
560 let handle = unsafe { zx::Handle::from_raw(handle) };
561 ehandle.global_scope().spawn(async move {
562 let _ = block_server
563 .server
564 .handle_requests(fvolume::VolumeRequestStream::from_channel(
565 fasync::Channel::from_channel(handle.into()),
566 ))
567 .await;
568 });
569}
570
571#[unsafe(no_mangle)]
575pub unsafe extern "C" fn block_server_session_run(session: &Session) {
576 let session = unsafe { Arc::from_raw(session) };
577 session.run();
578 let _ = Arc::into_raw(session);
579}
580
581#[unsafe(no_mangle)]
585pub unsafe extern "C" fn block_server_session_release(session: &Session) {
586 session.terminate();
587 unsafe { Arc::from_raw(session) };
588}
589
590#[unsafe(no_mangle)]
594pub unsafe extern "C" fn block_server_send_reply(
595 block_server: &BlockServer,
596 request_id: RequestId,
597 status: zx_status_t,
598) {
599 block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
600}