block_server/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::WriteFlags;
6
7use super::{
8    ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
9    SessionHelper, TraceFlowId,
10};
11use anyhow::Error;
12use block_protocol::{BlockFifoRequest, BlockFifoResponse};
13use fidl::endpoints::RequestStream;
14use fidl_fuchsia_storage_block as fblock;
15use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
16use fuchsia_async::{self as fasync, EHandle};
17use fuchsia_sync::{Condvar, Mutex};
18use futures::TryStreamExt;
19use futures::stream::{AbortHandle, Abortable};
20use std::borrow::Cow;
21use std::collections::{HashMap, VecDeque};
22use std::ffi::{CStr, c_char, c_void};
23use std::mem::MaybeUninit;
24use std::num::NonZero;
25use std::ops::{Deref, DerefMut};
26use std::sync::{Arc, Weak};
27
/// Session manager used by the C interface.  It forwards requests to the embedder through
/// [`Callbacks`] and keeps track of the sessions and requests that are still outstanding.
pub struct SessionManager {
    /// Callbacks (and their opaque context) supplied by the embedder.
    callbacks: Callbacks,
    /// Open sessions, keyed by the address of the `Session` allocation.  Only weak references are
    /// stored here; entries are removed by `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Notified when `open_sessions` becomes empty.
    open_sessions_condvar: Condvar,
    /// Per-request state shared with the generic block server code (see `ActiveRequests`).
    active_requests: ActiveRequests<Arc<Session>>,
    /// Number of requests that have been passed to `Callbacks::on_requests` and have not yet been
    /// completed via `complete_request`.
    inflight_requests: Mutex<usize>,
    /// Notified when `inflight_requests` drops to zero.
    no_inflight_requests_condvar: Condvar,
    /// Mailbox used to exchange lifecycle messages (`Mail`) with the executor thread.
    mbox: ExecutorMailbox,
    /// Device information returned by `get_info`.
    info: super::DeviceInfo,
}
38
// SAFETY: NOTE(review) -- these impls assert that `SessionManager` may be moved to and shared
// between threads.  The fields that are not automatically `Send`/`Sync` appear to be the ones
// carrying raw pointers (for example the `*mut c_void` held by `Mail::AsyncShutdown`); this
// presumably relies on the embedder supplying pointers that are usable from any thread.  Confirm
// against the C API contract.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
41
42impl SessionManager {
43    fn submit_requests(&self, requests: &mut [Request]) {
44        *self.inflight_requests.lock() += requests.len();
45        // SAFETY: `request` points to a valid array of `requests.len()` elements.
46        // The callback implementation is assumed to uphold its contract.
47        unsafe {
48            (self.callbacks.on_requests)(
49                self.callbacks.context,
50                std::ptr::from_mut(&mut requests[0]),
51                requests.len(),
52            )
53        }
54    }
55
56    /// Waits for there to be no requests in-flight.
57    ///
58    /// NOTE: To void TOCTOUs, this must be called on the same thread which calls
59    /// [`Self::submit_requests`].
60    fn wait_for_no_inflight_requests(&self) {
61        let mut guard = self.inflight_requests.lock();
62        self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
63    }
64
65    /// Called instead of `[Self::complete_request]` when a request is completed before it was
66    /// actually submitted.
67    fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
68        if let Some((session, response)) =
69            self.active_requests.complete_and_take_response(request_id, status)
70        {
71            session.send_response(response);
72        }
73    }
74
75    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
76        let notify = {
77            let mut inflight_requests = self.inflight_requests.lock();
78            *inflight_requests -= 1;
79            *inflight_requests == 0
80        };
81        self.complete_unsubmitted_request(request_id, status);
82        if notify {
83            self.no_inflight_requests_condvar.notify_all();
84        }
85    }
86
87    fn terminate(&self) {
88        {
89            // We must drop references to sessions whilst we're not holding the lock for
90            // `open_sessions` because `Session::drop` needs to take that same lock.
91            #[allow(clippy::collection_is_never_read)]
92            let mut terminated_sessions = Vec::new();
93            for (_, session) in &*self.open_sessions.lock() {
94                if let Some(session) = session.upgrade() {
95                    session.terminate();
96                    terminated_sessions.push(session);
97                }
98            }
99        }
100        let mut guard = self.open_sessions.lock();
101        self.open_sessions_condvar.wait_while(&mut guard, |s| !s.is_empty());
102    }
103}
104
105impl super::SessionManager for SessionManager {
106    const SUPPORTS_DECOMPRESSION: bool = false;
107    type Session = Arc<Session>;
108
109    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
110        Ok(())
111    }
112
113    async fn open_session(
114        self: Arc<Self>,
115        mut stream: fblock::SessionRequestStream,
116        offset_map: OffsetMap,
117        block_size: u32,
118    ) -> Result<(), Error> {
119        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
120        let (abort_handle, registration) = AbortHandle::new_pair();
121        let session = Arc::new(Session {
122            manager: self.clone(),
123            helper,
124            fifo,
125            queue: Mutex::default(),
126            abort_handle,
127        });
128        self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
129        unsafe {
130            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
131        }
132
133        let result = Abortable::new(
134            async {
135                while let Some(request) = stream.try_next().await? {
136                    session.helper.handle_request(request).await?;
137                }
138                Ok(())
139            },
140            registration,
141        )
142        .await
143        .unwrap_or_else(|e| Err(e.into()));
144
145        let _ = session.fifo.signal(zx::Signals::empty(), zx::Signals::USER_0);
146
147        result
148    }
149
150    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
151        Cow::Borrowed(&self.info)
152    }
153
154    fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
155        &self.active_requests
156    }
157}
158
159impl Drop for SessionManager {
160    fn drop(&mut self) {
161        self.terminate();
162    }
163}
164
/// An `Arc<SessionManager>` is already in the form the block server wants, so the conversion is
/// the identity.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    /// Returns `self` unchanged.
    fn into_session_manager(self) -> Self {
        self
    }
}
172
/// Table of embedder-supplied callbacks (plus the opaque context passed to each of them) that is
/// handed to [`block_server_new`].
#[repr(C)]
pub struct Callbacks {
    /// An opaque context object retained by this library.  The library will pass this back into all
    /// callbacks.  The memory pointed to by `context` must last until [`block_server_delete`] is
    /// called.
    pub context: *mut c_void,
    /// Starts a thread.  The implementation must call [`block_server_thread`] on this newly created
    /// thread, providing `arg`.  The implementation must then call [`block_server_thread_delete`]
    /// after [`block_server_thread`] returns (but before [`block_server_delete`] is called).
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Notifies the implementation of a new session.  The implementation must call
    /// [`block_server_session_run`] on a separate thread, and must call
    /// [`block_server_session_release`] after [`block_server_session_run`] (but before
    /// [`block_server_delete`] is called).
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Submits a batch of requests to be handled by the implementation.  The implementation must
    /// not retain references to `requests` after it returns.  The implementation must ensure that
    /// [`block_server_send_reply`] is called exactly once with the request ID of each entry in
    /// `requests`, regardless of its status; this call can be asynchronous but must occur before
    /// [`block_server_delete`] is called.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Logs `message` to the implementation's logger.  The implementation must not retain
    /// references to `message`.  `message` is passed together with its length, `message_len`.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
199
200impl Callbacks {
201    #[allow(dead_code)]
202    fn log(&self, msg: &str) {
203        let msg = msg.as_bytes();
204        // SAFETY: This is safe if `context` and `log` are good.
205        unsafe {
206            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
207        }
208    }
209}
210
/// A raw VMO handle that is passed to the embedder without transferring ownership.  The value is
/// `ZX_HANDLE_INVALID` when a request has no VMO.
///
/// cbindgen:no-export
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
214
/// A single request handed to the embedder through `Callbacks::on_requests`.
#[repr(C)]
pub struct Request {
    /// Identifies the request; the embedder passes it back to [`block_server_send_reply`].
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow identifier associated with the request.
    pub trace_flow_id: TraceFlowId,
    /// Unowned handle of the VMO involved in the request, or `ZX_HANDLE_INVALID` if there is none.
    pub vmo: UnownedVmo,
}
222
// SAFETY: NOTE(review) -- `Callbacks` holds a raw `context` pointer and plain function pointers.
// These impls assert that the context may be used from any thread; that is presumably part of
// the contract the embedder accepts when calling `block_server_new` -- confirm.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
225
/// A single client session.  The embedder drives it by calling [`block_server_session_run`].
pub struct Session {
    /// The manager that created this session.
    manager: Arc<SessionManager>,
    /// Shared session logic (request decoding and mapping, FIDL request handling).
    helper: SessionHelper<SessionManager>,
    /// FIFO on which requests are read and responses are written.  `USER_0` is raised on it to
    /// stop the FIFO loop and `USER_1` to wake the loop when responses have been queued.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the task that serves this session's FIDL request stream.
    abort_handle: AbortHandle,
}
233
/// Responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first.
    responses: VecDeque<BlockFifoResponse>,
}
238
/// Size of the per-session read buffer and of a `DecodedRequests` batch, i.e. the largest number
/// of requests passed to `Callbacks::on_requests` in one call.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
240
impl Session {
    /// Runs the session: services the FIFO until the loop exits, then aborts the task serving the
    /// session's FIDL stream and drops any active requests that belong to this session.
    fn run(self: &Arc<Self>) {
        self.fifo_loop();
        self.abort_handle.abort();
        self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
    }

