1use crate::WriteFlags;
6
7use super::{
8 ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
9 SessionHelper, TraceFlowId,
10};
11use anyhow::Error;
12use block_protocol::{BlockFifoRequest, BlockFifoResponse};
13use fidl::endpoints::RequestStream;
14use fidl_fuchsia_storage_block as fblock;
15use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
16use fuchsia_async::{self as fasync, EHandle};
17use fuchsia_sync::{Condvar, Mutex};
18use futures::TryStreamExt;
19use futures::stream::{AbortHandle, Abortable};
20use std::borrow::Cow;
21use std::collections::{HashMap, VecDeque};
22use std::ffi::{CStr, c_char, c_void};
23use std::mem::MaybeUninit;
24use std::num::NonZero;
25use std::ops::{Deref, DerefMut};
26use std::sync::{Arc, Weak};
27
/// State shared by every session of one block server that is driven through C callbacks.
pub struct SessionManager {
    /// Foreign callbacks, plus the context pointer that is passed back to each of them.
    callbacks: Callbacks,
    /// Weak references to the live sessions, keyed by each session's address cast to `usize`.
    /// Entries are inserted by `open_session` and removed by `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Notified by `Session::drop` when `open_sessions` becomes empty.
    open_sessions_condvar: Condvar,
    /// Per-request state, exposed to the generic server code through `active_requests()`.
    active_requests: ActiveRequests<Arc<Session>>,
    /// Number of requests handed to `on_requests` for which `complete_request` has not run yet.
    inflight_requests: Mutex<usize>,
    /// Notified by `complete_request` when `inflight_requests` reaches zero.
    no_inflight_requests_condvar: Condvar,
    /// Mailbox used to exchange `Mail` with the executor thread.
    mbox: ExecutorMailbox,
    /// Device information returned by `get_info`.
    info: super::DeviceInfo,
}
38
// NOTE(review): `SessionManager` is presumably not automatically `Send`/`Sync` because `mbox`
// can hold a `Mail::AsyncShutdown` containing a raw `*mut c_void` — TODO confirm.  These impls
// assert that sharing the manager between threads is sound regardless.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
41
42impl SessionManager {
43 fn submit_requests(&self, requests: &mut [Request]) {
44 *self.inflight_requests.lock() += requests.len();
45 unsafe {
48 (self.callbacks.on_requests)(
49 self.callbacks.context,
50 std::ptr::from_mut(&mut requests[0]),
51 requests.len(),
52 )
53 }
54 }
55
56 fn wait_for_no_inflight_requests(&self) {
61 let mut guard = self.inflight_requests.lock();
62 self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
63 }
64
65 fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
68 if let Some((session, response)) =
69 self.active_requests.complete_and_take_response(request_id, status)
70 {
71 session.send_response(response);
72 }
73 }
74
75 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
76 let notify = {
77 let mut inflight_requests = self.inflight_requests.lock();
78 *inflight_requests -= 1;
79 *inflight_requests == 0
80 };
81 self.complete_unsubmitted_request(request_id, status);
82 if notify {
83 self.no_inflight_requests_condvar.notify_all();
84 }
85 }
86
87 fn terminate(&self) {
88 {
89 #[allow(clippy::collection_is_never_read)]
92 let mut terminated_sessions = Vec::new();
93 for (_, session) in &*self.open_sessions.lock() {
94 if let Some(session) = session.upgrade() {
95 session.terminate();
96 terminated_sessions.push(session);
97 }
98 }
99 }
100 let mut guard = self.open_sessions.lock();
101 self.open_sessions_condvar.wait_while(&mut guard, |s| !s.is_empty());
102 }
103}
104
105impl super::SessionManager for SessionManager {
106 const SUPPORTS_DECOMPRESSION: bool = false;
107 type Session = Arc<Session>;
108
109 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
110 Ok(())
111 }
112
113 async fn open_session(
114 self: Arc<Self>,
115 mut stream: fblock::SessionRequestStream,
116 offset_map: OffsetMap,
117 block_size: u32,
118 ) -> Result<(), Error> {
119 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
120 let (abort_handle, registration) = AbortHandle::new_pair();
121 let session = Arc::new(Session {
122 manager: self.clone(),
123 helper,
124 fifo,
125 queue: Mutex::default(),
126 abort_handle,
127 });
128 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
129 unsafe {
130 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
131 }
132
133 let result = Abortable::new(
134 async {
135 while let Some(request) = stream.try_next().await? {
136 session.helper.handle_request(request).await?;
137 }
138 Ok(())
139 },
140 registration,
141 )
142 .await
143 .unwrap_or_else(|e| Err(e.into()));
144
145 let _ = session.fifo.signal(zx::Signals::empty(), zx::Signals::USER_0);
146
147 result
148 }
149
150 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
151 Cow::Borrowed(&self.info)
152 }
153
154 fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
155 &self.active_requests
156 }
157}
158
impl Drop for SessionManager {
    /// Terminates every open session and blocks until all of them have been dropped.
    fn drop(&mut self) {
        self.terminate();
    }
}
164
/// An `Arc<SessionManager>` already is the session manager, so the conversion is the identity.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    fn into_session_manager(self) -> Self {
        self
    }
}
172
/// Table of foreign callbacks supplied by the caller of `block_server_new`.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer passed back as the first argument of every callback.
    pub context: *mut c_void,
    /// Called once from `block_server_new`.  `arg` is a leaked `Arc<SessionManager>` reference;
    /// `block_server_thread` and `block_server_thread_delete` both accept it.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Called for every new session.  `session` is a leaked `Arc<Session>` reference;
    /// `block_server_session_release` reclaims it.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Called with a batch of `request_count` requests starting at `requests`.  Each request is
    /// counted as in flight until `block_server_send_reply` is called for it.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Receives a log message as a pointer/length pair; the bytes are not NUL-terminated.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
