Skip to main content

fidl_next_protocol/endpoints/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FIDL protocol servers.
6
7use core::future::Future;
8use core::mem::{ManuallyDrop, MaybeUninit};
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::{Encode, EncodeError, EncoderExt as _, Wire, wire};
16use fuchsia_loom::sync::Arc;
17use fuchsia_loom::sync::atomic::{AtomicI64, Ordering};
18use pin_project::pin_project;
19
20use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
21use crate::wire::MessageHeader;
22use crate::{
23    Flexibility, FrameworkError, Message, NonBlockingTransport, ProtocolError, SendFuture,
24    Transport,
25};
26
27struct ServerInner<T: Transport> {
28    connection: Connection<T>,
29    epitaph: AtomicI64,
30}
31
32impl<T: Transport> ServerInner<T> {
33    const EPITAPH_NONE: i64 = i64::MAX;
34
35    fn new(shared: T::Shared) -> Self {
36        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
37    }
38
39    fn close_with_epitaph(&self, epitaph: Option<i32>) {
40        if let Some(epitaph) = epitaph {
41            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
42        }
43        self.connection.stop();
44    }
45
46    fn epitaph(&self) -> Option<i32> {
47        let epitaph = self.epitaph.load(Ordering::Relaxed);
48        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
49    }
50}
51
52/// A server endpoint.
53pub struct Server<T: Transport> {
54    inner: Arc<ServerInner<T>>,
55}
56
57impl<T: Transport> Server<T> {
58    /// Closes the channel from the server end.
59    pub fn close(&self) {
60        self.inner.close_with_epitaph(None);
61    }
62
63    /// Closes the channel from the server end after sending an epitaph message.
64    pub fn close_with_epitaph(&self, epitaph: i32) {
65        self.inner.close_with_epitaph(Some(epitaph));
66    }
67
68    /// Send an event.
69    pub fn send_event<W>(
70        &self,
71        ordinal: u64,
72        flexibility: Flexibility,
73        event: impl Encode<W, T::SendBuffer>,
74    ) -> Result<SendFuture<'_, T>, EncodeError>
75    where
76        W: Wire<Constraint = ()>,
77    {
78        let state = self.inner.connection.send_message_raw(|buffer| {
79            buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
80            buffer.encode_next(event)
81        })?;
82
83        Ok(SendFuture::from_raw_parts(&self.inner.connection, state))
84    }
85}
86
87impl<T: Transport> Clone for Server<T> {
88    fn clone(&self) -> Self {
89        Self { inner: self.inner.clone() }
90    }
91}
92
93/// A type which handles incoming events for a server.
94///
95/// The futures returned by `on_one_way` and `on_two_way` are required to be `Send`. See
96/// `LocalServerHandler` for a version of this trait which does not require the returned futures to
97/// be `Send`.
98pub trait ServerHandler<T: Transport>: Send {
99    /// Handles a received one-way server message.
100    ///
101    /// The client cannot handle more messages until `on_one_way` completes. If
102    /// `on_one_way` may block, or would perform asynchronous work that takes a
103    /// long time, it should offload work to an async task and return.
104    fn on_one_way(
105        &mut self,
106        message: Message<T>,
107    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
108
109    /// Handles a received two-way server message.
110    ///
111    /// The client cannot handle more messages until `on_two_way` completes. If
112    /// `on_two_way` may block, or would perform asynchronous work that takes a
113    /// long time, it should offload work to an async task and return.
114    fn on_two_way(
115        &mut self,
116        message: Message<T>,
117        responder: Responder<T>,
118    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
119}
120
121/// A type which handles incoming events for a local server.
122///
123/// This is a variant of [`ServerHandler`] that does not require implementing
124/// `Send` and only supports local-thread executors.
125pub trait LocalServerHandler<T: Transport> {
126    /// Handles a received one-way server message.
127    ///
128    /// See [`ServerHandler::on_one_way`] for more information.
129    fn on_one_way(
130        &mut self,
131        message: Message<T>,
132    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
133
134    /// Handles a received two-way server message.
135    ///
136    /// See [`ServerHandler::on_two_way`] for more information.
137    fn on_two_way(
138        &mut self,
139        message: Message<T>,
140        responder: Responder<T>,
141    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
142}
143
144/// An adapter for a [`ServerHandler`] which implements [`LocalServerHandler`].
145#[repr(transparent)]
146pub struct ServerHandlerToLocalAdapter<H>(pub H);
147
148impl<T, H> LocalServerHandler<T> for ServerHandlerToLocalAdapter<H>
149where
150    T: Transport,
151    H: ServerHandler<T>,
152{
153    #[inline]
154    fn on_one_way(
155        &mut self,
156        message: Message<T>,
157    ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
158        self.0.on_one_way(message)
159    }
160
161    #[inline]
162    fn on_two_way(
163        &mut self,
164        message: Message<T>,
165        responder: Responder<T>,
166    ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
167        self.0.on_two_way(message, responder)
168    }
169}
170
171/// A dispatcher for a server endpoint.
172///
173/// A server dispatcher receives all of the incoming requests and dispatches them to the server
174/// handler. It acts as the message pump for the server.
175///
176/// The dispatcher must be actively polled to receive requests. If the dispatcher is not
177/// [`run`](ServerDispatcher::run), then requests will not be received.
178pub struct ServerDispatcher<T: Transport> {
179    inner: Arc<ServerInner<T>>,
180    exclusive: T::Exclusive,
181    is_terminated: bool,
182}
183
184impl<T: Transport> Drop for ServerDispatcher<T> {
185    fn drop(&mut self) {
186        if !self.is_terminated {
187            // SAFETY: We checked that the connection has not been terminated.
188            unsafe {
189                self.inner.connection.terminate(ProtocolError::Stopped);
190            }
191        }
192    }
193}
194
195impl<T: Transport> ServerDispatcher<T> {
196    /// Creates a new server from a transport.
197    pub fn new(transport: T) -> Self {
198        let (shared, exclusive) = transport.split();
199        Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
200    }
201
202    /// Returns the dispatcher's server.
203    pub fn server(&self) -> Server<T> {
204        Server { inner: self.inner.clone() }
205    }
206
207    /// Runs the server with the provided handler.
208    pub async fn run<H>(self, handler: H) -> Result<H, ProtocolError<T::Error>>
209    where
210        H: ServerHandler<T>,
211    {
212        // The bounds on `H` prove that the future returned by `run_local` is
213        // `Send`.
214        self.run_local(ServerHandlerToLocalAdapter(handler)).await.map(|adapter| adapter.0)
215    }
216
217    /// Runs the server with the provided local handler.
218    pub async fn run_local<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
219    where
220        H: LocalServerHandler<T>,
221    {
222        // We may assume that the connection has not been terminated because
223        // connections are only terminated by `run` and `drop`. Neither of those
224        // could have been called before this method because `run` consumes
225        // `self` and `drop` is only ever called once.
226
227        let error = loop {
228            // SAFETY: The connection has not been terminated.
229            let result = unsafe { self.run_one(&mut handler).await };
230            if let Err(error) = result {
231                break error;
232            }
233        };
234
235        // If we closed locally, we may have an epitaph to send before
236        // terminating the connection.
237        if matches!(error, ProtocolError::Stopped)
238            && let Some(epitaph) = self.inner.epitaph()
239        {
240            // Note that we don't care whether sending the epitaph succeeds
241            // or fails; it's best-effort.
242
243            // SAFETY: The connection has not been terminated.
244            let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
245        }
246
247        // SAFETY: The connection has not been terminated.
248        unsafe {
249            self.inner.connection.terminate(error.clone());
250        }
251        self.is_terminated = true;
252
253        match error {
254            // We consider servers to have finished successfully if they stop
255            // themselves manually, or if the client disconnects.
256            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
257
258            // Otherwise, the server finished with an error.
259            _ => Err(error),
260        }
261    }
262
263    /// # Safety
264    ///
265    /// The connection must not be terminated.
266    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
267    where
268        H: LocalServerHandler<T>,
269    {
270        // SAFETY: The caller guaranteed that the connection is not terminated.
271        let buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
272        let mut message = Message::decode(buffer).map_err(ProtocolError::InvalidMessageHeader)?;
273
274        if let Some(txid) = NonZeroU32::new(*message.header().txid) {
275            let responder = Responder { server: self.server(), txid };
276            handler.on_two_way(message, responder).await?;
277        } else {
278            handler.on_one_way(message).await?;
279        }
280
281        Ok(())
282    }
283}
284
285/// A responder for a two-way message.
286#[must_use = "responders close the underlying FIDL connection when dropped"]
287pub struct Responder<T: Transport> {
288    server: Server<T>,
289    txid: NonZeroU32,
290}
291
292impl<T: Transport> Drop for Responder<T> {
293    fn drop(&mut self) {
294        self.server.close();
295    }
296}
297
298impl<T: Transport> Responder<T> {
299    /// Send a response to a two-way message.
300    pub fn respond<W>(
301        self,
302        ordinal: u64,
303        flexibility: Flexibility,
304        response: impl Encode<W, T::SendBuffer>,
305    ) -> Result<RespondFuture<T>, EncodeError>
306    where
307        W: Wire<Constraint = ()>,
308    {
309        let state = self.server.inner.connection.send_message_raw(|buffer| {
310            buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
311            buffer.encode_next(response)
312        })?;
313
314        let this = ManuallyDrop::new(self);
315        // SAFETY: `this` is a `ManuallyDrop` and so `server` won't be dropped
316        // twice.
317        let server = unsafe { ptr::read(&this.server) };
318
319        Ok(RespondFuture { server, state })
320    }
321
322    /// Send a framework error response to a two-way message.
323    pub fn respond_framework_error(
324        self,
325        ordinal: u64,
326        framework_error: FrameworkError,
327    ) -> Result<RespondFuture<T>, EncodeError> {
328        struct FlexibleResponse {
329            ordinal: u64,
330            framework_error: FrameworkError,
331        }
332
333        unsafe impl<E: InternalHandleEncoder> Encode<wire::Union, E> for FlexibleResponse {
334            fn encode(
335                self,
336                encoder: &mut E,
337                out: &mut MaybeUninit<wire::Union>,
338                _: (),
339            ) -> Result<(), EncodeError> {
340                wire::Union::encode_as_static(
341                    self.framework_error as i32,
342                    self.ordinal,
343                    encoder,
344                    out,
345                    (),
346                )
347            }
348        }
349
350        self.respond(ordinal, Flexibility::Flexible, FlexibleResponse { ordinal, framework_error })
351    }
352}
353
354/// A future which responds to a request over a connection.
355#[must_use = "futures do nothing unless polled"]
356#[pin_project]
357pub struct RespondFuture<T: Transport> {
358    server: Server<T>,
359    #[pin]
360    state: SendFutureState<T>,
361}
362
363impl<T: Transport> Future for RespondFuture<T> {
364    type Output = SendFutureOutput<T>;
365
366    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367        let this = self.project();
368
369        this.state.poll_send(cx, &this.server.inner.connection)
370    }
371}
372
373impl<T: NonBlockingTransport> RespondFuture<T> {
374    /// Completes the send operation synchronously and without blocking.
375    ///
376    /// Using this method prevents transports from applying backpressure. Prefer
377    /// awaiting when possible to allow for backpressure.
378    ///
379    /// Because failed sends return immediately, `send_immediately` may observe
380    /// transport closure prematurely. This can manifest as this method
381    /// returning `Err(PeerClosed)` or `Err(Stopped)` when it should have
382    /// returned `Err(PeerClosedWithEpitaph)`. Prefer awaiting when possible for
383    /// correctness.
384    pub fn send_immediately(self) -> SendFutureOutput<T> {
385        self.state.send_immediately(&self.server.inner.connection)
386    }
387}