block_server/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{IntoSessionManager, OffsetMap, Operation, RequestId, RequestTracking, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse};
8use fidl::endpoints::RequestStream;
9use fuchsia_async::{self as fasync, EHandle};
10use fuchsia_sync::{Condvar, Mutex};
11use futures::stream::{AbortHandle, Abortable};
12use futures::TryStreamExt;
13use std::borrow::Cow;
14use std::collections::{HashMap, VecDeque};
15use std::ffi::{c_char, c_void, CStr};
16use std::mem::MaybeUninit;
17use std::num::NonZero;
18use std::sync::{Arc, Weak};
19use zx::{self as zx, AsHandleRef as _};
20use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
21
/// Session manager used by the C bindings: it forwards work to the C `callbacks` and keeps track
/// of the sessions that are currently open.
pub struct SessionManager {
    /// Callbacks (and their context pointer) supplied by the C caller.
    callbacks: Callbacks,
    /// Open sessions, keyed by the address of the `Session` (see `open_session`).  The entries
    /// are weak references; `Session::drop` removes them.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Notified by `Session::drop` when `open_sessions` becomes empty; `terminate` waits on it.
    condvar: Condvar,
    /// Mailbox used to exchange state with the executor thread.
    mbox: ExecutorMailbox,
    /// Device information returned by `get_info`.
    info: super::DeviceInfo,
}

// `SessionManager` is not automatically `Send`/`Sync`; for example, `mbox` can hold a
// `Mail::AsyncShutdown`, which carries a raw `*mut c_void`.
// NOTE(review): soundness presumably relies on the C caller supplying callbacks and pointers that
// may be used from any thread -- confirm.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
32
33impl SessionManager {
34    fn terminate(&self) {
35        {
36            // We must drop references to sessions whilst we're not holding the lock for
37            // `open_sessions` because `Session::drop` needs to take that same lock.
38            #[allow(clippy::collection_is_never_read)]
39            let mut terminated_sessions = Vec::new();
40            for (_, session) in &*self.open_sessions.lock() {
41                if let Some(session) = session.upgrade() {
42                    session.terminate();
43                    terminated_sessions.push(session);
44                }
45            }
46        }
47        let mut guard = self.open_sessions.lock();
48        self.condvar.wait_while(&mut guard, |s| !s.is_empty());
49    }
50}
51
52impl super::SessionManager for SessionManager {
53    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
54        Ok(())
55    }
56
57    async fn open_session(
58        self: Arc<Self>,
59        mut stream: fblock::SessionRequestStream,
60        offset_map: Option<OffsetMap>,
61        block_size: u32,
62    ) -> Result<(), Error> {
63        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
64        let (abort_handle, registration) = AbortHandle::new_pair();
65        let session = Arc::new(Session {
66            manager: self.clone(),
67            helper,
68            fifo,
69            queue: Mutex::default(),
70            vmos: Mutex::default(),
71            abort_handle,
72        });
73        self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
74        unsafe {
75            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
76        }
77
78        let result = Abortable::new(
79            async {
80                while let Some(request) = stream.try_next().await? {
81                    session.helper.handle_request(request).await?;
82                }
83                Ok(())
84            },
85            registration,
86        )
87        .await
88        .unwrap_or_else(|e| Err(e.into()));
89
90        let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
91
92        result
93    }
94
95    async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
96        Ok(Cow::Borrowed(&self.info))
97    }
98}
99
impl Drop for SessionManager {
    /// Terminates any sessions that are still open and blocks until they have all been dropped
    /// (see `SessionManager::terminate`).
    fn drop(&mut self) {
        self.terminate();
    }
}
105
// An `Arc<SessionManager>` is used directly as the session manager, so the conversion simply
// returns `self`.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    fn into_session_manager(self) -> Self {
        self
    }
}
113
/// Table of callbacks supplied by the C caller of `block_server_new`.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer passed back as the first argument of every callback.
    pub context: *mut c_void,
    /// Called by `block_server_new`.  `arg` is the value that must later be passed to
    /// `block_server_thread` and `block_server_thread_delete`.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Called when a session is opened.  `session` carries a reference that is given up by
    /// calling `block_server_session_release`.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Called with a batch of `request_count` decoded requests (never more than `MAX_REQUESTS`).
    pub on_requests: unsafe extern "C" fn(
        context: *mut c_void,
        session: *const Session,
        requests: *mut Request,
        request_count: usize,
    ),
    /// Called with a log message given as a pointer/length pair; the message is not
    /// NUL-terminated.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
