1use super::{
6 DecodeResult, IntoSessionManager, OffsetMap, Operation, RequestTracking, SessionHelper,
7};
8use anyhow::Error;
9use block_protocol::{BlockFifoRequest, BlockFifoResponse};
10use fidl::endpoints::RequestStream;
11use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
12use fuchsia_async::{self as fasync, EHandle};
13use fuchsia_sync::{Condvar, Mutex};
14use futures::stream::{AbortHandle, Abortable};
15use futures::TryStreamExt;
16use slab::Slab;
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{c_char, c_void, CStr};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
/// Bookkeeping for a request that has been handed to the C side and not yet completed.
struct ActiveRequest {
    // Strong reference to the owning session; `complete_request` sends the reply through it.
    session: Arc<Session>,
    // Tracking state produced when the request was decoded; handed back to
    // `Session::send_reply` on completion.
    request_tracking: RequestTracking,
    // Never read. Holding the `Arc` keeps alive the VMO whose raw handle was copied into the
    // corresponding `Request` for as long as this entry exists.
    _vmo: Option<Arc<zx::Vmo>>,
}
34
/// Session manager that forwards block requests to a table of C callbacks.
pub struct SessionManager {
    // Callback table supplied by the caller of `block_server_new`.
    callbacks: Callbacks,
    // Live sessions keyed by the session's address. Entries are inserted by `open_session`
    // and removed by `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    // Requests passed to `on_requests` that have not been completed yet. The slab key is
    // the value exposed to C as `RequestId`.
    active_requests: Mutex<Slab<ActiveRequest>>,
    // Notified by `Session::drop` when `open_sessions` becomes empty; `terminate` waits on it.
    condvar: Condvar,
    // Mailbox used to exchange state with the executor thread.
    mbox: ExecutorMailbox,
    // Device information returned by `get_info`.
    info: super::DeviceInfo,
}
43
// SAFETY: NOTE(review): these are asserted manually. At minimum `Mail::AsyncShutdown`, which
// can be stored inside `mbox`, carries a raw `*mut c_void` and so blocks the auto traits.
// Soundness presumably relies on the C side tolerating use of its pointers from any
// thread — confirm against the C callers.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
46
47impl SessionManager {
48 fn start_request(
49 &self,
50 session: Arc<Session>,
51 request_tracking: RequestTracking,
52 operation: Operation,
53 vmo: Option<Arc<zx::Vmo>>,
54 ) -> Request {
55 let mut active_requests = self.active_requests.lock();
56 let vacant = active_requests.vacant_entry();
57 let request = Request {
58 request_id: RequestId(vacant.key()),
59 operation,
60 trace_flow_id: request_tracking.trace_flow_id,
61 vmo: UnownedVmo(
62 vmo.as_ref().map(|vmo| vmo.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
63 ),
64 };
65 vacant.insert(ActiveRequest { session, request_tracking, _vmo: vmo });
66 request
67 }
68
69 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
70 let request = self
71 .active_requests
72 .lock()
73 .try_remove(request_id.0)
74 .unwrap_or_else(|| panic!("Invalid request id {}", request_id.0));
75 request.session.send_reply(request.request_tracking, status);
76 }
77
78 fn terminate(&self) {
79 {
80 #[allow(clippy::collection_is_never_read)]
83 let mut terminated_sessions = Vec::new();
84 for (_, session) in &*self.open_sessions.lock() {
85 if let Some(session) = session.upgrade() {
86 session.terminate();
87 terminated_sessions.push(session);
88 }
89 }
90 }
91 let mut guard = self.open_sessions.lock();
92 self.condvar.wait_while(&mut guard, |s| !s.is_empty());
93 }
94}
95
96impl super::SessionManager for SessionManager {
97 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
98 Ok(())
99 }
100
101 async fn open_session(
102 self: Arc<Self>,
103 mut stream: fblock::SessionRequestStream,
104 offset_map: OffsetMap,
105 block_size: u32,
106 ) -> Result<(), Error> {
107 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
108 let (abort_handle, registration) = AbortHandle::new_pair();
109 let session = Arc::new(Session {
110 manager: self.clone(),
111 helper,
112 fifo,
113 queue: Mutex::default(),
114 abort_handle,
115 });
116 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
117 unsafe {
118 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
119 }
120
121 let result = Abortable::new(
122 async {
123 while let Some(request) = stream.try_next().await? {
124 session.helper.handle_request(request).await?;
125 }
126 Ok(())
127 },
128 registration,
129 )
130 .await
131 .unwrap_or_else(|e| Err(e.into()));
132
133 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
134
135 result
136 }
137
138 async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
139 Ok(Cow::Borrowed(&self.info))
140 }
141}
142
143impl Drop for SessionManager {
144 fn drop(&mut self) {
145 self.terminate();
146 }
147}
148
// An `Arc<SessionManager>` is already in the form the conversion produces, so it is
// returned unchanged.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    fn into_session_manager(self) -> Self {
        self
    }
}
156
/// Table of C callbacks together with the opaque pointer passed as their first argument.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer handed back unchanged to every callback.
    pub context: *mut c_void,
    /// Called once by `block_server_new`. `arg` is a leaked strong reference to the session
    /// manager; `block_server_new` then blocks until mail is posted, which in this file
    /// happens in `block_server_thread` and `block_server_thread_delete` (the latter also
    /// reclaims the reference).
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Called when a session is opened. `session` is a leaked strong reference that
    /// `block_server_session_release` reclaims.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Called with an array of `request_count` decoded requests. Each request stays
    /// registered as active until `block_server_send_reply` is called with its id.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Called with a message given as pointer plus byte length (see `Callbacks::log`).
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
166
167impl Callbacks {
168 #[allow(dead_code)]
169 fn log(&self, msg: &str) {
170 let msg = msg.as_bytes();
171 unsafe {
173 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
174 }
175 }
176}
177
/// Raw VMO handle value placed in a `Request`. The wrapper has no `Drop` impl, so it does
/// not close the handle; `start_request` stores `ZX_HANDLE_INVALID` when there is no VMO.
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
181
/// Identifier of an active request: the key of its entry in
/// `SessionManager::active_requests`. `repr(transparent)`, so it has the layout of `usize`.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
185
/// A decoded request as presented to the C `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Id to pass back to `block_server_send_reply` when the request is finished.
    pub request_id: RequestId,
    /// The decoded operation.
    pub operation: Operation,
    /// Copied from the request's `RequestTracking::trace_flow_id`.
    pub trace_flow_id: Option<NonZero<u64>>,
    /// Raw handle of the target VMO, or `ZX_HANDLE_INVALID` if the request has none.
    pub vmo: UnownedVmo,
}
193
// SAFETY: NOTE(review): asserted manually because `context` is a raw pointer. Soundness
// presumably relies on the C side allowing its callbacks and `context` to be used from any
// thread — confirm against the C callers.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
196
/// One open block session, shared between the FIDL task in `open_session` and the thread
/// that calls `block_server_session_run`.
pub struct Session {
    // Manager that created the session; also keeps the manager alive while the session exists.
    manager: Arc<SessionManager>,
    // Decodes FIFO requests and builds FIFO responses.
    helper: SessionHelper<SessionManager>,
    // FIFO read for requests and written with responses. `USER_0` and `USER_1` on this
    // handle are used as stop and wake-up signals for `fifo_loop`.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    // Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    // Aborts the FIDL-serving future in `open_session`.
    abort_handle: AbortHandle,
}
204
/// Responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    // Appended by `send_reply`; drained from the front by `fifo_loop`.
    responses: VecDeque<BlockFifoResponse>,
}
209
/// Size of the batch buffers used by `fifo_loop` and `handle_requests`, and therefore the
/// largest `request_count` passed to `on_requests`.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
211
212impl Session {
213 fn run(&self) {
214 self.fifo_loop();
215 self.abort_handle.abort();
216 }
217
218 fn fifo_loop(&self) {
219 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
220
221 loop {
222 let is_queue_empty = {
224 let mut queue = self.queue.lock();
225 while !queue.responses.is_empty() {
226 let (front, _) = queue.responses.as_slices();
227 match self.fifo.write(front) {
228 Ok(count) => {
229 let full = count < front.len();
230 queue.responses.drain(..count);
231 if full {
232 break;
233 }
234 }
235 Err(zx::Status::SHOULD_WAIT) => break,
236 Err(_) => return,
237 }
238 }
239 queue.responses.is_empty()
240 };
241
242 match self.fifo.read_uninit(&mut requests) {
244 Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
245 Err(zx::Status::SHOULD_WAIT) => {
246 let mut signals =
247 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
248 if !is_queue_empty {
249 signals |= zx::Signals::OBJECT_WRITABLE;
250 }
251 let Ok(signals) =
252 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
253 else {
254 return;
255 };
256 if signals.contains(zx::Signals::USER_0) {
257 return;
258 }
259 if signals.contains(zx::Signals::USER_1) {
261 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
262 }
263 }
264 Err(_) => return,
265 }
266 }
267 }
268
269 fn handle_requests<'a>(&self, requests: impl Iterator<Item = &'a mut BlockFifoRequest>) {
270 let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
271 unsafe { MaybeUninit::uninit().assume_init() };
272 let mut count = 0;
273 let this = self
274 .manager
275 .open_sessions
276 .lock()
277 .get(&(self as *const _ as usize))
278 .and_then(Weak::upgrade)
279 .unwrap();
280 for request in requests {
281 let mut in_split = false;
282 loop {
283 if count >= MAX_REQUESTS {
284 unsafe {
285 (self.manager.callbacks.on_requests)(
286 self.manager.callbacks.context,
287 c_requests[0].as_mut_ptr(),
288 count,
289 );
290 }
291 count = 0;
292 }
293 match self.helper.decode_fifo_request(request, in_split) {
294 DecodeResult::Ok(decoded_request) => {
295 if let Operation::CloseVmo = decoded_request.operation {
296 self.send_reply(decoded_request.request_tracking, zx::Status::OK);
297 break;
298 }
299 let c_request = self.manager.start_request(
300 this.clone(),
301 decoded_request.request_tracking,
302 decoded_request.operation,
303 decoded_request.vmo,
304 );
305 c_requests[count].write(c_request);
306 count += 1;
307 break;
308 }
309 DecodeResult::Split(decoded_request) => {
310 let c_request = self.manager.start_request(
311 this.clone(),
312 decoded_request.request_tracking,
313 decoded_request.operation,
314 decoded_request.vmo,
315 );
316 c_requests[count].write(c_request);
317 count += 1;
318 in_split = true;
319 }
320 DecodeResult::InvalidRequest(tracking, status) => {
321 self.send_reply(tracking, status);
322 break;
323 }
324 DecodeResult::IgnoreRequest => {
325 break;
326 }
327 }
328 }
329 }
330 if count > 0 {
331 unsafe {
332 (self.manager.callbacks.on_requests)(
333 self.manager.callbacks.context,
334 c_requests[0].as_mut_ptr(),
335 count,
336 );
337 }
338 }
339 }
340
341 fn send_reply(&self, tracking: RequestTracking, status: zx::Status) {
342 let response = match self.helper.finish_fifo_request(tracking, status) {
343 Some(response) => response,
344 None => return,
345 };
346 let mut queue = self.queue.lock();
347 if queue.responses.is_empty() {
348 match self.fifo.write_one(&response) {
349 Ok(()) => {
350 return;
351 }
352 Err(_) => {
353 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
355 }
356 }
357 }
358 queue.responses.push_back(response);
359 }
360
361 fn terminate(&self) {
362 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
363 self.abort_handle.abort();
364 }
365}
366
367impl Drop for Session {
368 fn drop(&mut self) {
369 let notify = {
370 let mut open_sessions = self.manager.open_sessions.lock();
371 open_sessions.remove(&(self as *const _ as usize));
372 open_sessions.is_empty()
373 };
374 if notify {
375 self.manager.condvar.notify_all();
376 }
377 }
378}
379
/// The object handed to C by `block_server_new`.
pub struct BlockServer {
    // The generic block server, driven by this file's `SessionManager`.
    server: super::BlockServer<SessionManager>,
    // Handle to the executor created in `block_server_thread`; `block_server_serve` spawns
    // onto it.
    ehandle: EHandle,
    // Aborts the future that `block_server_thread` runs on its executor.
    abort_handle: AbortHandle,
}
385
/// A single `Mail` slot plus the condition variable notified whenever the slot is replaced.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
387
388impl ExecutorMailbox {
389 fn post(&self, mail: Mail) -> Mail {
391 let old = std::mem::replace(&mut *self.0.lock(), mail);
392 self.1.notify_all();
393 old
394 }
395
396 fn new() -> Self {
397 Self(Mutex::default(), Condvar::new())
398 }
399}
400
/// C callback invoked by `block_server_thread_delete` after an asynchronous shutdown has
/// dropped the server; it receives the `arg` given to `block_server_delete_async`.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
402
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// Empty slot.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists; consumed by
    /// `block_server_new`.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`: the server to drop, and the callback plus
    /// argument to invoke afterwards. Picked up by `block_server_thread_delete`.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`; `BlockServer::drop` waits for it.
    Finished,
}
411
412impl Drop for BlockServer {
413 fn drop(&mut self) {
414 self.abort_handle.abort();
415 let manager = &self.server.session_manager;
416 let mut mbox = manager.mbox.0.lock();
417 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
418 manager.terminate();
419 debug_assert!(Arc::strong_count(manager) > 0);
420 }
421}
422
/// C-layout description of the partition, converted by `PartitionInfo::to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bits of `fblock::Flag`; bits that are not known flags are dropped on conversion.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks in the range starting at `start_block`.
    pub block_count: u64,
    /// Block size; presumably in bytes — confirm against the C header.
    pub block_size: u32,
    /// Copied unchanged into the Rust partition info.
    pub type_guid: [u8; 16],
    /// Copied unchanged into the Rust partition info.
    pub instance_guid: [u8; 16],
    /// Null, or a NUL-terminated C string. Invalid UTF-8 is replaced lossily on conversion.
    pub name: *const c_char,
    /// Copied unchanged into the Rust partition info.
    pub flags: u64,
    /// `MAX_TRANSFER_UNBOUNDED` for no limit; otherwise divided by `block_size` to obtain
    /// the limit in blocks.
    pub max_transfer_size: u32,
}
435
/// Local alias so exported signatures can use the C spelling of the handle type.
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

