fidl_next_protocol/endpoints/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FIDL protocol servers.
6
7use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{Constrained, Encode, EncodeError, EncoderExt as _};
15use pin_project::pin_project;
16
17use crate::concurrency::sync::Arc;
18use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
19use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
20use crate::{ProtocolError, SendFuture, Transport, decode_header, encode_header};
21
/// State shared between a [`ServerDispatcher`] and every [`Server`] handle created from it.
struct ServerInner<T: Transport> {
    /// The connection that messages are sent and received over.
    connection: Connection<T>,
    /// The epitaph requested through `close_with_epitaph`, widened to an `i64`, or
    /// `EPITAPH_NONE` while no epitaph has been requested.
    epitaph: AtomicI64,
}
26
27impl<T: Transport> ServerInner<T> {
28    const EPITAPH_NONE: i64 = i64::MAX;
29
30    fn new(shared: T::Shared) -> Self {
31        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
32    }
33
34    fn close_with_epitaph(&self, epitaph: Option<i32>) {
35        if let Some(epitaph) = epitaph {
36            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
37        }
38        self.connection.stop();
39    }
40
41    fn epitaph(&self) -> Option<i32> {
42        let epitaph = self.epitaph.load(Ordering::Relaxed);
43        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
44    }
45}
46
/// A server endpoint.
///
/// A `Server` is a handle to state shared with its [`ServerDispatcher`]. It can send events and
/// close the connection; cloning it yields another handle to the same shared state.
pub struct Server<T: Transport> {
    /// State shared with the dispatcher and with every other clone of this handle.
    inner: Arc<ServerInner<T>>,
}
51
52impl<T: Transport> Server<T> {
53    /// Closes the channel from the server end.
54    pub fn close(&self) {
55        self.inner.close_with_epitaph(None);
56    }
57
58    /// Closes the channel from the server end after sending an epitaph message.
59    pub fn close_with_epitaph(&self, epitaph: i32) {
60        self.inner.close_with_epitaph(Some(epitaph));
61    }
62
63    /// Send an event.
64    pub fn send_event<M>(&self, ordinal: u64, event: M) -> Result<SendFuture<'_, T>, EncodeError>
65    where
66        M: Encode<T::SendBuffer>,
67        M::Encoded: Constrained<Constraint = ()>,
68    {
69        self.inner.connection.send_message(|buffer| {
70            encode_header::<T>(buffer, 0, ordinal)?;
71            buffer.encode_next(event, ())
72        })
73    }
74}
75
76impl<T: Transport> Clone for Server<T> {
77    fn clone(&self) -> Self {
78        Self { inner: self.inner.clone() }
79    }
80}
81
/// A type which handles incoming messages for a server.
///
/// The futures returned by `on_one_way` and `on_two_way` are required to be `Send`. See
/// `LocalServerHandler` for a version of this trait which does not require the returned futures to
/// be `Send`.
///
/// If either method returns an error, [`ServerDispatcher::run`] stops receiving messages and
/// terminates the connection with that error.
pub trait ServerHandler<T: Transport> {
    /// Handles a received one-way server message.
    ///
    /// `ordinal` is the ordinal decoded from the message header, and `buffer` is the receive
    /// buffer after the header has been decoded from it.
    ///
    /// The server cannot handle more messages until `on_one_way` completes. If `on_one_way` may
    /// block, perform asynchronous work, or take a long time to process a message, it should
    /// offload work to an async task.
    fn on_one_way(
        &mut self,
        ordinal: u64,
        buffer: T::RecvBuffer,
    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;

    /// Handles a received two-way server message.
    ///
    /// `ordinal` is the ordinal decoded from the message header, and `buffer` is the receive
    /// buffer after the header has been decoded from it. `responder` is used to send the
    /// response; dropping it without responding closes the connection.
    ///
    /// The server cannot handle more messages until `on_two_way` completes. If `on_two_way` may
    /// block, perform asynchronous work, or take a long time to process a message, it should
    /// offload work to an async task.
    fn on_two_way(
        &mut self,
        ordinal: u64,
        buffer: T::RecvBuffer,
        responder: Responder<T>,
    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
}
111
/// A dispatcher for a server endpoint.
///
/// A server dispatcher receives all of the incoming requests and dispatches them to the server
/// handler. It acts as the message pump for the server.
///
/// The dispatcher must be actively polled to receive requests. Requests are only received while
/// the future returned by [`run`](ServerDispatcher::run) is being polled.
pub struct ServerDispatcher<T: Transport> {
    /// State shared with every [`Server`] handle created by this dispatcher.
    inner: Arc<ServerInner<T>>,
    /// The exclusive half of the split transport, used when receiving messages.
    exclusive: T::Exclusive,
    /// Whether `run` has already terminated the connection, in which case `drop` must not
    /// terminate it a second time.
    is_terminated: bool,
}
124
125impl<T: Transport> Drop for ServerDispatcher<T> {
126    fn drop(&mut self) {
127        if !self.is_terminated {
128            // SAFETY: We checked that the connection has not been terminated.
129            unsafe {
130                self.inner.connection.terminate(ProtocolError::Stopped);
131            }
132        }
133    }
134}
135
136impl<T: Transport> ServerDispatcher<T> {
137    /// Creates a new server from a transport.
138    pub fn new(transport: T) -> Self {
139        let (shared, exclusive) = transport.split();
140        Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
141    }
142
143    /// Returns the dispatcher's server.
144    pub fn server(&self) -> Server<T> {
145        Server { inner: self.inner.clone() }
146    }
147
148    /// Runs the server with the provided handler.
149    pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
150    where
151        H: ServerHandler<T>,
152    {
153        // We may assume that the connection has not been terminated because
154        // connections are only terminated by `run` and `drop`. Neither of those
155        // could have been called before this method because `run` consumes
156        // `self` and `drop` is only ever called once.
157
158        let error = loop {
159            // SAFETY: The connection has not been terminated.
160            let result = unsafe { self.run_one(&mut handler).await };
161            if let Err(error) = result {
162                break error;
163            }
164        };
165
166        // If we closed locally, we may have an epitaph to send before
167        // terminating the connection.
168        if matches!(error, ProtocolError::Stopped) {
169            if let Some(epitaph) = self.inner.epitaph() {
170                // Note that we don't care whether sending the epitaph succeeds
171                // or fails; it's best-effort.
172
173                // SAFETY: The connection has not been terminated.
174                let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
175            }
176        }
177
178        // SAFETY: The connection has not been terminated.
179        unsafe {
180            self.inner.connection.terminate(error.clone());
181        }
182        self.is_terminated = true;
183
184        match error {
185            // We consider servers to have finished successfully if they stop
186            // themselves manually, or if the client disconnects.
187            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
188
189            // Otherwise, the server finished with an error.
190            _ => Err(error),
191        }
192    }
193
194    /// # Safety
195    ///
196    /// The connection must not be terminated.
197    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
198    where
199        H: ServerHandler<T>,
200    {
201        // SAFETY: The caller guaranteed that the connection is not terminated.
202        let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
203
204        let (txid, ordinal) =
205            decode_header::<T>(&mut buffer).map_err(ProtocolError::InvalidMessageHeader)?;
206        if let Some(txid) = NonZeroU32::new(txid) {
207            let responder = Responder { server: self.server(), txid };
208            handler.on_two_way(ordinal, buffer, responder).await?;
209        } else {
210            handler.on_one_way(ordinal, buffer).await?;
211        }
212
213        Ok(())
214    }
215}
216
/// A responder for a two-way message.
///
/// Dropping a responder without a successful call to [`respond`](Responder::respond) closes the
/// connection from the server end.
#[must_use = "responders close the underlying FIDL connection when dropped"]
pub struct Responder<T: Transport> {
    /// Handle to the server that received the request.
    server: Server<T>,
    /// The transaction ID of the request, which is written into the response header.
    txid: NonZeroU32,
}
223
224impl<T: Transport> Drop for Responder<T> {
225    fn drop(&mut self) {
226        self.server.close();
227    }
228}
229
230impl<T: Transport> Responder<T> {
231    /// Send a response to a two-way message.
232    pub fn respond<M>(self, ordinal: u64, response: M) -> Result<RespondFuture<T>, EncodeError>
233    where
234        M: Encode<T::SendBuffer>,
235        M::Encoded: Constrained<Constraint = ()>,
236    {
237        let state = self.server.inner.connection.send_message_raw(|buffer| {
238            encode_header::<T>(buffer, self.txid.get(), ordinal)?;
239            buffer.encode_next(response, ())
240        })?;
241
242        let this = ManuallyDrop::new(self);
243        // SAFETY: `this` is a `ManuallyDrop` and so `server` won't be dropped
244        // twice.
245        let server = unsafe { ptr::read(&this.server) };
246
247        Ok(RespondFuture { server, state })
248    }
249}
250
/// A future which responds to a request over a connection.
///
/// Created by [`Responder::respond`] once the response has been encoded.
#[must_use = "futures do nothing unless polled"]
#[pin_project]
pub struct RespondFuture<T: Transport> {
    /// Handle to the server whose connection the response is sent over.
    server: Server<T>,
    /// The state of the send, polled against the server's connection.
    #[pin]
    state: SendFutureState<T>,
}
259
260impl<T: Transport> Future for RespondFuture<T> {
261    type Output = SendFutureOutput<T>;
262
263    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
264        let this = self.project();
265
266        this.state.poll_send(cx, &this.server.inner.connection)
267    }
268}