fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{
15 AsDecoder as _, DecoderExt as _, Encode, EncodeError, EncoderExt as _, Wire,
16};
17use pin_project::pin_project;
18
19use crate::concurrency::sync::Arc;
20use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
21use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
22use crate::wire::MessageHeader;
23use crate::{Body, Flexibility, NonBlockingTransport, ProtocolError, SendFuture, Transport};
24
25struct ServerInner<T: Transport> {
26 connection: Connection<T>,
27 epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerInner<T> {
31 const EPITAPH_NONE: i64 = i64::MAX;
32
33 fn new(shared: T::Shared) -> Self {
34 Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35 }
36
37 fn close_with_epitaph(&self, epitaph: Option<i32>) {
38 if let Some(epitaph) = epitaph {
39 self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40 }
41 self.connection.stop();
42 }
43
44 fn epitaph(&self) -> Option<i32> {
45 let epitaph = self.epitaph.load(Ordering::Relaxed);
46 if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47 }
48}
49
50pub struct Server<T: Transport> {
52 inner: Arc<ServerInner<T>>,
53}
54
55impl<T: Transport> Server<T> {
56 pub fn close(&self) {
58 self.inner.close_with_epitaph(None);
59 }
60
61 pub fn close_with_epitaph(&self, epitaph: i32) {
63 self.inner.close_with_epitaph(Some(epitaph));
64 }
65
66 pub fn send_event<W>(
68 &self,
69 ordinal: u64,
70 flexibility: Flexibility,
71 event: impl Encode<W, T::SendBuffer>,
72 ) -> Result<SendFuture<'_, T>, EncodeError>
73 where
74 W: Wire<Constraint = ()>,
75 {
76 let state = self.inner.connection.send_message_raw(|buffer| {
77 buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
78 buffer.encode_next(event)
79 })?;
80
81 Ok(SendFuture::from_raw_parts(&self.inner.connection, state))
82 }
83}
84
85impl<T: Transport> Clone for Server<T> {
86 fn clone(&self) -> Self {
87 Self { inner: self.inner.clone() }
88 }
89}
90
91pub trait ServerHandler<T: Transport>: Send {
97 fn on_one_way(
103 &mut self,
104 ordinal: u64,
105 flexibility: Flexibility,
106 body: Body<T>,
107 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
108
109 fn on_two_way(
115 &mut self,
116 ordinal: u64,
117 flexibility: Flexibility,
118 body: Body<T>,
119 responder: Responder<T>,
120 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
121}
122
123pub trait LocalServerHandler<T: Transport> {
128 fn on_one_way(
132 &mut self,
133 ordinal: u64,
134 flexibility: Flexibility,
135 body: Body<T>,
136 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
137
138 fn on_two_way(
142 &mut self,
143 ordinal: u64,
144 flexibility: Flexibility,
145 body: Body<T>,
146 responder: Responder<T>,
147 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
148}
149
150#[repr(transparent)]
152pub struct ServerHandlerToLocalAdapter<H>(H);
153
154impl<T, H> LocalServerHandler<T> for ServerHandlerToLocalAdapter<H>
155where
156 T: Transport,
157 H: ServerHandler<T>,
158{
159 #[inline]
160 fn on_one_way(
161 &mut self,
162 ordinal: u64,
163 flexibility: Flexibility,
164 body: Body<T>,
165 ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
166 self.0.on_one_way(ordinal, flexibility, body)
167 }
168
169 #[inline]
170 fn on_two_way(
171 &mut self,
172 ordinal: u64,
173 flexibility: Flexibility,
174 body: Body<T>,
175 responder: Responder<T>,
176 ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
177 self.0.on_two_way(ordinal, flexibility, body, responder)
178 }
179}
180
181pub struct ServerDispatcher<T: Transport> {
189 inner: Arc<ServerInner<T>>,
190 exclusive: T::Exclusive,
191 is_terminated: bool,
192}
193
194impl<T: Transport> Drop for ServerDispatcher<T> {
195 fn drop(&mut self) {
196 if !self.is_terminated {
197 unsafe {
199 self.inner.connection.terminate(ProtocolError::Stopped);
200 }
201 }
202 }
203}
204
205impl<T: Transport> ServerDispatcher<T> {
206 pub fn new(transport: T) -> Self {
208 let (shared, exclusive) = transport.split();
209 Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
210 }
211
212 pub fn server(&self) -> Server<T> {
214 Server { inner: self.inner.clone() }
215 }
216
217 pub async fn run<H>(self, handler: H) -> Result<H, ProtocolError<T::Error>>
219 where
220 H: ServerHandler<T>,
221 {
222 self.run_local(ServerHandlerToLocalAdapter(handler)).await.map(|adapter| adapter.0)
225 }
226
227 pub async fn run_local<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
229 where
230 H: LocalServerHandler<T>,
231 {
232 let error = loop {
238 let result = unsafe { self.run_one(&mut handler).await };
240 if let Err(error) = result {
241 break error;
242 }
243 };
244
245 if matches!(error, ProtocolError::Stopped)
248 && let Some(epitaph) = self.inner.epitaph()
249 {
250 let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
255 }
256
257 unsafe {
259 self.inner.connection.terminate(error.clone());
260 }
261 self.is_terminated = true;
262
263 match error {
264 ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
267
268 _ => Err(error),
270 }
271 }
272
273 async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
277 where
278 H: LocalServerHandler<T>,
279 {
280 let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
282
283 let header = buffer
284 .as_decoder()
285 .decode_prefix::<MessageHeader>()
286 .map_err(ProtocolError::InvalidMessageHeader)?;
287
288 if let Some(txid) = NonZeroU32::new(*header.txid) {
289 let responder = Responder { server: self.server(), txid };
290 handler
291 .on_two_way(*header.ordinal, header.flexibility(), Body::new(buffer), responder)
292 .await?;
293 } else {
294 handler.on_one_way(*header.ordinal, header.flexibility(), Body::new(buffer)).await?;
295 }
296
297 Ok(())
298 }
299}
300
301#[must_use = "responders close the underlying FIDL connection when dropped"]
303pub struct Responder<T: Transport> {
304 server: Server<T>,
305 txid: NonZeroU32,
306}
307
308impl<T: Transport> Drop for Responder<T> {
309 fn drop(&mut self) {
310 self.server.close();
311 }
312}
313
314impl<T: Transport> Responder<T> {
315 pub fn respond<W>(
317 self,
318 ordinal: u64,
319 flexibility: Flexibility,
320 response: impl Encode<W, T::SendBuffer>,
321 ) -> Result<RespondFuture<T>, EncodeError>
322 where
323 W: Wire<Constraint = ()>,
324 {
325 let state = self.server.inner.connection.send_message_raw(|buffer| {
326 buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
327 buffer.encode_next(response)
328 })?;
329
330 let this = ManuallyDrop::new(self);
331 let server = unsafe { ptr::read(&this.server) };
334
335 Ok(RespondFuture { server, state })
336 }
337}
338
339#[must_use = "futures do nothing unless polled"]
341#[pin_project]
342pub struct RespondFuture<T: Transport> {
343 server: Server<T>,
344 #[pin]
345 state: SendFutureState<T>,
346}
347
348impl<T: Transport> Future for RespondFuture<T> {
349 type Output = SendFutureOutput<T>;
350
351 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
352 let this = self.project();
353
354 this.state.poll_send(cx, &this.server.inner.connection)
355 }
356}
357
358impl<T: NonBlockingTransport> RespondFuture<T> {
359 pub fn send_immediately(self) -> SendFutureOutput<T> {
370 self.state.send_immediately(&self.server.inner.connection)
371 }
372}