    /// Alternates between flushing queued responses to the FIFO and reading new requests from it.
    ///
    /// Returns when `USER_0` is observed on the FIFO (see [`Self::terminate`]) or when a FIFO
    /// operation fails with anything other than `SHOULD_WAIT`.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Send queued responses.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous slice of the deque is written per iteration; the
                    // loop comes back around for the second slice if there is one.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write means the FIFO has no more room for now.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            // Process pending reads.
            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
                    // Only wait for writability if there are responses left to send.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    // USER_0 is the request to stop.
                    if signals.contains(zx::Signals::USER_0) {
                        return;
                    }
                    // Clear USER_1 signal if it's set.
                    if signals.contains(zx::Signals::USER_1) {
                        let _ = self.fifo.signal(zx::Signals::USER_1, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }

    /// Synchronously performs a device flush.
    ///
    /// The request's `count` is incremented to account for the extra flush, the flush is
    /// submitted on its own, and this blocks until nothing is in flight.  If the request's status
    /// is then not `OK`, the request is completed with that status and the status is returned as
    /// the error.
    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
        let trace_flow_id = {
            let mut request = self.manager.active_requests.request(request_id);
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    c"storage",
                    c"block_server::SimulatedBarrier",
                    "request_id" => request_id.0
                );
            }
            request.count += 1;
            request.trace_flow_id
        };
        self.manager.submit_requests(&mut [Request {
            request_id,
            operation: Operation::Flush,
            trace_flow_id,
            vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
        }]);
        self.manager.wait_for_no_inflight_requests();
        let status = self.manager.active_requests.request(request_id).status;
        match status {
            zx::Status::OK => Ok(()),
            status => {
                // Respond for the unsubmitted request too.
                self.manager.complete_unsubmitted_request(request_id, status);
                Err(status)
            }
        }
    }

    /// Synchronously completes `decoded_requests`, and inserts a post-flush into `decoded_requests`
    /// to be submitted later.
    ///
    /// If the request's status is not `OK` once everything in flight has finished, no flush is
    /// queued and the request is completed with that status instead.
    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
        if decoded_requests.len() > 0 {
            self.manager.submit_requests(decoded_requests);
            decoded_requests.clear();
        }
        self.manager.wait_for_no_inflight_requests();
        let request = self.manager.active_requests.request(request_id);
        match request.status {
            zx::Status::OK => decoded_requests.push(Request {
                request_id,
                operation: Operation::Flush,
                trace_flow_id: request.trace_flow_id,
                vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
            }),
            status => {
                // Release `request` before completing, since completion goes through
                // `active_requests` again.
                drop(request);
                self.manager.complete_unsubmitted_request(request_id, status)
            }
        }
    }

    /// Decodes each FIFO request, maps it into one or more `Request`s and submits them to the
    /// embedder in batches of at most `MAX_REQUESTS`.
    ///
    /// `CloseVmo` requests are completed immediately without being submitted.  Barrier and FUA
    /// write flags that the device does not support are stripped and simulated with flushes.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let mut decoded_requests = DecodedRequests::default();
        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    self.manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;
                    // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier
                    // with a pre-flush.
                    if !self
                        .manager
                        .info
                        .device_flags()
                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        // `pre_flush` has already completed the request with the failure status.
                        continue;
                    }
                    // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with
                    // a post-flush.
                    let simulate_fua =
                        !self.manager.info.device_flags().contains(fblock::DeviceFlag::FUA_SUPPORT)
                            && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);
                    if simulate_fua {
                        // Account for the additional request we need at the end.
                        self.manager.active_requests.request(request_id).count += 1;
                    }

                    // `map_request` may return a remainder; keep mapping until none is left.
                    loop {
                        let result = self.helper.map_request(
                            request,
                            &mut self.manager.active_requests.request(request_id),
                        );
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                // We are handing out unowned references to the VMO here.  This is
                                // safe because the VMO bin holds references to any closed VMOs
                                // until all preceding operations have finished.
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo: UnownedVmo(
                                        vmo.as_ref()
                                            .map(|vmo| vmo.raw_handle())
                                            .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
                                    ),
                                });

                                // Flush a full batch right away so the next `push` has room.
                                if decoded_requests.is_full() {
                                    self.manager.submit_requests(&mut decoded_requests);
                                    decoded_requests.clear();
                                }
                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                self.manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                Err(None) => {}
                Err(Some(response)) => self.send_response(response),
            }
        }
        if !decoded_requests.is_empty() {
            self.manager.submit_requests(&mut decoded_requests);
        }
    }

    /// Sends `response` to the client.
    ///
    /// If nothing is queued, the response is written straight to the FIFO.  If that write fails,
    /// or if responses are already queued, the response is appended to the queue so that
    /// `fifo_loop` sends it later.
    fn send_response(&self, response: BlockFifoResponse) {
        let mut queue = self.queue.lock();
        if queue.responses.is_empty() {
            match self.fifo.write_one(&response) {
                Ok(()) => {
                    return;
                }
                Err(_) => {
                    // Wake `fifo_loop`.
                    let _ = self.fifo.signal(zx::Signals::empty(), zx::Signals::USER_1);
                }
            }
        }
        queue.responses.push_back(response);
    }

    /// Asks the session to stop: raises `USER_0` on the FIFO (which makes `fifo_loop` return) and
    /// aborts the task serving the session's FIDL stream.
    fn terminate(&self) {
        let _ = self.fifo.signal(zx::Signals::empty(), zx::Signals::USER_0);
        self.abort_handle.abort();
    }
}
464
465impl Drop for Session {
466    fn drop(&mut self) {
467        let notify = {
468            let mut open_sessions = self.manager.open_sessions.lock();
469            open_sessions.remove(&(self as *const _ as usize));
470            open_sessions.is_empty()
471        };
472        if notify {
473            self.manager.open_sessions_condvar.notify_all();
474        }
475    }
476}
477
/// The object handed out to C callers by [`block_server_new`].
pub struct BlockServer {
    /// The underlying block server, which owns the session manager.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor created in [`block_server_thread`]; used to spawn serving tasks.
    ehandle: EHandle,
    /// Aborts the future that [`block_server_thread`] runs, which makes that function return.
    abort_handle: AbortHandle,
}
483
/// A one-slot mailbox: the current `Mail` plus a condition variable that is notified whenever
/// new mail is posted.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
485
486impl ExecutorMailbox {
487    /// Returns the old mail.
488    fn post(&self, mail: Mail) -> Mail {
489        let old = std::mem::replace(&mut *self.0.lock(), mail);
490        self.1.notify_all();
491        old
492    }
493
494    fn new() -> Self {
495        Self(Mutex::default(), Condvar::new())
496    }
497}
498
/// Callback passed to [`block_server_delete_async`]; it is invoked with the caller-supplied `arg`
/// after the server has been dropped.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
500
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// Nothing has been posted (or the last message has been taken).
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists; carries the executor handle and
    /// the handle that aborts the executor's main future.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`; carries the server to drop together with the
    /// callback (and its argument) to invoke afterwards.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`.
    Finished,
}
509
510impl Drop for BlockServer {
511    fn drop(&mut self) {
512        self.abort_handle.abort();
513        let manager = &self.server.session_manager;
514        let mut mbox = manager.mbox.0.lock();
515        manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
516        manager.terminate();
517        debug_assert!(Arc::strong_count(manager) > 0);
518    }
519}
520
/// C representation of the partition description passed to [`block_server_new`].
#[repr(C)]
pub struct PartitionInfo {
    /// Raw `DeviceFlag` bits; bits that do not correspond to a known flag are dropped.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks in the partition's block range.
    pub block_count: u64,
    /// Block size; also used to convert `max_transfer_size` into a block count.
    pub block_size: u32,
    /// Partition type GUID.
    pub type_guid: [u8; 16],
    /// Partition instance GUID.
    pub instance_guid: [u8; 16],
    /// Partition name: either null (treated as an empty name) or a pointer to a null-terminated
    /// C string.
    pub name: *const c_char,
    /// Partition flags, passed through unchanged.
    pub flags: u64,
    /// Maximum transfer size, or `MAX_TRANSFER_UNBOUNDED` for no limit.  Presumably in the same
    /// units as `block_size` (bytes), since it is divided by `block_size` -- confirm.
    pub max_transfer_size: u32,
}
533
/// Alias so that the C-facing signatures can name the Zircon handle type directly.
///
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;
537
/// Alias so that the C-facing signatures can name the Zircon status type directly.
///
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
541
542impl PartitionInfo {
543    /// # Safety
544    ///
545    /// [`self.name`] must point to valid, null-terminated C-string, or be a nullptr.
546    unsafe fn to_rust(&self) -> super::DeviceInfo {
547        super::DeviceInfo::Partition(super::PartitionInfo {
548            device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
549            block_range: Some(self.start_block..self.start_block + self.block_count),
550            type_guid: self.type_guid,
551            instance_guid: self.instance_guid,
552            name: if self.name.is_null() {
553                "".to_string()
554            } else {
555                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
556            },
557            flags: self.flags,
558            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
559                NonZero::new(self.max_transfer_size / self.block_size)
560            } else {
561                None
562            },
563        })
564    }
565}
566
/// A fixed-capacity batch of requests waiting to be submitted.
struct DecodedRequests {
    /// Backing storage; only the first `count` entries are initialized.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// Number of initialized entries at the front of `requests`.
    count: usize,
}
571
572impl Default for DecodedRequests {
573    fn default() -> Self {
574        Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
575    }
576}
577
578impl DecodedRequests {
579    fn push(&mut self, request: Request) {
580        assert!(self.count < MAX_REQUESTS);
581        self.requests[self.count].write(request);
582        self.count += 1;
583    }
584
585    fn is_full(&self) -> bool {
586        self.count == MAX_REQUESTS
587    }
588
589    fn clear(&mut self) {
590        self.count = 0;
591    }
592}
593
594impl Deref for DecodedRequests {
595    type Target = [Request];
596
597    fn deref(&self) -> &Self::Target {
598        // SAFETY: We wrote the request in [`Self::push`].
599        unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
600    }
601}
602
603impl DerefMut for DecodedRequests {
604    fn deref_mut(&mut self) -> &mut Self::Target {
605        // SAFETY: We wrote the request in [`Self::push`].
606        unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
607    }
608}
609
610/// # Safety
611///
612/// All callbacks in `callbacks` must be safe.
613#[unsafe(no_mangle)]
614pub unsafe extern "C" fn block_server_new(
615    partition_info: &PartitionInfo,
616    callbacks: Callbacks,
617) -> *mut BlockServer {
618    let session_manager = Arc::new(SessionManager {
619        callbacks,
620        open_sessions: Mutex::default(),
621        active_requests: ActiveRequests::default(),
622        open_sessions_condvar: Condvar::new(),
623        inflight_requests: Mutex::default(),
624        no_inflight_requests_condvar: Condvar::new(),
625        mbox: ExecutorMailbox::new(),
626        info: unsafe { partition_info.to_rust() },
627    });
628
629    unsafe {
630        (session_manager.callbacks.start_thread)(
631            session_manager.callbacks.context,
632            Arc::into_raw(session_manager.clone()) as *const c_void,
633        );
634    }
635
636    let mbox = &session_manager.mbox;
637    let mail = {
638        let mut mail = mbox.0.lock();
639        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
640        std::mem::replace(&mut *mail, Mail::None)
641    };
642
643    let block_size = partition_info.block_size;
644    match mail {
645        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
646            server: super::BlockServer::new(block_size, session_manager),
647            ehandle,
648            abort_handle,
649        })),
650        Mail::Finished => std::ptr::null_mut(),
651        _ => unreachable!(),
652    }
653}
654
655/// # Safety
656///
657/// `arg` must be the value passed to the `start_thread` callback.
658#[unsafe(no_mangle)]
659pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
660    let session_manager = unsafe { &*(arg as *const SessionManager) };
661
662    let mut executor = fasync::LocalExecutor::default();
663    let (abort_handle, registration) = AbortHandle::new_pair();
664
665    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
666
667    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
668}
669
670/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
671/// thread is successful or not.
672///
673/// # Safety
674///
675/// `arg` must be the value passed to the `start_thread` callback.
676#[unsafe(no_mangle)]
677pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
678    let mail = {
679        let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
680        debug_assert!(Arc::strong_count(&session_manager) > 0);
681        session_manager.mbox.post(Mail::Finished)
682    };
683
684    if let Mail::AsyncShutdown(server, callback, arg) = mail {
685        std::mem::drop(server);
686        // SAFETY: Whoever supplied the callback must guarantee it's safe.
687        unsafe {
688            callback(arg);
689        }
690    }
691}
692
693/// # Safety
694///
695/// `block_server` must be valid.
696#[unsafe(no_mangle)]
697pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
698    let _ = unsafe { Box::from_raw(block_server) };
699}
700
701/// # Safety
702///
703/// `block_server` must be valid.
704#[unsafe(no_mangle)]
705pub unsafe extern "C" fn block_server_delete_async(
706    block_server: *mut BlockServer,
707    callback: ShutdownCallback,
708    arg: *mut c_void,
709) {
710    let block_server = unsafe { Box::from_raw(block_server) };
711    let session_manager = block_server.server.session_manager.clone();
712    let abort_handle = block_server.abort_handle.clone();
713    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
714    abort_handle.abort();
715}
716
717/// Serves the Volume protocol for this server.  `handle` is consumed.
718///
719/// # Safety
720///
721/// `block_server` and `handle` must be valid.
722#[unsafe(no_mangle)]
723pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
724    let block_server = unsafe { &*block_server };
725    let ehandle = &block_server.ehandle;
726    let handle = unsafe { zx::NullableHandle::from_raw(handle) };
727    ehandle.global_scope().spawn(async move {
728        let _ = block_server
729            .server
730            .handle_requests(fblock::BlockRequestStream::from_channel(
731                fasync::Channel::from_channel(handle.into()),
732            ))
733            .await;
734    });
735}
736
737/// # Safety
738///
739/// `session` must be valid.
740#[unsafe(no_mangle)]
741pub unsafe extern "C" fn block_server_session_run(session: &Session) {
742    let session = unsafe { Arc::from_raw(session) };
743    session.run();
744    let _ = Arc::into_raw(session);
745}
746
747/// # Safety
748///
749/// `session` must be valid.
750#[unsafe(no_mangle)]
751pub unsafe extern "C" fn block_server_session_release(session: &Session) {
752    session.terminate();
753    unsafe { Arc::from_raw(session) };
754}
755
756/// # Safety
757///
758/// `block_server` must be valid.
759#[unsafe(no_mangle)]
760pub unsafe extern "C" fn block_server_send_reply(
761    block_server: &BlockServer,
762    request_id: RequestId,
763    status: zx_status_t,
764) {
765    block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
766}