_block_server_c_rustc_static/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    DecodeResult, IntoSessionManager, OffsetMap, Operation, RequestTracking, SessionHelper,
7};
8use anyhow::Error;
9use block_protocol::{BlockFifoRequest, BlockFifoResponse};
10use fidl::endpoints::RequestStream;
11use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
12use fuchsia_async::{self as fasync, EHandle};
13use fuchsia_sync::{Condvar, Mutex};
14use futures::stream::{AbortHandle, Abortable};
15use futures::TryStreamExt;
16use slab::Slab;
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{c_char, c_void, CStr};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
/// We internally keep track of active requests, so that when the server is torn down, we can
/// deallocate all of the resources for pending requests.
struct ActiveRequest {
    // The session the request arrived on; the reply is sent through it on completion.
    session: Arc<Session>,
    // Tracking state handed back to the session when the reply is sent.
    request_tracking: RequestTracking,
    // Retain a strong reference to the VMO the request targets while it is active.
    _vmo: Option<Arc<zx::Vmo>>,
}
34
/// Session manager backing the C interface: holds the C callbacks together with the bookkeeping
/// for open sessions and in-flight requests.
pub struct SessionManager {
    // Callbacks supplied by the caller of `block_server_new`.
    callbacks: Callbacks,
    // Live sessions keyed by the address of the `Session`; entries are removed in
    // `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    // Requests that have been started but not yet completed; the slab key is the `RequestId`.
    active_requests: Mutex<Slab<ActiveRequest>>,
    // Notified when `open_sessions` becomes empty; `terminate` waits on it.
    condvar: Condvar,
    // Mailbox used to exchange state with the executor thread.
    mbox: ExecutorMailbox,
    // Device information returned by `get_info`.
    info: super::DeviceInfo,
}
43
// NOTE(review): asserted manually; presumably needed because `Mail::AsyncShutdown` (reachable
// through `mbox`) stores a raw `*mut c_void`, which is neither `Send` nor `Sync` by default.
// Soundness relies on the C side tolerating use of that pointer from other threads — confirm.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
46
47impl SessionManager {
48    fn start_request(
49        &self,
50        session: Arc<Session>,
51        request_tracking: RequestTracking,
52        operation: Operation,
53        vmo: Option<Arc<zx::Vmo>>,
54    ) -> Request {
55        let mut active_requests = self.active_requests.lock();
56        let vacant = active_requests.vacant_entry();
57        let request = Request {
58            request_id: RequestId(vacant.key()),
59            operation,
60            trace_flow_id: request_tracking.trace_flow_id,
61            vmo: UnownedVmo(
62                vmo.as_ref().map(|vmo| vmo.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
63            ),
64        };
65        vacant.insert(ActiveRequest { session, request_tracking, _vmo: vmo });
66        request
67    }
68
69    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
70        let request = self
71            .active_requests
72            .lock()
73            .try_remove(request_id.0)
74            .unwrap_or_else(|| panic!("Invalid request id {}", request_id.0));
75        request.session.send_reply(request.request_tracking, status);
76    }
77
    /// Asks every open session to terminate, then blocks until `open_sessions` is empty.
    fn terminate(&self) {
        {
            // We must drop references to sessions whilst we're not holding the lock for
            // `open_sessions` because `Session::drop` needs to take that same lock.
            #[allow(clippy::collection_is_never_read)]
            let mut terminated_sessions = Vec::new();
            // The lock guard is a temporary of the `for` expression, so it is released when the
            // loop finishes, before `terminated_sessions` is dropped at the end of this block.
            for (_, session) in &*self.open_sessions.lock() {
                if let Some(session) = session.upgrade() {
                    session.terminate();
                    terminated_sessions.push(session);
                }
            }
        }
        // `Session::drop` notifies `condvar` when it removes the last entry.
        let mut guard = self.open_sessions.lock();
        self.condvar.wait_while(&mut guard, |s| !s.is_empty());
    }
94}
95
96impl super::SessionManager for SessionManager {
    /// No per-VMO work is done by this implementation; it always returns `Ok(())`.
    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        Ok(())
    }
