1use super::{IntoSessionManager, OffsetMap, Operation, RequestId, RequestTracking, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse};
8use fidl::endpoints::RequestStream;
9use fuchsia_async::{self as fasync, EHandle};
10use fuchsia_sync::{Condvar, Mutex};
11use futures::stream::{AbortHandle, Abortable};
12use futures::TryStreamExt;
13use std::borrow::Cow;
14use std::collections::{HashMap, VecDeque};
15use std::ffi::{c_char, c_void, CStr};
16use std::mem::MaybeUninit;
17use std::num::NonZero;
18use std::sync::{Arc, Weak};
19use zx::{self as zx, AsHandleRef as _};
20use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
21
22pub struct SessionManager {
23 callbacks: Callbacks,
24 open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
25 condvar: Condvar,
26 mbox: ExecutorMailbox,
27 info: super::DeviceInfo,
28}
29
30unsafe impl Send for SessionManager {}
31unsafe impl Sync for SessionManager {}
32
33impl SessionManager {
34 fn terminate(&self) {
35 {
36 #[allow(clippy::collection_is_never_read)]
39 let mut terminated_sessions = Vec::new();
40 for (_, session) in &*self.open_sessions.lock() {
41 if let Some(session) = session.upgrade() {
42 session.terminate();
43 terminated_sessions.push(session);
44 }
45 }
46 }
47 let mut guard = self.open_sessions.lock();
48 self.condvar.wait_while(&mut guard, |s| !s.is_empty());
49 }
50}
51
52impl super::SessionManager for SessionManager {
53 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
54 Ok(())
55 }
56
57 async fn open_session(
58 self: Arc<Self>,
59 mut stream: fblock::SessionRequestStream,
60 offset_map: Option<OffsetMap>,
61 block_size: u32,
62 ) -> Result<(), Error> {
63 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
64 let (abort_handle, registration) = AbortHandle::new_pair();
65 let session = Arc::new(Session {
66 manager: self.clone(),
67 helper,
68 fifo,
69 queue: Mutex::default(),
70 vmos: Mutex::default(),
71 abort_handle,
72 });
73 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
74 unsafe {
75 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
76 }
77
78 let result = Abortable::new(
79 async {
80 while let Some(request) = stream.try_next().await? {
81 session.helper.handle_request(request).await?;
82 }
83 Ok(())
84 },
85 registration,
86 )
87 .await
88 .unwrap_or_else(|e| Err(e.into()));
89
90 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
91
92 result
93 }
94
95 async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
96 Ok(Cow::Borrowed(&self.info))
97 }
98}
99
100impl Drop for SessionManager {
101 fn drop(&mut self) {
102 self.terminate();
103 }
104}
105
106impl IntoSessionManager for Arc<SessionManager> {
107 type SM = SessionManager;
108
109 fn into_session_manager(self) -> Self {
110 self
111 }
112}
113
114#[repr(C)]
115pub struct Callbacks {
116 pub context: *mut c_void,
117 pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
118 pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
119 pub on_requests: unsafe extern "C" fn(
120 context: *mut c_void,
121 session: *const Session,
122 requests: *mut Request,
123 request_count: usize,
124 ),
125 pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
126}
127
128impl Callbacks {
129 #[allow(dead_code)]
130 fn log(&self, msg: &str) {
131 let msg = msg.as_bytes();
132 unsafe {
134 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
135 }
136 }
137}
138
139#[allow(dead_code)]
141pub struct UnownedVmo(zx::sys::zx_handle_t);
142
143#[repr(C)]
144pub struct Request {
145 pub request_id: RequestId,
146 pub operation: Operation,
147 pub trace_flow_id: Option<NonZero<u64>>,
148 pub vmo: UnownedVmo,
149}
150
151unsafe impl Send for Callbacks {}
152unsafe impl Sync for Callbacks {}
153
154pub struct Session {
155 manager: Arc<SessionManager>,
156 helper: SessionHelper<SessionManager>,
157 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
158 queue: Mutex<SessionQueue>,
159 vmos: Mutex<HashMap<RequestId, Arc<zx::Vmo>>>,
160 abort_handle: AbortHandle,
161}
162
163#[derive(Default)]
164struct SessionQueue {
165 responses: VecDeque<BlockFifoResponse>,
166}
167
168pub const MAX_REQUESTS: usize = 64;
169
170impl Session {
171 fn run(&self) {
172 self.fifo_loop();
173 self.abort_handle.abort();
174 }
175
176 fn fifo_loop(&self) {
177 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
178
179 loop {
180 let is_queue_empty = {
182 let mut queue = self.queue.lock();
183 while !queue.responses.is_empty() {
184 let (front, _) = queue.responses.as_slices();
185 match self.fifo.write(front) {
186 Ok(count) => {
187 let full = count < front.len();
188 queue.responses.drain(..count);
189 if full {
190 break;
191 }
192 }
193 Err(zx::Status::SHOULD_WAIT) => break,
194 Err(_) => return,
195 }
196 }
197 queue.responses.is_empty()
198 };
199
200 match self.fifo.read_uninit(&mut requests) {
202 Ok(valid_requests) => self.handle_requests(valid_requests.iter()),
203 Err(zx::Status::SHOULD_WAIT) => {
204 let mut signals =
205 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
206 if !is_queue_empty {
207 signals |= zx::Signals::OBJECT_WRITABLE;
208 }
209 let Ok(signals) =
210 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE)
211 else {
212 return;
213 };
214 if signals.contains(zx::Signals::USER_0) {
215 return;
216 }
217 if signals.contains(zx::Signals::USER_1) {
219 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
220 }
221 }
222 Err(_) => return,
223 }
224 }
225 }
226
227 fn handle_requests<'a>(&self, requests: impl Iterator<Item = &'a BlockFifoRequest>) {
228 let mut decoded_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
229 unsafe { MaybeUninit::uninit().assume_init() };
230 let mut count = 0;
231 for request in requests {
232 if let Some(r) = self.helper.decode_fifo_request(request) {
233 let operation = match r.operation {
234 Ok(Operation::CloseVmo) => {
235 self.send_reply(r.request_tracking, zx::Status::OK);
236 continue;
237 }
238 Ok(operation) => operation,
239 Err(status) => {
240 self.send_reply(r.request_tracking, status);
241 continue;
242 }
243 };
244 let RequestTracking { group_or_request, trace_flow_id } = r.request_tracking;
245 let mut request_id = group_or_request.into();
246 let vmo = if let Some(vmo) = r.vmo {
247 let raw_handle = vmo.raw_handle();
248 self.vmos.lock().insert(request_id, vmo);
249 request_id = request_id.with_vmo();
250 UnownedVmo(raw_handle)
251 } else {
252 UnownedVmo(zx::sys::ZX_HANDLE_INVALID)
253 };
254 decoded_requests[count].write(Request {
255 request_id,
256 operation,
257 trace_flow_id,
258 vmo,
259 });
260 count += 1;
261 }
262 }
263 if count > 0 {
264 unsafe {
265 (self.manager.callbacks.on_requests)(
266 self.manager.callbacks.context,
267 self,
268 decoded_requests[0].as_mut_ptr(),
269 count,
270 );
271 }
272 }
273 }
274
275 fn send_reply(&self, tracking: RequestTracking, status: zx::Status) {
276 let response = match self.helper.finish_fifo_request(tracking, status) {
277 Some(response) => response,
278 None => return,
279 };
280 let mut queue = self.queue.lock();
281 if queue.responses.is_empty() {
282 match self.fifo.write_one(&response) {
283 Ok(()) => {
284 return;
285 }
286 Err(_) => {
287 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
289 }
290 }
291 }
292 queue.responses.push_back(response);
293 }
294
295 fn terminate(&self) {
296 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
297 self.abort_handle.abort();
298 }
299}
300
301impl Drop for Session {
302 fn drop(&mut self) {
303 let notify = {
304 let mut open_sessions = self.manager.open_sessions.lock();
305 open_sessions.remove(&(self as *const _ as usize));
306 open_sessions.is_empty()
307 };
308 if notify {
309 self.manager.condvar.notify_all();
310 }
311 }
312}
313
314pub struct BlockServer {
315 server: super::BlockServer<SessionManager>,
316 ehandle: EHandle,
317 abort_handle: AbortHandle,
318}
319
320struct ExecutorMailbox(Mutex<Mail>, Condvar);
321
322impl ExecutorMailbox {
323 fn post(&self, mail: Mail) -> Mail {
325 let old = std::mem::replace(&mut *self.0.lock(), mail);
326 self.1.notify_all();
327 old
328 }
329
330 fn new() -> Self {
331 Self(Mutex::default(), Condvar::new())
332 }
333}
334
335type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
336
337#[derive(Default)]
338enum Mail {
339 #[default]
340 None,
341 Initialized(EHandle, AbortHandle),
342 AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
343 Finished,
344}
345
346impl Drop for BlockServer {
347 fn drop(&mut self) {
348 self.abort_handle.abort();
349 let manager = &self.server.session_manager;
350 let mut mbox = manager.mbox.0.lock();
351 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
352 manager.terminate();
353 debug_assert!(Arc::strong_count(manager) > 0);
354 }
355}
356
357#[repr(C)]
358pub struct PartitionInfo {
359 pub device_flags: u32,
360 pub start_block: u64,
361 pub block_count: u64,
362 pub block_size: u32,
363 pub type_guid: [u8; 16],
364 pub instance_guid: [u8; 16],
365 pub name: *const c_char,
366 pub flags: u64,
367}
368
369#[allow(non_camel_case_types)]
371type zx_handle_t = zx::sys::zx_handle_t;
372
373#[allow(non_camel_case_types)]
375type zx_status_t = zx::sys::zx_status_t;
376
377impl PartitionInfo {
378 unsafe fn to_rust(&self) -> super::DeviceInfo {
379 super::DeviceInfo::Partition(super::PartitionInfo {
380 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
381 block_range: Some(self.start_block..self.start_block + self.block_count),
382 type_guid: self.type_guid,
383 instance_guid: self.instance_guid,
384 name: if self.name.is_null() {
385 "".to_string()
386 } else {
387 String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
388 },
389 flags: self.flags,
390 })
391 }
392}
393
394#[no_mangle]
398pub unsafe extern "C" fn block_server_new(
399 partition_info: &PartitionInfo,
400 callbacks: Callbacks,
401) -> *mut BlockServer {
402 let session_manager = Arc::new(SessionManager {
403 callbacks,
404 open_sessions: Mutex::default(),
405 condvar: Condvar::new(),
406 mbox: ExecutorMailbox::new(),
407 info: partition_info.to_rust(),
408 });
409
410 (session_manager.callbacks.start_thread)(
411 session_manager.callbacks.context,
412 Arc::into_raw(session_manager.clone()) as *const c_void,
413 );
414
415 let mbox = &session_manager.mbox;
416 let mail = {
417 let mut mail = mbox.0.lock();
418 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
419 std::mem::replace(&mut *mail, Mail::None)
420 };
421
422 let block_size = partition_info.block_size;
423 match mail {
424 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
425 server: super::BlockServer::new(block_size, session_manager),
426 ehandle,
427 abort_handle,
428 })),
429 Mail::Finished => std::ptr::null_mut(),
430 _ => unreachable!(),
431 }
432}
433
434#[no_mangle]
438pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
439 let session_manager = &*(arg as *const SessionManager);
440
441 let mut executor = fasync::LocalExecutor::new();
442 let (abort_handle, registration) = AbortHandle::new_pair();
443
444 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
445
446 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
447}
448
449#[no_mangle]
456pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
457 let mail = {
458 let session_manager = Arc::from_raw(arg as *const SessionManager);
459 debug_assert!(Arc::strong_count(&session_manager) > 0);
460 session_manager.mbox.post(Mail::Finished)
461 };
462
463 if let Mail::AsyncShutdown(server, callback, arg) = mail {
464 std::mem::drop(server);
465 unsafe {
467 callback(arg);
468 }
469 }
470}
471
472#[no_mangle]
476pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
477 let _ = Box::from_raw(block_server);
478}
479
480#[no_mangle]
484pub unsafe extern "C" fn block_server_delete_async(
485 block_server: *mut BlockServer,
486 callback: ShutdownCallback,
487 arg: *mut c_void,
488) {
489 let block_server = Box::from_raw(block_server);
490 let session_manager = block_server.server.session_manager.clone();
491 let abort_handle = block_server.abort_handle.clone();
492 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
493 abort_handle.abort();
494}
495
496#[no_mangle]
502pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
503 let block_server = &*block_server;
504 let ehandle = &block_server.ehandle;
505 let handle = zx::Handle::from_raw(handle);
506 ehandle.global_scope().spawn(async move {
507 let _ = block_server
508 .server
509 .handle_requests(fvolume::VolumeRequestStream::from_channel(
510 fasync::Channel::from_channel(handle.into()),
511 ))
512 .await;
513 });
514}
515
516#[no_mangle]
520pub unsafe extern "C" fn block_server_session_run(session: &Session) {
521 session.run();
522}
523
524#[no_mangle]
528pub unsafe extern "C" fn block_server_session_release(session: &Session) {
529 session.terminate();
530 Arc::from_raw(session);
531}
532
533#[no_mangle]
537pub unsafe extern "C" fn block_server_send_reply(
538 session: &Session,
539 request_id: RequestId,
540 trace_flow_id: Option<NonZero<u64>>,
541 status: zx_status_t,
542) {
543 if request_id.did_have_vmo() {
544 session.vmos.lock().remove(&request_id);
545 }
546 session.send_reply(
547 RequestTracking { group_or_request: request_id.into(), trace_flow_id },
548 zx::Status::from_raw(status),
549 );
550}