block_server/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
7    SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl::endpoints::RequestStream;
12use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
13use fuchsia_async::{self as fasync, EHandle};
14use fuchsia_sync::{Condvar, Mutex};
15use futures::stream::{AbortHandle, Abortable};
16use futures::TryStreamExt;
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{c_char, c_void, CStr};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
/// Session manager that hands block requests to a set of C callbacks.
///
/// A single instance is shared through `Arc` by the `BlockServer`, by every `Session` (see
/// `Session::manager`) and by the thread started via `Callbacks::start_thread`.
pub struct SessionManager {
    /// C callbacks used to start the thread, announce sessions, deliver requests and log.
    callbacks: Callbacks,
    /// Live sessions keyed by the address of the `Session`.  Entries are inserted by
    /// `open_session` and removed by `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Bookkeeping for in-flight requests; `complete_request` takes responses from here.
    active_requests: ActiveRequests<Arc<Session>>,
    /// Notified by `Session::drop` once `open_sessions` becomes empty; `terminate` waits on it.
    condvar: Condvar,
    /// Mailbox used to exchange state with the executor thread.
    mbox: ExecutorMailbox,
    /// Device information handed out by `get_info`.
    info: super::DeviceInfo,
}
34
// NOTE(review): presumably required because `Mail::AsyncShutdown` (reachable through `mbox`)
// stores a raw `*mut c_void`, which stops the auto traits from being derived — confirm.  The
// soundness of these impls then depends on the C side allowing that pointer to be used from any
// thread.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
37
38impl SessionManager {
39    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
40        if let Some((session, response)) =
41            self.active_requests.complete_and_take_response(request_id, status)
42        {
43            session.send_response(response);
44        }
45    }
46
47    fn terminate(&self) {
48        {
49            // We must drop references to sessions whilst we're not holding the lock for
50            // `open_sessions` because `Session::drop` needs to take that same lock.
51            #[allow(clippy::collection_is_never_read)]
52            let mut terminated_sessions = Vec::new();
53            for (_, session) in &*self.open_sessions.lock() {
54                if let Some(session) = session.upgrade() {
55                    session.terminate();
56                    terminated_sessions.push(session);
57                }
58            }
59        }
60        let mut guard = self.open_sessions.lock();
61        self.condvar.wait_while(&mut guard, |s| !s.is_empty());
62    }
63}
64
65impl super::SessionManager for SessionManager {
66    type Session = Arc<Session>;
67
68    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
69        Ok(())
70    }
71
72    async fn open_session(
73        self: Arc<Self>,
74        mut stream: fblock::SessionRequestStream,
75        offset_map: OffsetMap,
76        block_size: u32,
77    ) -> Result<(), Error> {
78        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
79        let (abort_handle, registration) = AbortHandle::new_pair();
80        let session = Arc::new(Session {
81            manager: self.clone(),
82            helper,
83            fifo,
84            queue: Mutex::default(),
85            abort_handle,
86        });
87        self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
88        unsafe {
89            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
90        }
91
92        let result = Abortable::new(
93            async {
94                while let Some(request) = stream.try_next().await? {
95                    session.helper.handle_request(request).await?;
96                }
97                Ok(())
98            },
99            registration,
100        )
101        .await
102        .unwrap_or_else(|e| Err(e.into()));
103
104        let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
105
106        result
107    }
108
109    async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
110        Ok(Cow::Borrowed(&self.info))
111    }
112
113    fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
114        &self.active_requests
115    }
116}
117
impl Drop for SessionManager {
    /// Terminates every open session and blocks until `open_sessions` is empty (see
    /// `SessionManager::terminate`).
    fn drop(&mut self) {
        self.terminate();
    }
}
123
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    /// An `Arc<SessionManager>` already is the session manager; it is returned unchanged.
    fn into_session_manager(self) -> Self {
        self
    }
}
131
/// Callbacks supplied by the C caller of `block_server_new`.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer passed back as the first argument of every callback.
    pub context: *mut c_void,
    /// Called by `block_server_new`.  `arg` is the value that must later be passed to
    /// `block_server_thread` and `block_server_thread_delete`.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Called when a session has been opened.  `session` carries its own strong reference,
    /// which `block_server_session_release` consumes.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Called with a batch of `request_count` requests starting at `requests`.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Called with a message of `message_len` bytes; the message is passed as pointer and length
    /// and is not NUL-terminated by this file.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
