fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{Constrained, Encode, EncodeError, EncoderExt as _};
15use pin_project::pin_project;
16
17use crate::concurrency::sync::Arc;
18use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
19use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
20use crate::{ProtocolError, SendFuture, Transport, decode_header, encode_header};
21
22struct ServerInner<T: Transport> {
23 connection: Connection<T>,
24 epitaph: AtomicI64,
25}
26
27impl<T: Transport> ServerInner<T> {
28 const EPITAPH_NONE: i64 = i64::MAX;
29
30 fn new(shared: T::Shared) -> Self {
31 Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
32 }
33
34 fn close_with_epitaph(&self, epitaph: Option<i32>) {
35 if let Some(epitaph) = epitaph {
36 self.epitaph.store(epitaph as i64, Ordering::Relaxed);
37 }
38 self.connection.stop();
39 }
40
41 fn epitaph(&self) -> Option<i32> {
42 let epitaph = self.epitaph.load(Ordering::Relaxed);
43 if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
44 }
45}
46
47pub struct Server<T: Transport> {
49 inner: Arc<ServerInner<T>>,
50}
51
52impl<T: Transport> Server<T> {
53 pub fn close(&self) {
55 self.inner.close_with_epitaph(None);
56 }
57
58 pub fn close_with_epitaph(&self, epitaph: i32) {
60 self.inner.close_with_epitaph(Some(epitaph));
61 }
62
63 pub fn send_event<M>(&self, ordinal: u64, event: M) -> Result<SendFuture<'_, T>, EncodeError>
65 where
66 M: Encode<T::SendBuffer>,
67 M::Encoded: Constrained<Constraint = ()>,
68 {
69 self.inner.connection.send_message(|buffer| {
70 encode_header::<T>(buffer, 0, ordinal)?;
71 buffer.encode_next(event, ())
72 })
73 }
74}
75
76impl<T: Transport> Clone for Server<T> {
77 fn clone(&self) -> Self {
78 Self { inner: self.inner.clone() }
79 }
80}
81
82pub trait ServerHandler<T: Transport> {
88 fn on_one_way(
94 &mut self,
95 ordinal: u64,
96 buffer: T::RecvBuffer,
97 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
98
99 fn on_two_way(
105 &mut self,
106 ordinal: u64,
107 buffer: T::RecvBuffer,
108 responder: Responder<T>,
109 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
110}
111
112pub struct ServerDispatcher<T: Transport> {
120 inner: Arc<ServerInner<T>>,
121 exclusive: T::Exclusive,
122 is_terminated: bool,
123}
124
125impl<T: Transport> Drop for ServerDispatcher<T> {
126 fn drop(&mut self) {
127 if !self.is_terminated {
128 unsafe {
130 self.inner.connection.terminate(ProtocolError::Stopped);
131 }
132 }
133 }
134}
135
136impl<T: Transport> ServerDispatcher<T> {
137 pub fn new(transport: T) -> Self {
139 let (shared, exclusive) = transport.split();
140 Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
141 }
142
143 pub fn server(&self) -> Server<T> {
145 Server { inner: self.inner.clone() }
146 }
147
148 pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
150 where
151 H: ServerHandler<T>,
152 {
153 let error = loop {
159 let result = unsafe { self.run_one(&mut handler).await };
161 if let Err(error) = result {
162 break error;
163 }
164 };
165
166 if matches!(error, ProtocolError::Stopped) {
169 if let Some(epitaph) = self.inner.epitaph() {
170 let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
175 }
176 }
177
178 unsafe {
180 self.inner.connection.terminate(error.clone());
181 }
182 self.is_terminated = true;
183
184 match error {
185 ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
188
189 _ => Err(error),
191 }
192 }
193
194 async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
198 where
199 H: ServerHandler<T>,
200 {
201 let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
203
204 let (txid, ordinal) =
205 decode_header::<T>(&mut buffer).map_err(ProtocolError::InvalidMessageHeader)?;
206 if let Some(txid) = NonZeroU32::new(txid) {
207 let responder = Responder { server: self.server(), txid };
208 handler.on_two_way(ordinal, buffer, responder).await?;
209 } else {
210 handler.on_one_way(ordinal, buffer).await?;
211 }
212
213 Ok(())
214 }
215}
216
217#[must_use = "responders close the underlying FIDL connection when dropped"]
219pub struct Responder<T: Transport> {
220 server: Server<T>,
221 txid: NonZeroU32,
222}
223
224impl<T: Transport> Drop for Responder<T> {
225 fn drop(&mut self) {
226 self.server.close();
227 }
228}
229
230impl<T: Transport> Responder<T> {
231 pub fn respond<M>(self, ordinal: u64, response: M) -> Result<RespondFuture<T>, EncodeError>
233 where
234 M: Encode<T::SendBuffer>,
235 M::Encoded: Constrained<Constraint = ()>,
236 {
237 let state = self.server.inner.connection.send_message_raw(|buffer| {
238 encode_header::<T>(buffer, self.txid.get(), ordinal)?;
239 buffer.encode_next(response, ())
240 })?;
241
242 let this = ManuallyDrop::new(self);
243 let server = unsafe { ptr::read(&this.server) };
246
247 Ok(RespondFuture { server, state })
248 }
249}
250
251#[must_use = "futures do nothing unless polled"]
253#[pin_project]
254pub struct RespondFuture<T: Transport> {
255 server: Server<T>,
256 #[pin]
257 state: SendFutureState<T>,
258}
259
260impl<T: Transport> Future for RespondFuture<T> {
261 type Output = SendFutureOutput<T>;
262
263 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
264 let this = self.project();
265
266 this.state.poll_send(cx, &this.server.inner.connection)
267 }
268}