100
101    async fn open_session(
102        self: Arc<Self>,
103        mut stream: fblock::SessionRequestStream,
104        offset_map: OffsetMap,
105        block_size: u32,
106    ) -> Result<(), Error> {
107        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
108        let (abort_handle, registration) = AbortHandle::new_pair();
109        let session = Arc::new(Session {
110            manager: self.clone(),
111            helper,
112            fifo,
113            queue: Mutex::default(),
114            abort_handle,
115        });
116        self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
117        unsafe {
118            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
119        }
120
121        let result = Abortable::new(
122            async {
123                while let Some(request) = stream.try_next().await? {
124                    session.helper.handle_request(request).await?;
125                }
126                Ok(())
127            },
128            registration,
129        )
130        .await
131        .unwrap_or_else(|e| Err(e.into()));
132
133        let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
134
135        result
136    }
137
    /// Returns a borrowed view of the device information captured when the manager was created.
    async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
        Ok(Cow::Borrowed(&self.info))
    }
141}
142
impl Drop for SessionManager {
    /// Terminates all sessions and waits for them to go away before the manager is destroyed.
    fn drop(&mut self) {
        self.terminate();
    }
}
148
/// An `Arc<SessionManager>` already is the session manager, so the conversion is the identity.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    fn into_session_manager(self) -> Self {
        self
    }
}
156
/// Table of C callbacks supplied to `block_server_new`.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer passed back as the first argument of every callback.
    pub context: *mut c_void,
    /// Invoked from `block_server_new`; `arg` is the value that must later be given to
    /// `block_server_thread` and `block_server_thread_delete`.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Invoked when a session is opened.  `session` carries its own strong reference, which is
    /// given back with `block_server_session_release`.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Invoked with a pointer to `request_count` decoded requests.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Invoked with a message given as pointer plus length (no NUL terminator is appended).
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
166
167impl Callbacks {
168    #[allow(dead_code)]
169    fn log(&self, msg: &str) {
170        let msg = msg.as_bytes();
171        // SAFETY: This is safe if `context` and `log` are good.
172        unsafe {
173            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
174        }
175    }
176}
177
178/// cbindgen:no-export
179#[allow(dead_code)]
180pub struct UnownedVmo(zx::sys::zx_handle_t);
181
/// Identifies an in-flight request; wraps the key of its entry in
/// `SessionManager::active_requests`.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
185
/// A decoded request as passed to the C `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Token to pass back to `block_server_send_reply` when the request is complete.
    pub request_id: RequestId,
    /// The decoded operation to perform.
    pub operation: Operation,
    /// Copied from the request's tracking state.
    pub trace_flow_id: Option<NonZero<u64>>,
    /// Raw handle of the target VMO, or `ZX_HANDLE_INVALID` if the request has none.  The handle
    /// is not owned by this struct.
    pub vmo: UnownedVmo,
}
193
// NOTE(review): `Callbacks` holds a raw `context` pointer, so these must be asserted by hand;
// this presumably relies on the C side allowing its callbacks to be invoked from any thread —
// confirm.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
196
/// A single client session: a FIFO carrying requests and responses plus the state needed to
/// decode requests and queue replies.
pub struct Session {
    // The manager that owns the callbacks and the session/request bookkeeping.
    manager: Arc<SessionManager>,
    // Decodes FIFO requests, builds FIFO responses and handles FIDL session requests.
    helper: SessionHelper<SessionManager>,
    // Requests are read from, and responses written to, this FIFO.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    // Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    // Aborts the FIDL request loop started in `open_session`.
    abort_handle: AbortHandle,
}
204
/// Responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    // Pushed by `send_reply` and drained, front first, by `fifo_loop`.
    responses: VecDeque<BlockFifoResponse>,
}
209
/// Size of the buffers used to read requests from the FIFO and to batch requests for a single
/// `on_requests` call.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
211
212impl Session {
    /// Runs the FIFO loop to completion and then aborts the session's FIDL request loop.
    fn run(&self) {
        self.fifo_loop();
        self.abort_handle.abort();
    }
217
    /// Services the session's FIFO until it is told to stop or a FIFO operation fails.
    ///
    /// Each iteration first tries to flush queued responses and then reads and dispatches any
    /// pending requests.  When there is nothing to read, it blocks until the FIFO is readable,
    /// `USER_0` (stop) or `USER_1` (a response was queued) is raised, or, if responses are still
    /// queued, the FIFO becomes writable.
    fn fifo_loop(&self) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Send queued responses.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous slice of the deque is written per pass; the
                    // enclosing `while` picks up whatever remains.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write is treated as "FIFO full": stop flushing for now.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        // Any other write error ends the loop (and with it the session's run).
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            // Process pending reads.
            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
                    // Only wait for writability when there is something left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    // USER_0 is raised by `terminate` and at the end of `open_session`.
                    if signals.contains(zx::Signals::USER_0) {
                        return;
                    }
                    // Clear USER_1 signal if it's set.
                    if signals.contains(zx::Signals::USER_1) {
                        let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }
268
269    fn handle_requests<'a>(&self, requests: impl Iterator<Item = &'a mut BlockFifoRequest>) {
270        let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
271            unsafe { MaybeUninit::uninit().assume_init() };
272        let mut count = 0;
273        let this = self
274            .manager
275            .open_sessions
276            .lock()
277            .get(&(self as *const _ as usize))
278            .and_then(Weak::upgrade)
279            .unwrap();
280        for request in requests {
281            let mut in_split = false;
282            loop {
283                if count >= MAX_REQUESTS {
284                    unsafe {
285                        (self.manager.callbacks.on_requests)(
286                            self.manager.callbacks.context,
287                            c_requests[0].as_mut_ptr(),
288                            count,
289                        );
290                    }
291                    count = 0;
292                }
293                match self.helper.decode_fifo_request(request, in_split) {
294                    DecodeResult::Ok(decoded_request) => {
295                        if let Operation::CloseVmo = decoded_request.operation {
296                            self.send_reply(decoded_request.request_tracking, zx::Status::OK);
297                            break;
298                        }
299                        let c_request = self.manager.start_request(
300                            this.clone(),
301                            decoded_request.request_tracking,
302                            decoded_request.operation,
303                            decoded_request.vmo,
304                        );
305                        c_requests[count].write(c_request);
306                        count += 1;
307                        break;
308                    }
309                    DecodeResult::Split(decoded_request) => {
310                        let c_request = self.manager.start_request(
311                            this.clone(),
312                            decoded_request.request_tracking,
313                            decoded_request.operation,
314                            decoded_request.vmo,
315                        );
316                        c_requests[count].write(c_request);
317                        count += 1;
318                        in_split = true;
319                    }
320                    DecodeResult::InvalidRequest(tracking, status) => {
321                        self.send_reply(tracking, status);
322                        break;
323                    }
324                    DecodeResult::IgnoreRequest => {
325                        break;
326                    }
327                }
328            }
329        }
330        if count > 0 {
331            unsafe {
332                (self.manager.callbacks.on_requests)(
333                    self.manager.callbacks.context,
334                    c_requests[0].as_mut_ptr(),
335                    count,
336                );
337            }
338        }
339    }
340
341    fn send_reply(&self, tracking: RequestTracking, status: zx::Status) {
342        let response = match self.helper.finish_fifo_request(tracking, status) {
343            Some(response) => response,
344            None => return,
345        };
346        let mut queue = self.queue.lock();
347        if queue.responses.is_empty() {
348            match self.fifo.write_one(&response) {
349                Ok(()) => {
350                    return;
351                }
352                Err(_) => {
353                    // Wake `fifo_loop`.
354                    let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
355                }
356            }
357        }
358        queue.responses.push_back(response);
359    }
360
    /// Asks the session to stop: raises `USER_0` on the FIFO (which makes `fifo_loop` return)
    /// and aborts the FIDL request loop.
    fn terminate(&self) {
        let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
        self.abort_handle.abort();
    }