199
200impl Callbacks {
201 #[allow(dead_code)]
202 fn log(&self, msg: &str) {
203 let msg = msg.as_bytes();
204 unsafe {
206 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
207 }
208 }
209}
210
/// A raw VMO handle value carried inside a `Request`.
///
/// The wrapper stores only the handle number and has no `Drop` impl in this file.  The field is
/// never read from Rust, hence the `dead_code` exemption.
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
214
/// One request as it is handed to the `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Identifier to pass back to `block_server_send_reply`.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow identifier copied from the decoded request.
    pub trace_flow_id: TraceFlowId,
    /// Raw handle of the request's VMO, or `ZX_HANDLE_INVALID` when there is none.
    pub vmo: UnownedVmo,
}
222
// NOTE(review): `Callbacks` holds a raw `context` pointer, so it is not `Send`/`Sync` by
// default.  These impls assume the supplier guarantees that the callbacks and their context may
// be used from any thread — TODO confirm against the C header.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
225
/// One client session: a request/response FIFO plus the state needed to serve it.
pub struct Session {
    /// The manager that created this session.
    manager: Arc<SessionManager>,
    /// Generic session logic (request decoding and mapping, FIDL request handling).
    helper: SessionHelper<SessionManager>,
    /// FIFO that requests are read from and responses are written to.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the future that serves the session's FIDL request stream in `open_session`.
    abort_handle: AbortHandle,
}
233
/// Responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first.
    responses: VecDeque<BlockFifoResponse>,
}
238
/// Capacity of the request buffers in this file: the number of FIFO entries read at once and
/// the largest batch passed to `on_requests`.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
240
241impl Session {
242 fn run(self: &Arc<Self>) {
243 self.fifo_loop();
244 self.abort_handle.abort();
245 self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
246 }
247
248 fn fifo_loop(self: &Arc<Self>) {
249 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
250
251 loop {
252 let is_queue_empty = {
254 let mut queue = self.queue.lock();
255 while !queue.responses.is_empty() {
256 let (front, _) = queue.responses.as_slices();
257 match self.fifo.write(front) {
258 Ok(count) => {
259 let full = count < front.len();
260 queue.responses.drain(..count);
261 if full {
262 break;
263 }
264 }
265 Err(zx::Status::SHOULD_WAIT) => break,
266 Err(_) => return,
267 }
268 }
269 queue.responses.is_empty()
270 };
271
272 match self.fifo.read_uninit(&mut requests) {
274 Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
275 Err(zx::Status::SHOULD_WAIT) => {
276 let mut signals =
277 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
278 if !is_queue_empty {
279 signals |= zx::Signals::OBJECT_WRITABLE;
280 }
281 let Ok(signals) =
282 self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
283 else {
284 return;
285 };
286 if signals.contains(zx::Signals::USER_0) {
287 return;
288 }
289 if signals.contains(zx::Signals::USER_1) {
291 let _ = self.fifo.signal(zx::Signals::USER_1, zx::Signals::empty());
292 }
293 }
294 Err(_) => return,
295 }
296 }
297 }
298
299 fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
301 let trace_flow_id = {
302 let mut request = self.manager.active_requests.request(request_id);
303 if let Some(id) = request.trace_flow_id {
304 fuchsia_trace::async_instant!(
305 fuchsia_trace::Id::from(id.get()),
306 c"storage",
307 c"block_server::SimulatedBarrier",
308 "request_id" => request_id.0
309 );
310 }
311 request.count += 1;
312 request.trace_flow_id
313 };
314 self.manager.submit_requests(&mut [Request {
315 request_id,
316 operation: Operation::Flush,
317 trace_flow_id,
318 vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
319 }]);
320 self.manager.wait_for_no_inflight_requests();
321 let status = self.manager.active_requests.request(request_id).status;
322 match status {
323 zx::Status::OK => Ok(()),
324 status => {
325 self.manager.complete_unsubmitted_request(request_id, status);
327 Err(status)
328 }
329 }
330 }
331
332 fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
335 if decoded_requests.len() > 0 {
336 self.manager.submit_requests(decoded_requests);
337 decoded_requests.clear();
338 }
339 self.manager.wait_for_no_inflight_requests();
340 let request = self.manager.active_requests.request(request_id);
341 match request.status {
342 zx::Status::OK => decoded_requests.push(Request {
343 request_id,
344 operation: Operation::Flush,
345 trace_flow_id: request.trace_flow_id,
346 vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
347 }),
348 status => {
349 drop(request);
350 self.manager.complete_unsubmitted_request(request_id, status)
351 }
352 }
353 }
354
355 fn handle_requests<'a>(
356 self: &Arc<Self>,
357 requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
358 ) {
359 let mut decoded_requests = DecodedRequests::default();
360 for request in requests {
361 match self.helper.decode_fifo_request(self.clone(), request) {
362 Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
363 self.manager.complete_unsubmitted_request(request_id, zx::Status::OK);
364 }
365 Ok(mut request) => {
366 let request_id = request.request_id;
367 if !self
370 .manager
371 .info
372 .device_flags()
373 .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
374 && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
375 && self.pre_flush(request_id).is_err()
376 {
377 continue;
378 }
379 let simulate_fua =
382 !self.manager.info.device_flags().contains(fblock::DeviceFlag::FUA_SUPPORT)
383 && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);
384 if simulate_fua {
385 self.manager.active_requests.request(request_id).count += 1;
387 }
388
389 loop {
390 let result = self.helper.map_request(
391 request,
392 &mut self.manager.active_requests.request(request_id),
393 );
394 match result {
395 Ok((
396 DecodedRequest { request_id, operation, vmo, trace_flow_id },
397 remainder,
398 )) => {
399 decoded_requests.push(Request {
403 request_id,
404 operation,
405 trace_flow_id,
406 vmo: UnownedVmo(
407 vmo.as_ref()
408 .map(|vmo| vmo.raw_handle())
409 .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
410 ),
411 });
412
413 if decoded_requests.is_full() {
414 self.manager.submit_requests(&mut decoded_requests);
415 decoded_requests.clear();
416 }
417 if let Some(r) = remainder {
418 request = r;
419 } else {
420 break;
421 }
422 }
423 Err(status) => {
424 self.manager.complete_unsubmitted_request(request_id, status);
425 break;
426 }
427 }
428 }
429
430 if simulate_fua {
431 self.post_flush(request_id, &mut decoded_requests);
432 }
433 }
434 Err(None) => {}
435 Err(Some(response)) => self.send_response(response),
436 }
437 }
438 if !decoded_requests.is_empty() {
439 self.manager.submit_requests(&mut decoded_requests);
440 }
441 }
442
443 fn send_response(&self, response: BlockFifoResponse) {
444 let mut queue = self.queue.lock();
445 if queue.responses.is_empty() {
446 match self.fifo.write_one(&response) {
447 Ok(()) => {
448 return;
449 }
450 Err(_) => {
451 let _ = self.fifo.signal(zx::Signals::empty(), zx::Signals::USER_1);
453 }
454 }
455 }
456 queue.responses.push_back(response);
457 }
458
459 fn terminate(&self) {
460 let _ = self.fifo.signal(zx::Signals::empty(), zx::Signals::USER_0);
461 self.abort_handle.abort();
462 }
463}
464
465impl Drop for Session {
466 fn drop(&mut self) {
467 let notify = {
468 let mut open_sessions = self.manager.open_sessions.lock();
469 open_sessions.remove(&(self as *const _ as usize));
470 open_sessions.is_empty()
471 };
472 if notify {
473 self.manager.open_sessions_condvar.notify_all();
474 }
475 }
476}
477
/// The object handed to foreign callers as an opaque `*mut BlockServer`.
pub struct BlockServer {
    /// The generic block server built on top of the callback-driven `SessionManager`.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor running on the thread started through `start_thread`.
    ehandle: EHandle,
    /// Aborts the pending future that keeps that executor running.
    abort_handle: AbortHandle,
}
483
/// A single-slot mailbox: the current `Mail` plus the condition variable signalled on `post`.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
485
486impl ExecutorMailbox {
487 fn post(&self, mail: Mail) -> Mail {
489 let old = std::mem::replace(&mut *self.0.lock(), mail);
490 self.1.notify_all();
491 old
492 }
493
494 fn new() -> Self {
495 Self(Mutex::default(), Condvar::new())
496 }
497}
498
/// Callback invoked by `block_server_thread_delete` once an asynchronous shutdown has finished;
/// it receives the `arg` that was passed to `block_server_delete_async`.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
500
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// Nothing has been posted.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`: the server to drop, plus the callback and
    /// argument to invoke afterwards.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`.
    Finished,
}
509
510impl Drop for BlockServer {
511 fn drop(&mut self) {
512 self.abort_handle.abort();
513 let manager = &self.server.session_manager;
514 let mut mbox = manager.mbox.0.lock();
515 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
516 manager.terminate();
517 debug_assert!(Arc::strong_count(manager) > 0);
518 }
519}
520
/// C-layout description of the partition, converted by `PartitionInfo::to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Raw `fblock::DeviceFlag` bits; unknown bits are dropped during conversion.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks; the range is `start_block..start_block + block_count`.
    pub block_count: u64,
    /// Block size, passed to `super::BlockServer::new` and used as the divisor for
    /// `max_transfer_size`.
    pub block_size: u32,
    /// Partition type GUID, copied verbatim.
    pub type_guid: [u8; 16],
    /// Partition instance GUID, copied verbatim.
    pub instance_guid: [u8; 16],
    /// NUL-terminated name, or null for an empty name.
    pub name: *const c_char,
    /// Partition flags, copied verbatim.
    pub flags: u64,
    /// Maximum transfer size, or `MAX_TRANSFER_UNBOUNDED` for no limit.  It is divided by
    /// `block_size` during conversion, so it is presumably expressed in bytes — TODO confirm.
    pub max_transfer_size: u32,
}
533
// Keeps the C spelling of the type name in the exported function signatures.
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;
537
// Keeps the C spelling of the type name in the exported function signatures.
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
541
542impl PartitionInfo {
543 unsafe fn to_rust(&self) -> super::DeviceInfo {
547 super::DeviceInfo::Partition(super::PartitionInfo {
548 device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
549 block_range: Some(self.start_block..self.start_block + self.block_count),
550 type_guid: self.type_guid,
551 instance_guid: self.instance_guid,
552 name: if self.name.is_null() {
553 "".to_string()
554 } else {
555 String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
556 },
557 flags: self.flags,
558 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
559 NonZero::new(self.max_transfer_size / self.block_size)
560 } else {
561 None
562 },
563 })
564 }
565}
566
/// Fixed-capacity batch of requests waiting to be passed to `on_requests`.
struct DecodedRequests {
    /// Backing storage; only the first `count` slots are initialized.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// Number of initialized slots at the front of `requests`.
    count: usize,
}
571
572impl Default for DecodedRequests {
573 fn default() -> Self {
574 Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
575 }
576}
577
578impl DecodedRequests {
579 fn push(&mut self, request: Request) {
580 assert!(self.count < MAX_REQUESTS);
581 self.requests[self.count].write(request);
582 self.count += 1;
583 }
584
585 fn is_full(&self) -> bool {
586 self.count == MAX_REQUESTS
587 }
588
589 fn clear(&mut self) {
590 self.count = 0;
591 }
592}
593
594impl Deref for DecodedRequests {
595 type Target = [Request];
596
597 fn deref(&self) -> &Self::Target {
598 unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
600 }
601}
602
603impl DerefMut for DecodedRequests {
604 fn deref_mut(&mut self) -> &mut Self::Target {
605 unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
607 }
608}
609
610#[unsafe(no_mangle)]
614pub unsafe extern "C" fn block_server_new(
615 partition_info: &PartitionInfo,
616 callbacks: Callbacks,
617) -> *mut BlockServer {
618 let session_manager = Arc::new(SessionManager {
619 callbacks,
620 open_sessions: Mutex::default(),
621 active_requests: ActiveRequests::default(),
622 open_sessions_condvar: Condvar::new(),
623 inflight_requests: Mutex::default(),
624 no_inflight_requests_condvar: Condvar::new(),
625 mbox: ExecutorMailbox::new(),
626 info: unsafe { partition_info.to_rust() },
627 });
628
629 unsafe {
630 (session_manager.callbacks.start_thread)(
631 session_manager.callbacks.context,
632 Arc::into_raw(session_manager.clone()) as *const c_void,
633 );
634 }
635
636 let mbox = &session_manager.mbox;
637 let mail = {
638 let mut mail = mbox.0.lock();
639 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
640 std::mem::replace(&mut *mail, Mail::None)
641 };
642
643 let block_size = partition_info.block_size;
644 match mail {
645 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
646 server: super::BlockServer::new(block_size, session_manager),
647 ehandle,
648 abort_handle,
649 })),
650 Mail::Finished => std::ptr::null_mut(),
651 _ => unreachable!(),
652 }
653}
654
655#[unsafe(no_mangle)]
659pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
660 let session_manager = unsafe { &*(arg as *const SessionManager) };
661
662 let mut executor = fasync::LocalExecutor::default();
663 let (abort_handle, registration) = AbortHandle::new_pair();
664
665 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
666
667 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
668}
669
670#[unsafe(no_mangle)]
677pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
678 let mail = {
679 let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
680 debug_assert!(Arc::strong_count(&session_manager) > 0);
681 session_manager.mbox.post(Mail::Finished)
682 };
683
684 if let Mail::AsyncShutdown(server, callback, arg) = mail {
685 std::mem::drop(server);
686 unsafe {
688 callback(arg);
689 }
690 }
691}
692
693#[unsafe(no_mangle)]
697pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
698 let _ = unsafe { Box::from_raw(block_server) };
699}
700
701#[unsafe(no_mangle)]
705pub unsafe extern "C" fn block_server_delete_async(
706 block_server: *mut BlockServer,
707 callback: ShutdownCallback,
708 arg: *mut c_void,
709) {
710 let block_server = unsafe { Box::from_raw(block_server) };
711 let session_manager = block_server.server.session_manager.clone();
712 let abort_handle = block_server.abort_handle.clone();
713 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
714 abort_handle.abort();
715}
716
717#[unsafe(no_mangle)]
723pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
724 let block_server = unsafe { &*block_server };
725 let ehandle = &block_server.ehandle;
726 let handle = unsafe { zx::NullableHandle::from_raw(handle) };
727 ehandle.global_scope().spawn(async move {
728 let _ = block_server
729 .server
730 .handle_requests(fblock::BlockRequestStream::from_channel(
731 fasync::Channel::from_channel(handle.into()),
732 ))
733 .await;
734 });
735}
736
737#[unsafe(no_mangle)]
741pub unsafe extern "C" fn block_server_session_run(session: &Session) {
742 let session = unsafe { Arc::from_raw(session) };
743 session.run();
744 let _ = Arc::into_raw(session);
745}
746
747#[unsafe(no_mangle)]
751pub unsafe extern "C" fn block_server_session_release(session: &Session) {
752 session.terminate();
753 unsafe { Arc::from_raw(session) };
754}
755
756#[unsafe(no_mangle)]
760pub unsafe extern "C" fn block_server_send_reply(
761 block_server: &BlockServer,
762 request_id: RequestId,
763 status: zx_status_t,
764) {
765 block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
766}