1use super::{
6 ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
7 SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl::endpoints::RequestStream;
12use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
13use fuchsia_async::{self as fasync, EHandle};
14use fuchsia_sync::{Condvar, Mutex};
15use futures::stream::{AbortHandle, Abortable};
16use futures::TryStreamExt;
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{c_char, c_void, CStr};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
26pub struct SessionManager {
27 callbacks: Callbacks,
28 open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
29 active_requests: ActiveRequests<Arc<Session>>,
30 condvar: Condvar,
31 mbox: ExecutorMailbox,
32 info: super::DeviceInfo,
33}
34
35unsafe impl Send for SessionManager {}
36unsafe impl Sync for SessionManager {}
37
38impl SessionManager {
39 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
40 if let Some((session, response)) =
41 self.active_requests.complete_and_take_response(request_id, status)
42 {
43 session.send_response(response);
44 }
45 }
46
47 fn terminate(&self) {
48 {
49 #[allow(clippy::collection_is_never_read)]
52 let mut terminated_sessions = Vec::new();
53 for (_, session) in &*self.open_sessions.lock() {
54 if let Some(session) = session.upgrade() {
55 session.terminate();
56 terminated_sessions.push(session);
57 }
58 }
59 }
60 let mut guard = self.open_sessions.lock();
61 self.condvar.wait_while(&mut guard, |s| !s.is_empty());
62 }
63}
64
65impl super::SessionManager for SessionManager {
66 type Session = Arc<Session>;
67
68 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
69 Ok(())
70 }
71
72 async fn open_session(
73 self: Arc<Self>,
74 mut stream: fblock::SessionRequestStream,
75 offset_map: OffsetMap,
76 block_size: u32,
77 ) -> Result<(), Error> {
78 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
79 let (abort_handle, registration) = AbortHandle::new_pair();
80 let session = Arc::new(Session {
81 manager: self.clone(),
82 helper,
83 fifo,
84 queue: Mutex::default(),
85 abort_handle,
86 });
87 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
88 unsafe {
89 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
90 }
91
92 let result = Abortable::new(
93 async {
94 while let Some(request) = stream.try_next().await? {
95 session.helper.handle_request(request).await?;
96 }
97 Ok(())
98 },
99 registration,
100 )
101 .await
102 .unwrap_or_else(|e| Err(e.into()));
103
104 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
105
106 result
107 }
108
109 async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
110 Ok(Cow::Borrowed(&self.info))
111 }
112
113 fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
114 &self.active_requests
115 }
116}
117
118impl Drop for SessionManager {
119 fn drop(&mut self) {
120 self.terminate();
121 }
122}
123
124impl IntoSessionManager for Arc<SessionManager> {
125 type SM = SessionManager;
126
127 fn into_session_manager(self) -> Self {
128 self
129 }
130}
131
132#[repr(C)]
133pub struct Callbacks {
134 pub context: *mut c_void,
135 pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
136 pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
137 pub on_requests:
138 unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
139 pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
140}
141
142impl Callbacks {
143 #[allow(dead_code)]
144 fn log(&self, msg: &str) {
145 let msg = msg.as_bytes();
146 unsafe {
148 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
149 }
150 }
151}
152
153#[allow(dead_code)]
155pub struct UnownedVmo(zx::sys::zx_handle_t);
156
157#[repr(C)]
158pub struct Request {
159 pub request_id: RequestId,
160 pub operation: Operation,
161 pub trace_flow_id: TraceFlowId,
162 pub vmo: UnownedVmo,
163}
164
165unsafe impl Send for Callbacks {}
166unsafe impl Sync for Callbacks {}
167
168pub struct Session {
169 manager: Arc<SessionManager>,
170 helper: SessionHelper<SessionManager>,
171 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
172 queue: Mutex<SessionQueue>,
173 abort_handle: AbortHandle,
174}
175
176#[derive(Default)]
177struct SessionQueue {
178 responses: VecDeque<BlockFifoResponse>,
179}
180
181pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
182
183impl Session {
184 fn run(self: &Arc<Self>) {
185 self.fifo_loop();
186 self.abort_handle.abort();
187 self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
188 }
189
190 fn fifo_loop(self: &Arc<Self>) {
191 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
192
193 loop {
194 let is_queue_empty = {
196 let mut queue = self.queue.lock();
197 while !queue.responses.is_empty() {
198 let (front, _) = queue.responses.as_slices();
199 match self.fifo.write(front) {
200 Ok(count) => {
201 let full = count < front.len();
202 queue.responses.drain(..count);
203 if full {
204 break;
205 }
206 }
207 Err(zx::Status::SHOULD_WAIT) => break,
208 Err(_) => return,
209 }
210 }
211 queue.responses.is_empty()
212 };
213
214 match self.fifo.read_uninit(&mut requests) {
216 Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
217 Err(zx::Status::SHOULD_WAIT) => {
218 let mut signals =
219 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
220 if !is_queue_empty {
221 signals |= zx::Signals::OBJECT_WRITABLE;
222 }
223 let Ok(signals) =
224 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
225 else {
226 return;
227 };
228 if signals.contains(zx::Signals::USER_0) {
229 return;
230 }
231 if signals.contains(zx::Signals::USER_1) {
233 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
234 }
235 }
236 Err(_) => return,
237 }
238 }
239 }
240
241 fn handle_requests<'a>(
242 self: &Arc<Self>,
243 requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
244 ) {
245 let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
246 unsafe { MaybeUninit::uninit().assume_init() };
247 let mut count = 0;
248 for request in requests {
249 match self.helper.decode_fifo_request(self.clone(), request) {
250 Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
251 self.complete_request(request_id, zx::Status::OK);
252 }
253 Ok(mut request) => loop {
254 match self.helper.map_request(request) {
255 Ok((
256 DecodedRequest { request_id, operation, trace_flow_id, vmo },
257 remainder,
258 )) => {
259 c_requests[count].write(Request {
263 request_id,
264 operation,
265 trace_flow_id,
266 vmo: UnownedVmo(
267 vmo.as_ref()
268 .map(|vmo| vmo.raw_handle())
269 .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
270 ),
271 });
272 count += 1;
273 if count == MAX_REQUESTS {
274 unsafe {
275 (self.manager.callbacks.on_requests)(
276 self.manager.callbacks.context,
277 c_requests[0].as_mut_ptr(),
278 count,
279 );
280 }
281 count = 0;
282 }
283 if let Some(r) = remainder {
284 request = r;
285 } else {
286 break;
287 }
288 }
289 Err(Some(response)) => {
290 self.send_response(response);
291 break;
292 }
293 Err(None) => break,
294 }
295 },
296 Err(None) => {}
297 Err(Some(response)) => self.send_response(response),
298 }
299 }
300 if count > 0 {
301 unsafe {
302 (self.manager.callbacks.on_requests)(
303 self.manager.callbacks.context,
304 c_requests[0].as_mut_ptr(),
305 count,
306 );
307 }
308 }
309 }
310
311 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
312 self.manager.complete_request(request_id, status);
313 }
314
315 fn send_response(&self, response: BlockFifoResponse) {
316 let mut queue = self.queue.lock();
317 if queue.responses.is_empty() {
318 match self.fifo.write_one(&response) {
319 Ok(()) => {
320 return;
321 }
322 Err(_) => {
323 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
325 }
326 }
327 }
328 queue.responses.push_back(response);
329 }
330
331 fn terminate(&self) {
332 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
333 self.abort_handle.abort();
334 }
335}
336
337impl Drop for Session {
338 fn drop(&mut self) {
339 let notify = {
340 let mut open_sessions = self.manager.open_sessions.lock();
341 open_sessions.remove(&(self as *const _ as usize));
342 open_sessions.is_empty()
343 };
344 if notify {
345 self.manager.condvar.notify_all();
346 }
347 }
348}
349
350pub struct BlockServer {
351 server: super::BlockServer<SessionManager>,
352 ehandle: EHandle,
353 abort_handle: AbortHandle,
354}
355
356struct ExecutorMailbox(Mutex<Mail>, Condvar);
357
358impl ExecutorMailbox {
359 fn post(&self, mail: Mail) -> Mail {
361 let old = std::mem::replace(&mut *self.0.lock(), mail);
362 self.1.notify_all();
363 old
364 }
365
366 fn new() -> Self {
367 Self(Mutex::default(), Condvar::new())
368 }
369}
370
/// Callback invoked with the caller-supplied argument once an asynchronous shutdown finishes.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
372
373#[derive(Default)]
374enum Mail {
375 #[default]
376 None,
377 Initialized(EHandle, AbortHandle),
378 AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
379 Finished,
380}
381
382impl Drop for BlockServer {
383 fn drop(&mut self) {
384 self.abort_handle.abort();
385 let manager = &self.server.session_manager;
386 let mut mbox = manager.mbox.0.lock();
387 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
388 manager.terminate();
389 debug_assert!(Arc::strong_count(manager) > 0);
390 }
391}
392
/// Partition description supplied by the C caller; converted to the Rust type by `to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bit flags; bits not known to `fblock::Flag` are discarded on conversion.
    pub device_flags: u32,
    /// First block of the partition.
    pub start_block: u64,
    /// Number of blocks in the partition.
    pub block_count: u64,
    /// Block size, used to express `max_transfer_size` in blocks.
    pub block_size: u32,
    /// Partition type GUID.
    pub type_guid: [u8; 16],
    /// Partition instance GUID.
    pub instance_guid: [u8; 16],
    /// NUL-terminated partition name, or null for an empty name.
    pub name: *const c_char,
    /// Partition flags, passed through unchanged.
    pub flags: u64,
    /// Maximum transfer size, or `MAX_TRANSFER_UNBOUNDED` for no limit.
    pub max_transfer_size: u32,
}
405
406#[allow(non_camel_case_types)]
408type zx_handle_t = zx::sys::zx_handle_t;
409
410#[allow(non_camel_case_types)]
412type zx_status_t = zx::sys::zx_status_t;
413
414impl PartitionInfo {
415 unsafe fn to_rust(&self) -> super::DeviceInfo {
416 super::DeviceInfo::Partition(super::PartitionInfo {
417 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
418 block_range: Some(self.start_block..self.start_block + self.block_count),
419 type_guid: self.type_guid,
420 instance_guid: self.instance_guid,
421 name: if self.name.is_null() {
422 "".to_string()
423 } else {
424 String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
425 },
426 flags: self.flags,
427 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
428 NonZero::new(self.max_transfer_size / self.block_size)
429 } else {
430 None
431 },
432 })
433 }
434}
435
436#[no_mangle]
440pub unsafe extern "C" fn block_server_new(
441 partition_info: &PartitionInfo,
442 callbacks: Callbacks,
443) -> *mut BlockServer {
444 let session_manager = Arc::new(SessionManager {
445 callbacks,
446 open_sessions: Mutex::default(),
447 active_requests: ActiveRequests::default(),
448 condvar: Condvar::new(),
449 mbox: ExecutorMailbox::new(),
450 info: partition_info.to_rust(),
451 });
452
453 (session_manager.callbacks.start_thread)(
454 session_manager.callbacks.context,
455 Arc::into_raw(session_manager.clone()) as *const c_void,
456 );
457
458 let mbox = &session_manager.mbox;
459 let mail = {
460 let mut mail = mbox.0.lock();
461 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
462 std::mem::replace(&mut *mail, Mail::None)
463 };
464
465 let block_size = partition_info.block_size;
466 match mail {
467 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
468 server: super::BlockServer::new(block_size, session_manager),
469 ehandle,
470 abort_handle,
471 })),
472 Mail::Finished => std::ptr::null_mut(),
473 _ => unreachable!(),
474 }
475}
476
477#[no_mangle]
481pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
482 let session_manager = &*(arg as *const SessionManager);
483
484 let mut executor = fasync::LocalExecutor::new();
485 let (abort_handle, registration) = AbortHandle::new_pair();
486
487 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
488
489 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
490}
491
492#[no_mangle]
499pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
500 let mail = {
501 let session_manager = Arc::from_raw(arg as *const SessionManager);
502 debug_assert!(Arc::strong_count(&session_manager) > 0);
503 session_manager.mbox.post(Mail::Finished)
504 };
505
506 if let Mail::AsyncShutdown(server, callback, arg) = mail {
507 std::mem::drop(server);
508 unsafe {
510 callback(arg);
511 }
512 }
513}
514
515#[no_mangle]
519pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
520 let _ = Box::from_raw(block_server);
521}
522
523#[no_mangle]
527pub unsafe extern "C" fn block_server_delete_async(
528 block_server: *mut BlockServer,
529 callback: ShutdownCallback,
530 arg: *mut c_void,
531) {
532 let block_server = Box::from_raw(block_server);
533 let session_manager = block_server.server.session_manager.clone();
534 let abort_handle = block_server.abort_handle.clone();
535 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
536 abort_handle.abort();
537}
538
539#[no_mangle]
545pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
546 let block_server = &*block_server;
547 let ehandle = &block_server.ehandle;
548 let handle = zx::Handle::from_raw(handle);
549 ehandle.global_scope().spawn(async move {
550 let _ = block_server
551 .server
552 .handle_requests(fvolume::VolumeRequestStream::from_channel(
553 fasync::Channel::from_channel(handle.into()),
554 ))
555 .await;
556 });
557}
558
559#[no_mangle]
563pub unsafe extern "C" fn block_server_session_run(session: &Session) {
564 let session = Arc::from_raw(session);
565 session.run();
566 let _ = Arc::into_raw(session);
567}
568
569#[no_mangle]
573pub unsafe extern "C" fn block_server_session_release(session: &Session) {
574 session.terminate();
575 Arc::from_raw(session);
576}
577
578#[no_mangle]
582pub unsafe extern "C" fn block_server_send_reply(
583 block_server: &BlockServer,
584 request_id: RequestId,
585 status: zx_status_t,
586) {
587 block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
588}