Skip to main content

fidl_next_protocol/endpoints/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FIDL protocol servers.
6
7use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{
15    AsDecoder as _, DecoderExt as _, Encode, EncodeError, EncoderExt as _, Wire,
16};
17use pin_project::pin_project;
18
19use crate::concurrency::sync::Arc;
20use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
21use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
22use crate::wire::MessageHeader;
23use crate::{Body, Flexibility, NonBlockingTransport, ProtocolError, SendFuture, Transport};
24
25struct ServerInner<T: Transport> {
26    connection: Connection<T>,
27    epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerInner<T> {
31    const EPITAPH_NONE: i64 = i64::MAX;
32
33    fn new(shared: T::Shared) -> Self {
34        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35    }
36
37    fn close_with_epitaph(&self, epitaph: Option<i32>) {
38        if let Some(epitaph) = epitaph {
39            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40        }
41        self.connection.stop();
42    }
43
44    fn epitaph(&self) -> Option<i32> {
45        let epitaph = self.epitaph.load(Ordering::Relaxed);
46        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47    }
48}
49
50/// A server endpoint.
51pub struct Server<T: Transport> {
52    inner: Arc<ServerInner<T>>,
53}
54
55impl<T: Transport> Server<T> {
56    /// Closes the channel from the server end.
57    pub fn close(&self) {
58        self.inner.close_with_epitaph(None);
59    }
60
61    /// Closes the channel from the server end after sending an epitaph message.
62    pub fn close_with_epitaph(&self, epitaph: i32) {
63        self.inner.close_with_epitaph(Some(epitaph));
64    }
65
66    /// Send an event.
67    pub fn send_event<W>(
68        &self,
69        ordinal: u64,
70        flexibility: Flexibility,
71        event: impl Encode<W, T::SendBuffer>,
72    ) -> Result<SendFuture<'_, T>, EncodeError>
73    where
74        W: Wire<Constraint = ()>,
75    {
76        let state = self.inner.connection.send_message_raw(|buffer| {
77            buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
78            buffer.encode_next(event)
79        })?;
80
81        Ok(SendFuture::from_raw_parts(&self.inner.connection, state))
82    }
83}
84
85impl<T: Transport> Clone for Server<T> {
86    fn clone(&self) -> Self {
87        Self { inner: self.inner.clone() }
88    }
89}
90
91/// A type which handles incoming events for a server.
92///
93/// The futures returned by `on_one_way` and `on_two_way` are required to be `Send`. See
94/// `LocalServerHandler` for a version of this trait which does not require the returned futures to
95/// be `Send`.
96pub trait ServerHandler<T: Transport>: Send {
97    /// Handles a received one-way server message.
98    ///
99    /// The client cannot handle more messages until `on_one_way` completes. If
100    /// `on_one_way` may block, or would perform asynchronous work that takes a
101    /// long time, it should offload work to an async task and return.
102    fn on_one_way(
103        &mut self,
104        ordinal: u64,
105        flexibility: Flexibility,
106        body: Body<T>,
107    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
108
109    /// Handles a received two-way server message.
110    ///
111    /// The client cannot handle more messages until `on_two_way` completes. If
112    /// `on_two_way` may block, or would perform asynchronous work that takes a
113    /// long time, it should offload work to an async task and return.
114    fn on_two_way(
115        &mut self,
116        ordinal: u64,
117        flexibility: Flexibility,
118        body: Body<T>,
119        responder: Responder<T>,
120    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
121}
122
123/// A type which handles incoming events for a local server.
124///
125/// This is a variant of [`ServerHandler`] that does not require implementing
126/// `Send` and only supports local-thread executors.
127pub trait LocalServerHandler<T: Transport> {
128    /// Handles a received one-way server message.
129    ///
130    /// See [`ServerHandler::on_one_way`] for more information.
131    fn on_one_way(
132        &mut self,
133        ordinal: u64,
134        flexibility: Flexibility,
135        body: Body<T>,
136    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
137
138    /// Handles a received two-way server message.
139    ///
140    /// See [`ServerHandler::on_two_way`] for more information.
141    fn on_two_way(
142        &mut self,
143        ordinal: u64,
144        flexibility: Flexibility,
145        body: Body<T>,
146        responder: Responder<T>,
147    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
148}
149
150/// An adapter for a [`ServerHandler`] which implements [`LocalServerHandler`].
151#[repr(transparent)]
152pub struct ServerHandlerToLocalAdapter<H>(H);
153
154impl<T, H> LocalServerHandler<T> for ServerHandlerToLocalAdapter<H>
155where
156    T: Transport,
157    H: ServerHandler<T>,
158{
159    #[inline]
160    fn on_one_way(
161        &mut self,
162        ordinal: u64,
163        flexibility: Flexibility,
164        body: Body<T>,
165    ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
166        self.0.on_one_way(ordinal, flexibility, body)
167    }
168
169    #[inline]
170    fn on_two_way(
171        &mut self,
172        ordinal: u64,
173        flexibility: Flexibility,
174        body: Body<T>,
175        responder: Responder<T>,
176    ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
177        self.0.on_two_way(ordinal, flexibility, body, responder)
178    }
179}
180
181/// A dispatcher for a server endpoint.
182///
183/// A server dispatcher receives all of the incoming requests and dispatches them to the server
184/// handler. It acts as the message pump for the server.
185///
186/// The dispatcher must be actively polled to receive requests. If the dispatcher is not
187/// [`run`](ServerDispatcher::run), then requests will not be received.
188pub struct ServerDispatcher<T: Transport> {
189    inner: Arc<ServerInner<T>>,
190    exclusive: T::Exclusive,
191    is_terminated: bool,
192}
193
194impl<T: Transport> Drop for ServerDispatcher<T> {
195    fn drop(&mut self) {
196        if !self.is_terminated {
197            // SAFETY: We checked that the connection has not been terminated.
198            unsafe {
199                self.inner.connection.terminate(ProtocolError::Stopped);
200            }
201        }
202    }
203}
204
205impl<T: Transport> ServerDispatcher<T> {
206    /// Creates a new server from a transport.
207    pub fn new(transport: T) -> Self {
208        let (shared, exclusive) = transport.split();
209        Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
210    }
211
212    /// Returns the dispatcher's server.
213    pub fn server(&self) -> Server<T> {
214        Server { inner: self.inner.clone() }
215    }
216
217    /// Runs the server with the provided handler.
218    pub async fn run<H>(self, handler: H) -> Result<H, ProtocolError<T::Error>>
219    where
220        H: ServerHandler<T>,
221    {
222        // The bounds on `H` prove that the future returned by `run_local` is
223        // `Send`.
224        self.run_local(ServerHandlerToLocalAdapter(handler)).await.map(|adapter| adapter.0)
225    }
226
227    /// Runs the server with the provided local handler.
228    pub async fn run_local<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
229    where
230        H: LocalServerHandler<T>,
231    {
232        // We may assume that the connection has not been terminated because
233        // connections are only terminated by `run` and `drop`. Neither of those
234        // could have been called before this method because `run` consumes
235        // `self` and `drop` is only ever called once.
236
237        let error = loop {
238            // SAFETY: The connection has not been terminated.
239            let result = unsafe { self.run_one(&mut handler).await };
240            if let Err(error) = result {
241                break error;
242            }
243        };
244
245        // If we closed locally, we may have an epitaph to send before
246        // terminating the connection.
247        if matches!(error, ProtocolError::Stopped)
248            && let Some(epitaph) = self.inner.epitaph()
249        {
250            // Note that we don't care whether sending the epitaph succeeds
251            // or fails; it's best-effort.
252
253            // SAFETY: The connection has not been terminated.
254            let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
255        }
256
257        // SAFETY: The connection has not been terminated.
258        unsafe {
259            self.inner.connection.terminate(error.clone());
260        }
261        self.is_terminated = true;
262
263        match error {
264            // We consider servers to have finished successfully if they stop
265            // themselves manually, or if the client disconnects.
266            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
267
268            // Otherwise, the server finished with an error.
269            _ => Err(error),
270        }
271    }
272
273    /// # Safety
274    ///
275    /// The connection must not be terminated.
276    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
277    where
278        H: LocalServerHandler<T>,
279    {
280        // SAFETY: The caller guaranteed that the connection is not terminated.
281        let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
282
283        let header = buffer
284            .as_decoder()
285            .decode_prefix::<MessageHeader>()
286            .map_err(ProtocolError::InvalidMessageHeader)?;
287
288        if let Some(txid) = NonZeroU32::new(*header.txid) {
289            let responder = Responder { server: self.server(), txid };
290            handler
291                .on_two_way(*header.ordinal, header.flexibility(), Body::new(buffer), responder)
292                .await?;
293        } else {
294            handler.on_one_way(*header.ordinal, header.flexibility(), Body::new(buffer)).await?;
295        }
296
297        Ok(())
298    }
299}
300
301/// A responder for a two-way message.
302#[must_use = "responders close the underlying FIDL connection when dropped"]
303pub struct Responder<T: Transport> {
304    server: Server<T>,
305    txid: NonZeroU32,
306}
307
308impl<T: Transport> Drop for Responder<T> {
309    fn drop(&mut self) {
310        self.server.close();
311    }
312}
313
314impl<T: Transport> Responder<T> {
315    /// Send a response to a two-way message.
316    pub fn respond<W>(
317        self,
318        ordinal: u64,
319        flexibility: Flexibility,
320        response: impl Encode<W, T::SendBuffer>,
321    ) -> Result<RespondFuture<T>, EncodeError>
322    where
323        W: Wire<Constraint = ()>,
324    {
325        let state = self.server.inner.connection.send_message_raw(|buffer| {
326            buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
327            buffer.encode_next(response)
328        })?;
329
330        let this = ManuallyDrop::new(self);
331        // SAFETY: `this` is a `ManuallyDrop` and so `server` won't be dropped
332        // twice.
333        let server = unsafe { ptr::read(&this.server) };
334
335        Ok(RespondFuture { server, state })
336    }
337}
338
339/// A future which responds to a request over a connection.
340#[must_use = "futures do nothing unless polled"]
341#[pin_project]
342pub struct RespondFuture<T: Transport> {
343    server: Server<T>,
344    #[pin]
345    state: SendFutureState<T>,
346}
347
348impl<T: Transport> Future for RespondFuture<T> {
349    type Output = SendFutureOutput<T>;
350
351    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
352        let this = self.project();
353
354        this.state.poll_send(cx, &this.server.inner.connection)
355    }
356}
357
358impl<T: NonBlockingTransport> RespondFuture<T> {
359    /// Completes the send operation synchronously and without blocking.
360    ///
361    /// Using this method prevents transports from applying backpressure. Prefer
362    /// awaiting when possible to allow for backpressure.
363    ///
364    /// Because failed sends return immediately, `send_immediately` may observe
365    /// transport closure prematurely. This can manifest as this method
366    /// returning `Err(PeerClosed)` or `Err(Stopped)` when it should have
367    /// returned `Err(PeerClosedWithEpitaph)`. Prefer awaiting when possible for
368    /// correctness.
369    pub fn send_immediately(self) -> SendFutureOutput<T> {
370        self.state.send_immediately(&self.server.inner.connection)
371    }
372}