block_server/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
7    SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl::endpoints::RequestStream;
12use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
13use fuchsia_async::{self as fasync, EHandle};
14use fuchsia_sync::{Condvar, Mutex};
15use futures::TryStreamExt;
16use futures::stream::{AbortHandle, Abortable};
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{CStr, c_char, c_void};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
26pub struct SessionManager {
27    callbacks: Callbacks,
28    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
29    active_requests: ActiveRequests<Arc<Session>>,
30    condvar: Condvar,
31    mbox: ExecutorMailbox,
32    info: super::DeviceInfo,
33}
34
35unsafe impl Send for SessionManager {}
36unsafe impl Sync for SessionManager {}
37
38impl SessionManager {
39    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
40        if let Some((session, response)) =
41            self.active_requests.complete_and_take_response(request_id, status)
42        {
43            session.send_response(response);
44        }
45    }
46
47    fn terminate(&self) {
48        {
49            // We must drop references to sessions whilst we're not holding the lock for
50            // `open_sessions` because `Session::drop` needs to take that same lock.
51            #[allow(clippy::collection_is_never_read)]
52            let mut terminated_sessions = Vec::new();
53            for (_, session) in &*self.open_sessions.lock() {
54                if let Some(session) = session.upgrade() {
55                    session.terminate();
56                    terminated_sessions.push(session);
57                }
58            }
59        }
60        let mut guard = self.open_sessions.lock();
61        self.condvar.wait_while(&mut guard, |s| !s.is_empty());
62    }
63}
64
65impl super::SessionManager for SessionManager {
66    const SUPPORTS_DECOMPRESSION: bool = false;
67    type Session = Arc<Session>;
68
69    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
70        Ok(())
71    }
72
73    async fn open_session(
74        self: Arc<Self>,
75        mut stream: fblock::SessionRequestStream,
76        offset_map: OffsetMap,
77        block_size: u32,
78    ) -> Result<(), Error> {
79        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
80        let (abort_handle, registration) = AbortHandle::new_pair();
81        let session = Arc::new(Session {
82            manager: self.clone(),
83            helper,
84            fifo,
85            queue: Mutex::default(),
86            abort_handle,
87        });
88        self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
89        unsafe {
90            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
91        }
92
93        let result = Abortable::new(
94            async {
95                while let Some(request) = stream.try_next().await? {
96                    session.helper.handle_request(request).await?;
97                }
98                Ok(())
99            },
100            registration,
101        )
102        .await
103        .unwrap_or_else(|e| Err(e.into()));
104
105        let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
106
107        result
108    }
109
110    async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
111        Ok(Cow::Borrowed(&self.info))
112    }
113
114    fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
115        &self.active_requests
116    }
117}
118
119impl Drop for SessionManager {
120    fn drop(&mut self) {
121        self.terminate();
122    }
123}
124
125impl IntoSessionManager for Arc<SessionManager> {
126    type SM = SessionManager;
127
128    fn into_session_manager(self) -> Self {
129        self
130    }
131}
132
133#[repr(C)]
134pub struct Callbacks {
135    pub context: *mut c_void,
136    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
137    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
138    pub on_requests:
139        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
140    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
141}
142
143impl Callbacks {
144    #[allow(dead_code)]
145    fn log(&self, msg: &str) {
146        let msg = msg.as_bytes();
147        // SAFETY: This is safe if `context` and `log` are good.
148        unsafe {
149            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
150        }
151    }
152}
153
154/// cbindgen:no-export
155#[allow(dead_code)]
156pub struct UnownedVmo(zx::sys::zx_handle_t);
157
158#[repr(C)]
159pub struct Request {
160    pub request_id: RequestId,
161    pub operation: Operation,
162    pub trace_flow_id: TraceFlowId,
163    pub vmo: UnownedVmo,
164}
165
166unsafe impl Send for Callbacks {}
167unsafe impl Sync for Callbacks {}
168
169pub struct Session {
170    manager: Arc<SessionManager>,
171    helper: SessionHelper<SessionManager>,
172    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
173    queue: Mutex<SessionQueue>,
174    abort_handle: AbortHandle,
175}
176
177#[derive(Default)]
178struct SessionQueue {
179    responses: VecDeque<BlockFifoResponse>,
180}
181
182pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
183
184impl Session {
185    fn run(self: &Arc<Self>) {
186        self.fifo_loop();
187        self.abort_handle.abort();
188        self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
189    }
190
191    fn fifo_loop(self: &Arc<Self>) {
192        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
193
194        loop {
195            // Send queued responses.
196            let is_queue_empty = {
197                let mut queue = self.queue.lock();
198                while !queue.responses.is_empty() {
199                    let (front, _) = queue.responses.as_slices();
200                    match self.fifo.write(front) {
201                        Ok(count) => {
202                            let full = count < front.len();
203                            queue.responses.drain(..count);
204                            if full {
205                                break;
206                            }
207                        }
208                        Err(zx::Status::SHOULD_WAIT) => break,
209                        Err(_) => return,
210                    }
211                }
212                queue.responses.is_empty()
213            };
214
215            // Process pending reads.
216            match self.fifo.read_uninit(&mut requests) {
217                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
218                Err(zx::Status::SHOULD_WAIT) => {
219                    let mut signals =
220                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
221                    if !is_queue_empty {
222                        signals |= zx::Signals::OBJECT_WRITABLE;
223                    }
224                    let Ok(signals) =
225                        self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
226                    else {
227                        return;
228                    };
229                    if signals.contains(zx::Signals::USER_0) {
230                        return;
231                    }
232                    // Clear USER_1 signal if it's set.
233                    if signals.contains(zx::Signals::USER_1) {
234                        let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
235                    }
236                }
237                Err(_) => return,
238            }
239        }
240    }
241
242    fn handle_requests<'a>(
243        self: &Arc<Self>,
244        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
245    ) {
246        let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
247            unsafe { MaybeUninit::uninit().assume_init() };
248        let mut count = 0;
249        for request in requests {
250            match self.helper.decode_fifo_request(self.clone(), request) {
251                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
252                    self.complete_request(request_id, zx::Status::OK);
253                }
254                Ok(mut request) => {
255                    let request_id = request.request_id;
256                    loop {
257                        let map_result = self.helper.map_request(
258                            request,
259                            &mut self.helper.session_manager.active_requests.request(request_id),
260                        );
261                        match map_result {
262                            Ok((
263                                DecodedRequest { request_id, operation, trace_flow_id, vmo },
264                                remainder,
265                            )) => {
266                                // We are handing out unowned references to the VMO here.  This
267                                // is safe because the VMO bin holds references to any closed
268                                // VMOs until all preceding operations have finished.
269                                c_requests[count].write(Request {
270                                    request_id,
271                                    operation,
272                                    trace_flow_id,
273                                    vmo: UnownedVmo(
274                                        vmo.as_ref()
275                                            .map(|vmo| vmo.raw_handle())
276                                            .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
277                                    ),
278                                });
279                                count += 1;
280                                if count == MAX_REQUESTS {
281                                    unsafe {
282                                        (self.manager.callbacks.on_requests)(
283                                            self.manager.callbacks.context,
284                                            c_requests[0].as_mut_ptr(),
285                                            count,
286                                        );
287                                    }
288                                    count = 0;
289                                }
290                                if let Some(r) = remainder {
291                                    request = r;
292                                } else {
293                                    break;
294                                }
295                            }
296                            Err(status) => {
297                                self.complete_request(request_id, status);
298                                break;
299                            }
300                        }
301                    }
302                }
303                Err(None) => {}
304                Err(Some(response)) => self.send_response(response),
305            }
306        }
307        if count > 0 {
308            unsafe {
309                (self.manager.callbacks.on_requests)(
310                    self.manager.callbacks.context,
311                    c_requests[0].as_mut_ptr(),
312                    count,
313                );
314            }
315        }
316    }
317
318    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
319        self.manager.complete_request(request_id, status);
320    }
321
322    fn send_response(&self, response: BlockFifoResponse) {
323        let mut queue = self.queue.lock();
324        if queue.responses.is_empty() {
325            match self.fifo.write_one(&response) {
326                Ok(()) => {
327                    return;
328                }
329                Err(_) => {
330                    // Wake `fifo_loop`.
331                    let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
332                }
333            }
334        }
335        queue.responses.push_back(response);
336    }
337
338    fn terminate(&self) {
339        let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
340        self.abort_handle.abort();
341    }
342}
343
344impl Drop for Session {
345    fn drop(&mut self) {
346        let notify = {
347            let mut open_sessions = self.manager.open_sessions.lock();
348            open_sessions.remove(&(self as *const _ as usize));
349            open_sessions.is_empty()
350        };
351        if notify {
352            self.manager.condvar.notify_all();
353        }
354    }
355}
356
357pub struct BlockServer {
358    server: super::BlockServer<SessionManager>,
359    ehandle: EHandle,
360    abort_handle: AbortHandle,
361}
362
363struct ExecutorMailbox(Mutex<Mail>, Condvar);
364
365impl ExecutorMailbox {
366    /// Returns the old mail.
367    fn post(&self, mail: Mail) -> Mail {
368        let old = std::mem::replace(&mut *self.0.lock(), mail);
369        self.1.notify_all();
370        old
371    }
372
373    fn new() -> Self {
374        Self(Mutex::default(), Condvar::new())
375    }
376}
377
/// Callback invoked with the caller-supplied argument once an asynchronous shutdown started by
/// `block_server_delete_async` has finished.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
379
380#[derive(Default)]
381enum Mail {
382    #[default]
383    None,
384    Initialized(EHandle, AbortHandle),
385    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
386    Finished,
387}
388
389impl Drop for BlockServer {
390    fn drop(&mut self) {
391        self.abort_handle.abort();
392        let manager = &self.server.session_manager;
393        let mut mbox = manager.mbox.0.lock();
394        manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
395        manager.terminate();
396        debug_assert!(Arc::strong_count(manager) > 0);
397    }
398}
399
/// C description of the partition served by a `BlockServer`; converted with `to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bit set interpreted as `fuchsia.hardware.block` flags; unknown bits are discarded.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks in the partition's block range.
    pub block_count: u64,
    /// Block size; `max_transfer_size` is divided by it to obtain a block count.
    pub block_size: u32,
    /// Partition type GUID.
    pub type_guid: [u8; 16],
    /// Partition instance GUID.
    pub instance_guid: [u8; 16],
    /// NUL-terminated partition name, or null for an empty name.
    pub name: *const c_char,
    /// Partition flags, passed through unchanged.
    pub flags: u64,
    /// Maximum transfer size, or `MAX_TRANSFER_UNBOUNDED` for no limit.
    pub max_transfer_size: u32,
}
412
413/// cbindgen:no-export
414#[allow(non_camel_case_types)]
415type zx_handle_t = zx::sys::zx_handle_t;
416
417/// cbindgen:no-export
418#[allow(non_camel_case_types)]
419type zx_status_t = zx::sys::zx_status_t;
420
421impl PartitionInfo {
422    /// # Safety
423    ///
424    /// [`self.name`] must point to valid, null-terminated C-string, or be a nullptr.
425    unsafe fn to_rust(&self) -> super::DeviceInfo {
426        super::DeviceInfo::Partition(super::PartitionInfo {
427            device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
428            block_range: Some(self.start_block..self.start_block + self.block_count),
429            type_guid: self.type_guid,
430            instance_guid: self.instance_guid,
431            name: if self.name.is_null() {
432                "".to_string()
433            } else {
434                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
435            },
436            flags: self.flags,
437            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
438                NonZero::new(self.max_transfer_size / self.block_size)
439            } else {
440                None
441            },
442        })
443    }
444}
445
446/// # Safety
447///
448/// All callbacks in `callbacks` must be safe.
449#[unsafe(no_mangle)]
450pub unsafe extern "C" fn block_server_new(
451    partition_info: &PartitionInfo,
452    callbacks: Callbacks,
453) -> *mut BlockServer {
454    let session_manager = Arc::new(SessionManager {
455        callbacks,
456        open_sessions: Mutex::default(),
457        active_requests: ActiveRequests::default(),
458        condvar: Condvar::new(),
459        mbox: ExecutorMailbox::new(),
460        info: unsafe { partition_info.to_rust() },
461    });
462
463    unsafe {
464        (session_manager.callbacks.start_thread)(
465            session_manager.callbacks.context,
466            Arc::into_raw(session_manager.clone()) as *const c_void,
467        );
468    }
469
470    let mbox = &session_manager.mbox;
471    let mail = {
472        let mut mail = mbox.0.lock();
473        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
474        std::mem::replace(&mut *mail, Mail::None)
475    };
476
477    let block_size = partition_info.block_size;
478    match mail {
479        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
480            server: super::BlockServer::new(block_size, session_manager),
481            ehandle,
482            abort_handle,
483        })),
484        Mail::Finished => std::ptr::null_mut(),
485        _ => unreachable!(),
486    }
487}
488
489/// # Safety
490///
491/// `arg` must be the value passed to the `start_thread` callback.
492#[unsafe(no_mangle)]
493pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
494    let session_manager = unsafe { &*(arg as *const SessionManager) };
495
496    let mut executor = fasync::LocalExecutor::default();
497    let (abort_handle, registration) = AbortHandle::new_pair();
498
499    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
500
501    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
502}
503
504/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
505/// thread is successful or not.
506///
507/// # Safety
508///
509/// `arg` must be the value passed to the `start_thread` callback.
510#[unsafe(no_mangle)]
511pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
512    let mail = {
513        let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
514        debug_assert!(Arc::strong_count(&session_manager) > 0);
515        session_manager.mbox.post(Mail::Finished)
516    };
517
518    if let Mail::AsyncShutdown(server, callback, arg) = mail {
519        std::mem::drop(server);
520        // SAFETY: Whoever supplied the callback must guarantee it's safe.
521        unsafe {
522            callback(arg);
523        }
524    }
525}
526
527/// # Safety
528///
529/// `block_server` must be valid.
530#[unsafe(no_mangle)]
531pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
532    let _ = unsafe { Box::from_raw(block_server) };
533}
534
535/// # Safety
536///
537/// `block_server` must be valid.
538#[unsafe(no_mangle)]
539pub unsafe extern "C" fn block_server_delete_async(
540    block_server: *mut BlockServer,
541    callback: ShutdownCallback,
542    arg: *mut c_void,
543) {
544    let block_server = unsafe { Box::from_raw(block_server) };
545    let session_manager = block_server.server.session_manager.clone();
546    let abort_handle = block_server.abort_handle.clone();
547    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
548    abort_handle.abort();
549}
550
551/// Serves the Volume protocol for this server.  `handle` is consumed.
552///
553/// # Safety
554///
555/// `block_server` and `handle` must be valid.
556#[unsafe(no_mangle)]
557pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
558    let block_server = unsafe { &*block_server };
559    let ehandle = &block_server.ehandle;
560    let handle = unsafe { zx::Handle::from_raw(handle) };
561    ehandle.global_scope().spawn(async move {
562        let _ = block_server
563            .server
564            .handle_requests(fvolume::VolumeRequestStream::from_channel(
565                fasync::Channel::from_channel(handle.into()),
566            ))
567            .await;
568    });
569}
570
571/// # Safety
572///
573/// `session` must be valid.
574#[unsafe(no_mangle)]
575pub unsafe extern "C" fn block_server_session_run(session: &Session) {
576    let session = unsafe { Arc::from_raw(session) };
577    session.run();
578    let _ = Arc::into_raw(session);
579}
580
581/// # Safety
582///
583/// `session` must be valid.
584#[unsafe(no_mangle)]
585pub unsafe extern "C" fn block_server_session_release(session: &Session) {
586    session.terminate();
587    unsafe { Arc::from_raw(session) };
588}
589
590/// # Safety
591///
592/// `block_server` must be valid.
593#[unsafe(no_mangle)]
594pub unsafe extern "C" fn block_server_send_reply(
595    block_server: &BlockServer,
596    request_id: RequestId,
597    status: zx_status_t,
598) {
599    block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
600}