127
128impl Callbacks {
129    #[allow(dead_code)]
130    fn log(&self, msg: &str) {
131        let msg = msg.as_bytes();
132        // SAFETY: This is safe if `context` and `log` are good.
133        unsafe {
134            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
135        }
136    }
137}
138
/// cbindgen:no-export
// Raw VMO handle value passed to C inside a `Request`.  `Session::handle_requests` fills it with
// either the raw handle of a VMO that the session keeps in `Session::vmos`, or
// `ZX_HANDLE_INVALID` when the request has no VMO.
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
142
/// A decoded request as handed to the C `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Identifies the request; it is passed back via `block_server_send_reply`.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow ID taken from the request's tracking information, if there is one; it is
    /// passed back via `block_server_send_reply`.
    pub trace_flow_id: Option<NonZero<u64>>,
    /// Unowned handle of the VMO for the request, or `ZX_HANDLE_INVALID` if there isn't one.
    pub vmo: UnownedVmo,
}
150
// `Callbacks` holds the raw `context` pointer, so it is not `Send`/`Sync` automatically.
// NOTE(review): presumably the C caller guarantees that `context` and the callbacks may be used
// from any thread -- confirm.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
153
/// State for a single open session.  A reference to it is handed to the C side via the
/// `on_new_session` callback.
pub struct Session {
    /// The manager that created this session; `Drop` uses it to unregister the session.
    manager: Arc<SessionManager>,
    /// Helper used to handle session requests and to decode/finish FIFO requests.
    helper: SessionHelper<SessionManager>,
    /// FIFO that requests are read from and responses are written to.  The USER_0 and USER_1
    /// signals on it are used to terminate and to wake `fifo_loop` respectively.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// VMOs for requests that have been passed to C; entries are removed by
    /// `block_server_send_reply`.
    vmos: Mutex<HashMap<RequestId, Arc<zx::Vmo>>>,
    /// Aborts the task in `open_session` that serves the session's request stream.
    abort_handle: AbortHandle,
}
162
/// Queue of responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first: `send_reply` pushes to the back and `fifo_loop` drains
    /// from the front.
    responses: VecDeque<BlockFifoResponse>,
}
167
/// Size of the buffers used to read requests from the FIFO and to hold decoded requests, and so
/// the maximum number of requests passed to a single `on_requests` call.
pub const MAX_REQUESTS: usize = 64;
169
170impl Session {
171    fn run(&self) {
172        self.fifo_loop();
173        self.abort_handle.abort();
174    }
175
176    fn fifo_loop(&self) {
177        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
178
179        loop {
180            // Send queued responses.
181            let is_queue_empty = {
182                let mut queue = self.queue.lock();
183                while !queue.responses.is_empty() {
184                    let (front, _) = queue.responses.as_slices();
185                    match self.fifo.write(front) {
186                        Ok(count) => {
187                            let full = count < front.len();
188                            queue.responses.drain(..count);
189                            if full {
190                                break;
191                            }
192                        }
193                        Err(zx::Status::SHOULD_WAIT) => break,
194                        Err(_) => return,
195                    }
196                }
197                queue.responses.is_empty()
198            };
199
200            // Process pending reads.
201            match self.fifo.read_uninit(&mut requests) {
202                Ok(valid_requests) => self.handle_requests(valid_requests.iter()),
203                Err(zx::Status::SHOULD_WAIT) => {
204                    let mut signals =
205                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
206                    if !is_queue_empty {
207                        signals |= zx::Signals::OBJECT_WRITABLE;
208                    }
209                    let Ok(signals) =
210                        self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE)
211                    else {
212                        return;
213                    };
214                    if signals.contains(zx::Signals::USER_0) {
215                        return;
216                    }
217                    // Clear USER_1 signal if it's set.
218                    if signals.contains(zx::Signals::USER_1) {
219                        let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
220                    }
221                }
222                Err(_) => return,
223            }
224        }
225    }
226
227    fn handle_requests<'a>(&self, requests: impl Iterator<Item = &'a BlockFifoRequest>) {
228        let mut decoded_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
229            unsafe { MaybeUninit::uninit().assume_init() };
230        let mut count = 0;
231        for request in requests {
232            if let Some(r) = self.helper.decode_fifo_request(request) {
233                let operation = match r.operation {
234                    Ok(Operation::CloseVmo) => {
235                        self.send_reply(r.request_tracking, zx::Status::OK);
236                        continue;
237                    }
238                    Ok(operation) => operation,
239                    Err(status) => {
240                        self.send_reply(r.request_tracking, status);
241                        continue;
242                    }
243                };
244                let RequestTracking { group_or_request, trace_flow_id } = r.request_tracking;
245                let mut request_id = group_or_request.into();
246                let vmo = if let Some(vmo) = r.vmo {
247                    let raw_handle = vmo.raw_handle();
248                    self.vmos.lock().insert(request_id, vmo);
249                    request_id = request_id.with_vmo();
250                    UnownedVmo(raw_handle)
251                } else {
252                    UnownedVmo(zx::sys::ZX_HANDLE_INVALID)
253                };
254                decoded_requests[count].write(Request {
255                    request_id,
256                    operation,
257                    trace_flow_id,
258                    vmo,
259                });
260                count += 1;
261            }
262        }
263        if count > 0 {
264            unsafe {
265                (self.manager.callbacks.on_requests)(
266                    self.manager.callbacks.context,
267                    self,
268                    decoded_requests[0].as_mut_ptr(),
269                    count,
270                );
271            }
272        }
273    }
274
275    fn send_reply(&self, tracking: RequestTracking, status: zx::Status) {
276        let response = match self.helper.finish_fifo_request(tracking, status) {
277            Some(response) => response,
278            None => return,
279        };
280        let mut queue = self.queue.lock();
281        if queue.responses.is_empty() {
282            match self.fifo.write_one(&response) {
283                Ok(()) => {
284                    return;
285                }
286                Err(_) => {
287                    // Wake `fifo_loop`.
288                    let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
289                }
290            }
291        }
292        queue.responses.push_back(response);
293    }
294
295    fn terminate(&self) {
296        let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
297        self.abort_handle.abort();
298    }
299}
300
301impl Drop for Session {
302    fn drop(&mut self) {
303        let notify = {
304            let mut open_sessions = self.manager.open_sessions.lock();
305            open_sessions.remove(&(self as *const _ as usize));
306            open_sessions.is_empty()
307        };
308        if notify {
309            self.manager.condvar.notify_all();
310        }
311    }
312}
313
/// The object returned to C by `block_server_new`.
pub struct BlockServer {
    /// The underlying block server.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor created by `block_server_thread`; `block_server_serve` uses it to
    /// spawn tasks.
    ehandle: EHandle,
    /// Aborts the future that `block_server_thread` runs on its executor.
    abort_handle: AbortHandle,
}
319
/// A single-slot mailbox: the current `Mail` guarded by a mutex, and a condition variable that is
/// notified whenever new mail is posted.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
321
322impl ExecutorMailbox {
323    /// Returns the old mail.
324    fn post(&self, mail: Mail) -> Mail {
325        let old = std::mem::replace(&mut *self.0.lock(), mail);
326        self.1.notify_all();
327        old
328    }
329
330    fn new() -> Self {
331        Self(Mutex::default(), Condvar::new())
332    }
333}
334
/// Callback passed to `block_server_delete_async`; it is called with the accompanying `arg`
/// pointer after the server has been dropped.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
336
/// The messages that can sit in an `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// Nothing has been posted; `block_server_new` waits until the mailbox holds something else.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists.  Carries a handle to that
    /// executor and the handle that aborts the future it runs.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`.  When `block_server_thread_delete` finds this, it
    /// drops the server and then calls the callback with the pointer.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`.
    Finished,
}
345
impl Drop for BlockServer {
    /// Shuts the server down, blocking until the executor thread has finished and all sessions
    /// have been dropped.
    fn drop(&mut self) {
        // Abort the future that `block_server_thread` is running on its executor.
        self.abort_handle.abort();
        let manager = &self.server.session_manager;
        // Wait for `block_server_thread_delete` to post `Mail::Finished`.
        let mut mbox = manager.mbox.0.lock();
        manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
        // Terminate the sessions and wait for them to be dropped.  Note that the mailbox lock is
        // still held at this point.
        manager.terminate();
        debug_assert!(Arc::strong_count(manager) > 0);
    }
}
356
/// C representation of the partition served by a block server; `to_rust` converts it to
/// `super::DeviceInfo`.
#[repr(C)]
pub struct PartitionInfo {
    /// Device flags as raw bits; bits that `fblock::Flag` does not know about are discarded.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks in the partition's block range.
    pub block_count: u64,
    /// Block size passed to the block server by `block_server_new`.
    pub block_size: u32,
    /// Type GUID, copied as is.
    pub type_guid: [u8; 16],
    /// Instance GUID, copied as is.
    pub instance_guid: [u8; 16],
    /// Partition name as a NUL-terminated string, or null, which is treated as an empty name.
    /// Invalid UTF-8 is converted lossily.
    pub name: *const c_char,
    /// Partition flags, copied as is.
    pub flags: u64,
}
368
/// cbindgen:no-export
// Alias so that signatures in this file can use the C spelling of the handle type.
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;
372
/// cbindgen:no-export
// Alias so that signatures in this file can use the C spelling of the status type.
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
376
377impl PartitionInfo {
378    unsafe fn to_rust(&self) -> super::DeviceInfo {
379        super::DeviceInfo::Partition(super::PartitionInfo {
380            device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
381            block_range: Some(self.start_block..self.start_block + self.block_count),
382            type_guid: self.type_guid,
383            instance_guid: self.instance_guid,
384            name: if self.name.is_null() {
385                "".to_string()
386            } else {
387                String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
388            },
389            flags: self.flags,
390        })
391    }
392}
393
394/// # Safety
395///
396/// All callbacks in `callbacks` must be safe.
397#[no_mangle]
398pub unsafe extern "C" fn block_server_new(
399    partition_info: &PartitionInfo,
400    callbacks: Callbacks,
401) -> *mut BlockServer {
402    let session_manager = Arc::new(SessionManager {
403        callbacks,
404        open_sessions: Mutex::default(),
405        condvar: Condvar::new(),
406        mbox: ExecutorMailbox::new(),
407        info: partition_info.to_rust(),
408    });
409
410    (session_manager.callbacks.start_thread)(
411        session_manager.callbacks.context,
412        Arc::into_raw(session_manager.clone()) as *const c_void,
413    );
414
415    let mbox = &session_manager.mbox;
416    let mail = {
417        let mut mail = mbox.0.lock();
418        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
419        std::mem::replace(&mut *mail, Mail::None)
420    };
421
422    let block_size = partition_info.block_size;
423    match mail {
424        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
425            server: super::BlockServer::new(block_size, session_manager),
426            ehandle,
427            abort_handle,
428        })),
429        Mail::Finished => std::ptr::null_mut(),
430        _ => unreachable!(),
431    }
432}
433
434/// # Safety
435///
436/// `arg` must be the value passed to the `start_thread` callback.
437#[no_mangle]
438pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
439    let session_manager = &*(arg as *const SessionManager);
440
441    let mut executor = fasync::LocalExecutor::new();
442    let (abort_handle, registration) = AbortHandle::new_pair();
443
444    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
445
446    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
447}
448
449/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
450/// thread is successful or not.
451///
452/// # Safety
453///
454/// `arg` must be the value passed to the `start_thread` callback.
455#[no_mangle]
456pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
457    let mail = {
458        let session_manager = Arc::from_raw(arg as *const SessionManager);
459        debug_assert!(Arc::strong_count(&session_manager) > 0);
460        session_manager.mbox.post(Mail::Finished)
461    };
462
463    if let Mail::AsyncShutdown(server, callback, arg) = mail {
464        std::mem::drop(server);
465        // SAFETY: Whoever supplied the callback must guarantee it's safe.
466        unsafe {
467            callback(arg);
468        }
469    }
470}
471
472/// # Safety
473///
474/// `block_server` must be valid.
475#[no_mangle]
476pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
477    let _ = Box::from_raw(block_server);
478}
479
480/// # Safety
481///
482/// `block_server` must be valid.
483#[no_mangle]
484pub unsafe extern "C" fn block_server_delete_async(
485    block_server: *mut BlockServer,
486    callback: ShutdownCallback,
487    arg: *mut c_void,
488) {
489    let block_server = Box::from_raw(block_server);
490    let session_manager = block_server.server.session_manager.clone();
491    let abort_handle = block_server.abort_handle.clone();
492    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
493    abort_handle.abort();
494}
495
496/// Serves the Volume protocol for this server.  `handle` is consumed.
497///
498/// # Safety
499///
500/// `block_server` and `handle` must be valid.
501#[no_mangle]
502pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
503    let block_server = &*block_server;
504    let ehandle = &block_server.ehandle;
505    let handle = zx::Handle::from_raw(handle);
506    ehandle.global_scope().spawn(async move {
507        let _ = block_server
508            .server
509            .handle_requests(fvolume::VolumeRequestStream::from_channel(
510                fasync::Channel::from_channel(handle.into()),
511            ))
512            .await;
513    });
514}
515
/// Runs the session's FIFO loop on the calling thread.  This returns once the loop has finished,
/// e.g. because the session was terminated.
///
/// # Safety
///
/// `session` must be valid.
#[no_mangle]
pub unsafe extern "C" fn block_server_session_run(session: &Session) {
    session.run();
}
523
524/// # Safety
525///
526/// `session` must be valid.
527#[no_mangle]
528pub unsafe extern "C" fn block_server_session_release(session: &Session) {
529    session.terminate();
530    Arc::from_raw(session);
531}
532
533/// # Safety
534///
535/// `session` must be valid.
536#[no_mangle]
537pub unsafe extern "C" fn block_server_send_reply(
538    session: &Session,
539    request_id: RequestId,
540    trace_flow_id: Option<NonZero<u64>>,
541    status: zx_status_t,
542) {
543    if request_id.did_have_vmo() {
544        session.vmos.lock().remove(&request_id);
545    }
546    session.send_reply(
547        RequestTracking { group_or_request: request_id.into(), trace_flow_id },
548        zx::Status::from_raw(status),
549    );
550}