/// Local alias so exported signatures can use the C spelling of the status type.
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
443
444impl PartitionInfo {
445 unsafe fn to_rust(&self) -> super::DeviceInfo {
446 super::DeviceInfo::Partition(super::PartitionInfo {
447 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
448 block_range: Some(self.start_block..self.start_block + self.block_count),
449 type_guid: self.type_guid,
450 instance_guid: self.instance_guid,
451 name: if self.name.is_null() {
452 "".to_string()
453 } else {
454 String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
455 },
456 flags: self.flags,
457 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
458 NonZero::new(self.max_transfer_size / self.block_size)
459 } else {
460 None
461 },
462 })
463 }
464}
465
466#[no_mangle]
470pub unsafe extern "C" fn block_server_new(
471 partition_info: &PartitionInfo,
472 callbacks: Callbacks,
473) -> *mut BlockServer {
474 let session_manager = Arc::new(SessionManager {
475 callbacks,
476 open_sessions: Mutex::default(),
477 active_requests: Mutex::new(Slab::with_capacity(MAX_REQUESTS)),
478 condvar: Condvar::new(),
479 mbox: ExecutorMailbox::new(),
480 info: partition_info.to_rust(),
481 });
482
483 (session_manager.callbacks.start_thread)(
484 session_manager.callbacks.context,
485 Arc::into_raw(session_manager.clone()) as *const c_void,
486 );
487
488 let mbox = &session_manager.mbox;
489 let mail = {
490 let mut mail = mbox.0.lock();
491 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
492 std::mem::replace(&mut *mail, Mail::None)
493 };
494
495 let block_size = partition_info.block_size;
496 match mail {
497 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
498 server: super::BlockServer::new(block_size, session_manager),
499 ehandle,
500 abort_handle,
501 })),
502 Mail::Finished => std::ptr::null_mut(),
503 _ => unreachable!(),
504 }
505}
506
507#[no_mangle]
511pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
512 let session_manager = &*(arg as *const SessionManager);
513
514 let mut executor = fasync::LocalExecutor::new();
515 let (abort_handle, registration) = AbortHandle::new_pair();
516
517 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
518
519 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
520}
521
522#[no_mangle]
529pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
530 let mail = {
531 let session_manager = Arc::from_raw(arg as *const SessionManager);
532 debug_assert!(Arc::strong_count(&session_manager) > 0);
533 session_manager.mbox.post(Mail::Finished)
534 };
535
536 if let Mail::AsyncShutdown(server, callback, arg) = mail {
537 std::mem::drop(server);
538 unsafe {
540 callback(arg);
541 }
542 }
543}
544
545#[no_mangle]
549pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
550 let _ = Box::from_raw(block_server);
551}
552
553#[no_mangle]
557pub unsafe extern "C" fn block_server_delete_async(
558 block_server: *mut BlockServer,
559 callback: ShutdownCallback,
560 arg: *mut c_void,
561) {
562 let block_server = Box::from_raw(block_server);
563 let session_manager = block_server.server.session_manager.clone();
564 let abort_handle = block_server.abort_handle.clone();
565 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
566 abort_handle.abort();
567}
568
569#[no_mangle]
575pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
576 let block_server = &*block_server;
577 let ehandle = &block_server.ehandle;
578 let handle = zx::Handle::from_raw(handle);
579 ehandle.global_scope().spawn(async move {
580 let _ = block_server
581 .server
582 .handle_requests(fvolume::VolumeRequestStream::from_channel(
583 fasync::Channel::from_channel(handle.into()),
584 ))
585 .await;
586 });
587}
588
589#[no_mangle]
593pub unsafe extern "C" fn block_server_session_run(session: &Session) {
594 session.run();
595}
596
597#[no_mangle]
601pub unsafe extern "C" fn block_server_session_release(session: &Session) {
602 session.terminate();
603 Arc::from_raw(session);
604}
605
606#[no_mangle]
610pub unsafe extern "C" fn block_server_send_reply(
611 block_server: &BlockServer,
612 request_id: RequestId,
613 status: zx_status_t,
614) {
615 block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
616}