141
142impl Callbacks {
143    #[allow(dead_code)]
144    fn log(&self, msg: &str) {
145        let msg = msg.as_bytes();
146        // SAFETY: This is safe if `context` and `log` are good.
147        unsafe {
148            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
149        }
150    }
151}
152
// A raw VMO handle value that is handed to the C side without transferring ownership (see the
// comment in `Session::handle_requests`).  Holds `ZX_HANDLE_INVALID` when a request has no VMO.
/// cbindgen:no-export
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
156
/// A single request as delivered to the `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Identifier to pass back to `block_server_send_reply` when the request is done.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow id copied from the decoded request.
    pub trace_flow_id: TraceFlowId,
    /// Unowned handle of the request's VMO, or `ZX_HANDLE_INVALID` if there is none.
    pub vmo: UnownedVmo,
}
164
// `Callbacks` holds a raw `context` pointer, so the auto traits are not derived.
// NOTE(review): these impls rely on the C side permitting the callbacks (and `context`) to be
// used from any thread — confirm against the callers of `block_server_new`.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
167
/// A single open session.  Created by `open_session`; driven by `block_server_session_run`.
pub struct Session {
    /// The manager that created this session; used for callbacks and request completion.
    manager: Arc<SessionManager>,
    /// Helper that decodes and maps FIFO requests and handles FIDL session requests.
    helper: SessionHelper<SessionManager>,
    /// FIFO that requests are read from and responses are written to.  `USER_0` on this handle
    /// means "stop"; `USER_1` is used to wake `fifo_loop` when a response has been queued.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the future serving the session's FIDL stream in `open_session`.
    abort_handle: AbortHandle,
}
175
/// State protected by `Session::queue`.
#[derive(Default)]
struct SessionQueue {
    /// Responses waiting to be written to the FIFO, oldest first (`send_response` pushes to the
    /// back, `fifo_loop` drains from the front).
    responses: VecDeque<BlockFifoResponse>,
}
180
/// Upper bound on the number of requests passed to `on_requests` in a single call; also the
/// size of the buffer used for each FIFO read.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
182
impl Session {
    /// Runs the FIFO loop on the calling thread until it stops, then aborts the future serving
    /// the session's FIDL stream and asks the helper to drop the active requests whose session
    /// is this one.
    fn run(self: &Arc<Self>) {
        self.fifo_loop();
        self.abort_handle.abort();
        self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
    }

    /// Alternates between flushing queued responses to the FIFO and reading new requests from
    /// it, blocking on the FIFO's signals when there is nothing to read.
    ///
    /// Returns when `USER_0` is observed, when waiting fails, or when a FIFO read or write fails
    /// with anything other than `SHOULD_WAIT`.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Send queued responses.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous slice is written per iteration; the loop comes
                    // back for the second slice if everything in the first was accepted.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write means the FIFO has no more room for now.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            // Process pending reads.
            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
                    // Only wait for writability when there is something left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    // USER_0 is raised by `terminate` and at the end of `open_session`.
                    if signals.contains(zx::Signals::USER_0) {
                        return;
                    }
                    // Clear USER_1 signal if it's set.
                    if signals.contains(zx::Signals::USER_1) {
                        let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }

    /// Decodes and maps each FIFO request and delivers the results to the `on_requests` callback
    /// in batches of at most `MAX_REQUESTS`.
    ///
    /// `CloseVmo` requests are completed here with `OK` and are not forwarded.  A request for
    /// which `map_request` returns a remainder is mapped again until no remainder is left, so
    /// one FIFO request can produce several `Request`s.  Decode or map failures that come with
    /// a response have that response sent; failures without one are skipped.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        // SAFETY: An array of `MaybeUninit` does not itself require initialization.
        let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
            unsafe { MaybeUninit::uninit().assume_init() };
        // Number of initialized entries at the front of `c_requests`.
        let mut count = 0;
        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    self.complete_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => loop {
                    match self.helper.map_request(request) {
                        Ok((
                            DecodedRequest { request_id, operation, trace_flow_id, vmo },
                            remainder,
                        )) => {
                            // We are handing out unowned references to the VMO here.  This is safe
                            // because the VMO bin holds references to any closed VMOs until all
                            // preceding operations have finished.
                            c_requests[count].write(Request {
                                request_id,
                                operation,
                                trace_flow_id,
                                vmo: UnownedVmo(
                                    vmo.as_ref()
                                        .map(|vmo| vmo.raw_handle())
                                        .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
                                ),
                            });
                            count += 1;
                            // The batch is full: flush it and start refilling from index 0.
                            if count == MAX_REQUESTS {
                                // SAFETY: The first `count` entries were initialized by the
                                // `write` calls above; the callback is required to be safe by
                                // `block_server_new`'s contract.
                                unsafe {
                                    (self.manager.callbacks.on_requests)(
                                        self.manager.callbacks.context,
                                        c_requests[0].as_mut_ptr(),
                                        count,
                                    );
                                }
                                count = 0;
                            }
                            if let Some(r) = remainder {
                                request = r;
                            } else {
                                break;
                            }
                        }
                        Err(Some(response)) => {
                            self.send_response(response);
                            break;
                        }
                        Err(None) => break,
                    }
                },
                Err(None) => {}
                Err(Some(response)) => self.send_response(response),
            }
        }
        // Flush whatever is left in the final, partially filled batch.
        if count > 0 {
            // SAFETY: As above; the first `count` entries of `c_requests` are initialized.
            unsafe {
                (self.manager.callbacks.on_requests)(
                    self.manager.callbacks.context,
                    c_requests[0].as_mut_ptr(),
                    count,
                );
            }
        }
    }

    /// Completes `request_id` with `status` via the manager.
    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
        self.manager.complete_request(request_id, status);
    }

    /// Sends `response` to the client.
    ///
    /// If nothing is queued, the response is written straight to the FIFO.  If that write fails,
    /// or if other responses are already queued, the response is appended to the queue for
    /// `fifo_loop` to send, which keeps responses in order.
    fn send_response(&self, response: BlockFifoResponse) {
        let mut queue = self.queue.lock();
        if queue.responses.is_empty() {
            match self.fifo.write_one(&response) {
                Ok(()) => {
                    return;
                }
                Err(_) => {
                    // Wake `fifo_loop`.
                    let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
                }
            }
        }
        queue.responses.push_back(response);
    }

    /// Raises `USER_0` on the FIFO, which makes `fifo_loop` return, and aborts the future
    /// serving the session's FIDL stream.
    fn terminate(&self) {
        let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
        self.abort_handle.abort();
    }
}
336
337impl Drop for Session {
338    fn drop(&mut self) {
339        let notify = {
340            let mut open_sessions = self.manager.open_sessions.lock();
341            open_sessions.remove(&(self as *const _ as usize));
342            open_sessions.is_empty()
343        };
344        if notify {
345            self.manager.condvar.notify_all();
346        }
347    }
348}
349
/// The object handed to C by `block_server_new`.
pub struct BlockServer {
    /// The underlying server, parameterized with the callback-based session manager.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor running on the thread started via `start_thread`; used to spawn
    /// tasks in `block_server_serve`.
    ehandle: EHandle,
    /// Aborts the pending future that `block_server_thread` runs, letting that thread return.
    abort_handle: AbortHandle,
}
355
/// A single-slot mailbox: field `0` holds the current mail and field `1` is notified whenever
/// new mail is posted.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
357
358impl ExecutorMailbox {
359    /// Returns the old mail.
360    fn post(&self, mail: Mail) -> Mail {
361        let old = std::mem::replace(&mut *self.0.lock(), mail);
362        self.1.notify_all();
363        old
364    }
365
366    fn new() -> Self {
367        Self(Mutex::default(), Condvar::new())
368    }
369}
370
/// Callback invoked by `block_server_thread_delete` after an asynchronous shutdown requested via
/// `block_server_delete_async` has dropped the server.  The argument is the `arg` that was
/// passed to `block_server_delete_async`.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
372
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// Nothing has been posted (or the mail has been taken).
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists.  Carries a handle to that
    /// executor and the handle that aborts the future it runs.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`.  `block_server_thread_delete` drops the server and
    /// then invokes the callback with the pointer.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`.
    Finished,
}
381
impl Drop for BlockServer {
    /// Shuts the server down.  This blocks the calling thread twice: first until
    /// `Mail::Finished` has been posted (by `block_server_thread_delete`), then until every
    /// session has been dropped.
    fn drop(&mut self) {
        // Abort the pending future so that `block_server_thread` can return.
        self.abort_handle.abort();
        let manager = &self.server.session_manager;
        let mut mbox = manager.mbox.0.lock();
        manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
        // Note: the mailbox lock is still held while waiting for the sessions to go away.
        manager.terminate();
        debug_assert!(Arc::strong_count(manager) > 0);
    }
}
392
/// C description of the partition being served; converted with `PartitionInfo::to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bits of `fblock::Flag`; unknown bits are dropped during conversion.
    pub device_flags: u32,
    /// First block of the partition.
    pub start_block: u64,
    /// Number of blocks; the block range is `start_block..start_block + block_count`.
    pub block_count: u64,
    /// Block size.  Used as the divisor when converting `max_transfer_size`, so it must not be
    /// zero when `max_transfer_size` is bounded.
    pub block_size: u32,
    /// Copied through unchanged.
    pub type_guid: [u8; 16],
    /// Copied through unchanged.
    pub instance_guid: [u8; 16],
    /// NUL-terminated C string, or null for an empty name.  Invalid UTF-8 is replaced lossily.
    pub name: *const c_char,
    /// Copied through unchanged.
    pub flags: u64,
    /// Either `MAX_TRANSFER_UNBOUNDED` (no limit) or a size that is divided by `block_size` to
    /// obtain the limit in blocks.
    pub max_transfer_size: u32,
}
405
// Local alias so that exported signatures spell the handle type the way C does.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