365}
366
367impl Drop for Session {
368    fn drop(&mut self) {
369        let notify = {
370            let mut open_sessions = self.manager.open_sessions.lock();
371            open_sessions.remove(&(self as *const _ as usize));
372            open_sessions.is_empty()
373        };
374        if notify {
375            self.manager.condvar.notify_all();
376        }
377    }
378}
379
/// The object handed to C by `block_server_new`.
pub struct BlockServer {
    // The generic block server, parameterised with this file's `SessionManager`.
    server: super::BlockServer<SessionManager>,
    // Handle to the executor created in `block_server_thread`; used to spawn serving tasks.
    ehandle: EHandle,
    // Aborts the future that `block_server_thread` runs, letting that function return.
    abort_handle: AbortHandle,
}
385
/// A single-slot mailbox (the current `Mail` plus a condition variable that is notified on every
/// `post`) used to coordinate with the executor thread.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
387
388impl ExecutorMailbox {
389    /// Returns the old mail.
390    fn post(&self, mail: Mail) -> Mail {
391        let old = std::mem::replace(&mut *self.0.lock(), mail);
392        self.1.notify_all();
393        old
394    }
395
396    fn new() -> Self {
397        Self(Mutex::default(), Condvar::new())
398    }
399}
400
/// Callback invoked, with the caller-supplied argument, once an asynchronous shutdown requested
/// via `block_server_delete_async` has dropped the server.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
402
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    // Nothing has been posted yet.
    #[default]
    None,
    // Posted by `block_server_thread` once the executor exists; carries its handle and the
    // handle that aborts it.
    Initialized(EHandle, AbortHandle),
    // Posted by `block_server_delete_async`; `block_server_thread_delete` drops the server and
    // then invokes the callback with the given argument.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    // Posted by `block_server_thread_delete`.
    Finished,
}
411
impl Drop for BlockServer {
    /// Stops the executor thread, waits for it to report `Mail::Finished`, and then terminates
    /// all sessions.
    fn drop(&mut self) {
        // Abort the future that `block_server_thread` is running so that the thread can exit.
        self.abort_handle.abort();
        let manager = &self.server.session_manager;
        // Wait until `block_server_thread_delete` has posted `Finished`.  Note that the mailbox
        // lock stays held for the remainder of this function, including `terminate` below.
        let mut mbox = manager.mbox.0.lock();
        manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
        manager.terminate();
        debug_assert!(Arc::strong_count(manager) > 0);
    }
}
422
/// C representation of the partition description passed to `block_server_new`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bit flags; bits not known to `fblock::Flag` are discarded on conversion.
    pub device_flags: u32,
    /// First block of the partition.
    pub start_block: u64,
    /// Number of blocks; the partition covers `start_block..start_block + block_count`.
    pub block_count: u64,
    /// Block size; also used to convert `max_transfer_size` into a block count.
    pub block_size: u32,
    pub type_guid: [u8; 16],
    pub instance_guid: [u8; 16],
    /// NUL-terminated name, or null for an empty name.  Invalid UTF-8 is replaced lossily.
    pub name: *const c_char,
    /// Passed through unchanged.
    pub flags: u64,
    /// `MAX_TRANSFER_UNBOUNDED` for no limit; otherwise divided by `block_size` to obtain the
    /// limit in blocks (so presumably expressed in bytes — confirm).
    pub max_transfer_size: u32,
}
435
/// Local alias for `zx::sys::zx_handle_t`.
///
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