// Local alias so that exported signatures spell the status type the way C does.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
413
414impl PartitionInfo {
415    unsafe fn to_rust(&self) -> super::DeviceInfo {
416        super::DeviceInfo::Partition(super::PartitionInfo {
417            device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
418            block_range: Some(self.start_block..self.start_block + self.block_count),
419            type_guid: self.type_guid,
420            instance_guid: self.instance_guid,
421            name: if self.name.is_null() {
422                "".to_string()
423            } else {
424                String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
425            },
426            flags: self.flags,
427            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
428                NonZero::new(self.max_transfer_size / self.block_size)
429            } else {
430                None
431            },
432        })
433    }
434}
435
436/// # Safety
437///
438/// All callbacks in `callbacks` must be safe.
439#[no_mangle]
440pub unsafe extern "C" fn block_server_new(
441    partition_info: &PartitionInfo,
442    callbacks: Callbacks,
443) -> *mut BlockServer {
444    let session_manager = Arc::new(SessionManager {
445        callbacks,
446        open_sessions: Mutex::default(),
447        active_requests: ActiveRequests::default(),
448        condvar: Condvar::new(),
449        mbox: ExecutorMailbox::new(),
450        info: partition_info.to_rust(),
451    });
452
453    (session_manager.callbacks.start_thread)(
454        session_manager.callbacks.context,
455        Arc::into_raw(session_manager.clone()) as *const c_void,
456    );
457
458    let mbox = &session_manager.mbox;
459    let mail = {
460        let mut mail = mbox.0.lock();
461        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
462        std::mem::replace(&mut *mail, Mail::None)
463    };
464
465    let block_size = partition_info.block_size;
466    match mail {
467        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
468            server: super::BlockServer::new(block_size, session_manager),
469            ehandle,
470            abort_handle,
471        })),
472        Mail::Finished => std::ptr::null_mut(),
473        _ => unreachable!(),
474    }
475}
476
477/// # Safety
478///
479/// `arg` must be the value passed to the `start_thread` callback.
480#[no_mangle]
481pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
482    let session_manager = &*(arg as *const SessionManager);
483
484    let mut executor = fasync::LocalExecutor::new();
485    let (abort_handle, registration) = AbortHandle::new_pair();
486
487    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
488
489    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
490}
491
492/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
493/// thread is successful or not.
494///
495/// # Safety
496///
497/// `arg` must be the value passed to the `start_thread` callback.
498#[no_mangle]
499pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
500    let mail = {
501        let session_manager = Arc::from_raw(arg as *const SessionManager);
502        debug_assert!(Arc::strong_count(&session_manager) > 0);
503        session_manager.mbox.post(Mail::Finished)
504    };
505
506    if let Mail::AsyncShutdown(server, callback, arg) = mail {
507        std::mem::drop(server);
508        // SAFETY: Whoever supplied the callback must guarantee it's safe.
509        unsafe {
510            callback(arg);
511        }
512    }
513}
514
515/// # Safety
516///
517/// `block_server` must be valid.
518#[no_mangle]
519pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
520    let _ = Box::from_raw(block_server);
521}
522
523/// # Safety
524///
525/// `block_server` must be valid.
526#[no_mangle]
527pub unsafe extern "C" fn block_server_delete_async(
528    block_server: *mut BlockServer,
529    callback: ShutdownCallback,
530    arg: *mut c_void,
531) {
532    let block_server = Box::from_raw(block_server);
533    let session_manager = block_server.server.session_manager.clone();
534    let abort_handle = block_server.abort_handle.clone();
535    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
536    abort_handle.abort();
537}
538
539/// Serves the Volume protocol for this server.  `handle` is consumed.
540///
541/// # Safety
542///
543/// `block_server` and `handle` must be valid.
544#[no_mangle]
545pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
546    let block_server = &*block_server;
547    let ehandle = &block_server.ehandle;
548    let handle = zx::Handle::from_raw(handle);
549    ehandle.global_scope().spawn(async move {
550        let _ = block_server
551            .server
552            .handle_requests(fvolume::VolumeRequestStream::from_channel(
553                fasync::Channel::from_channel(handle.into()),
554            ))
555            .await;
556    });
557}
558
559/// # Safety
560///
561/// `session` must be valid.
562#[no_mangle]
563pub unsafe extern "C" fn block_server_session_run(session: &Session) {
564    let session = Arc::from_raw(session);
565    session.run();
566    let _ = Arc::into_raw(session);
567}
568
569/// # Safety
570///
571/// `session` must be valid.
572#[no_mangle]
573pub unsafe extern "C" fn block_server_session_release(session: &Session) {
574    session.terminate();
575    Arc::from_raw(session);
576}
577
578/// # Safety
579///
580/// `block_server` must be valid.
581#[no_mangle]
582pub unsafe extern "C" fn block_server_send_reply(
583    block_server: &BlockServer,
584    request_id: RequestId,
585    status: zx_status_t,
586) {
587    block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
588}