/// Local alias for `zx::sys::zx_status_t`.
///
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
443
444impl PartitionInfo {
445    unsafe fn to_rust(&self) -> super::DeviceInfo {
446        super::DeviceInfo::Partition(super::PartitionInfo {
447            device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
448            block_range: Some(self.start_block..self.start_block + self.block_count),
449            type_guid: self.type_guid,
450            instance_guid: self.instance_guid,
451            name: if self.name.is_null() {
452                "".to_string()
453            } else {
454                String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
455            },
456            flags: self.flags,
457            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
458                NonZero::new(self.max_transfer_size / self.block_size)
459            } else {
460                None
461            },
462        })
463    }
464}
465
466/// # Safety
467///
468/// All callbacks in `callbacks` must be safe.
469#[no_mangle]
470pub unsafe extern "C" fn block_server_new(
471    partition_info: &PartitionInfo,
472    callbacks: Callbacks,
473) -> *mut BlockServer {
474    let session_manager = Arc::new(SessionManager {
475        callbacks,
476        open_sessions: Mutex::default(),
477        active_requests: Mutex::new(Slab::with_capacity(MAX_REQUESTS)),
478        condvar: Condvar::new(),
479        mbox: ExecutorMailbox::new(),
480        info: partition_info.to_rust(),
481    });
482
483    (session_manager.callbacks.start_thread)(
484        session_manager.callbacks.context,
485        Arc::into_raw(session_manager.clone()) as *const c_void,
486    );
487
488    let mbox = &session_manager.mbox;
489    let mail = {
490        let mut mail = mbox.0.lock();
491        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
492        std::mem::replace(&mut *mail, Mail::None)
493    };
494
495    let block_size = partition_info.block_size;
496    match mail {
497        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
498            server: super::BlockServer::new(block_size, session_manager),
499            ehandle,
500            abort_handle,
501        })),
502        Mail::Finished => std::ptr::null_mut(),
503        _ => unreachable!(),
504    }
505}
506
507/// # Safety
508///
509/// `arg` must be the value passed to the `start_thread` callback.
510#[no_mangle]
511pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
512    let session_manager = &*(arg as *const SessionManager);
513
514    let mut executor = fasync::LocalExecutor::new();
515    let (abort_handle, registration) = AbortHandle::new_pair();
516
517    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
518
519    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
520}
521
522/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
523/// thread is successful or not.
524///
525/// # Safety
526///
527/// `arg` must be the value passed to the `start_thread` callback.
528#[no_mangle]
529pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
530    let mail = {
531        let session_manager = Arc::from_raw(arg as *const SessionManager);
532        debug_assert!(Arc::strong_count(&session_manager) > 0);
533        session_manager.mbox.post(Mail::Finished)
534    };
535
536    if let Mail::AsyncShutdown(server, callback, arg) = mail {
537        std::mem::drop(server);
538        // SAFETY: Whoever supplied the callback must guarantee it's safe.
539        unsafe {
540            callback(arg);
541        }
542    }
543}
544
545/// # Safety
546///
547/// `block_server` must be valid.
548#[no_mangle]
549pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
550    let _ = Box::from_raw(block_server);
551}
552
/// Begins destroying a server without blocking the caller: ownership of the server is parked in
/// the mailbox and the executor is aborted.  `block_server_thread_delete` later drops the server
/// and invokes `callback(arg)`.
///
/// # Safety
///
/// `block_server` must be valid.
#[no_mangle]
pub unsafe extern "C" fn block_server_delete_async(
    block_server: *mut BlockServer,
    callback: ShutdownCallback,
    arg: *mut c_void,
) {
    let block_server = Box::from_raw(block_server);
    // Clone what is needed up front, because the box itself is moved into the mailbox below.
    let session_manager = block_server.server.session_manager.clone();
    let abort_handle = block_server.abort_handle.clone();
    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
    // Only abort once the shutdown mail has been posted.
    abort_handle.abort();
}
568
569/// Serves the Volume protocol for this server.  `handle` is consumed.
570///
571/// # Safety
572///
573/// `block_server` and `handle` must be valid.
574#[no_mangle]
575pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
576    let block_server = &*block_server;
577    let ehandle = &block_server.ehandle;
578    let handle = zx::Handle::from_raw(handle);
579    ehandle.global_scope().spawn(async move {
580        let _ = block_server
581            .server
582            .handle_requests(fvolume::VolumeRequestStream::from_channel(
583                fasync::Channel::from_channel(handle.into()),
584            ))
585            .await;
586    });
587}
588
/// Runs the session's FIFO loop on the calling thread; returns once the loop has ended and the
/// session's FIDL request loop has been aborted.
///
/// # Safety
///
/// `session` must be valid.
#[no_mangle]
pub unsafe extern "C" fn block_server_session_run(session: &Session) {
    session.run();
}
596
597/// # Safety
598///
599/// `session` must be valid.
600#[no_mangle]
601pub unsafe extern "C" fn block_server_session_release(session: &Session) {
602    session.terminate();
603    Arc::from_raw(session);
604}
605
606/// # Safety
607///
608/// `block_server` must be valid.
609#[no_mangle]
610pub unsafe extern "C" fn block_server_send_reply(
611    block_server: &BlockServer,
612    request_id: RequestId,
613    status: zx_status_t,
614) {
615    block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
616}