fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::mem::{ManuallyDrop, MaybeUninit};
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::encoder::InternalHandleEncoder;
15use fidl_next_codec::{Encode, EncodeError, EncoderExt as _, Wire, wire};
16use fuchsia_loom::sync::Arc;
17use fuchsia_loom::sync::atomic::{AtomicI64, Ordering};
18use pin_project::pin_project;
19
20use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
21use crate::wire::MessageHeader;
22use crate::{
23 Flexibility, FrameworkError, Message, NonBlockingTransport, ProtocolError, SendFuture,
24 Transport,
25};
26
27struct ServerInner<T: Transport> {
28 connection: Connection<T>,
29 epitaph: AtomicI64,
30}
31
32impl<T: Transport> ServerInner<T> {
33 const EPITAPH_NONE: i64 = i64::MAX;
34
35 fn new(shared: T::Shared) -> Self {
36 Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
37 }
38
39 fn close_with_epitaph(&self, epitaph: Option<i32>) {
40 if let Some(epitaph) = epitaph {
41 self.epitaph.store(epitaph as i64, Ordering::Relaxed);
42 }
43 self.connection.stop();
44 }
45
46 fn epitaph(&self) -> Option<i32> {
47 let epitaph = self.epitaph.load(Ordering::Relaxed);
48 if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
49 }
50}
51
52pub struct Server<T: Transport> {
54 inner: Arc<ServerInner<T>>,
55}
56
57impl<T: Transport> Server<T> {
58 pub fn close(&self) {
60 self.inner.close_with_epitaph(None);
61 }
62
63 pub fn close_with_epitaph(&self, epitaph: i32) {
65 self.inner.close_with_epitaph(Some(epitaph));
66 }
67
68 pub fn send_event<W>(
70 &self,
71 ordinal: u64,
72 flexibility: Flexibility,
73 event: impl Encode<W, T::SendBuffer>,
74 ) -> Result<SendFuture<'_, T>, EncodeError>
75 where
76 W: Wire<Constraint = ()>,
77 {
78 let state = self.inner.connection.send_message_raw(|buffer| {
79 buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
80 buffer.encode_next(event)
81 })?;
82
83 Ok(SendFuture::from_raw_parts(&self.inner.connection, state))
84 }
85}
86
87impl<T: Transport> Clone for Server<T> {
88 fn clone(&self) -> Self {
89 Self { inner: self.inner.clone() }
90 }
91}
92
93pub trait ServerHandler<T: Transport>: Send {
99 fn on_one_way(
105 &mut self,
106 message: Message<T>,
107 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
108
109 fn on_two_way(
115 &mut self,
116 message: Message<T>,
117 responder: Responder<T>,
118 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
119}
120
121pub trait LocalServerHandler<T: Transport> {
126 fn on_one_way(
130 &mut self,
131 message: Message<T>,
132 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
133
134 fn on_two_way(
138 &mut self,
139 message: Message<T>,
140 responder: Responder<T>,
141 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
142}
143
/// Wraps a [`ServerHandler`] so that it can be used wherever a
/// [`LocalServerHandler`] is required.
///
/// The wrapped handler is public and can be taken back out via field `0`.
#[repr(transparent)]
pub struct ServerHandlerToLocalAdapter<H>(pub H);
147
148impl<T, H> LocalServerHandler<T> for ServerHandlerToLocalAdapter<H>
149where
150 T: Transport,
151 H: ServerHandler<T>,
152{
153 #[inline]
154 fn on_one_way(
155 &mut self,
156 message: Message<T>,
157 ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
158 self.0.on_one_way(message)
159 }
160
161 #[inline]
162 fn on_two_way(
163 &mut self,
164 message: Message<T>,
165 responder: Responder<T>,
166 ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
167 self.0.on_two_way(message, responder)
168 }
169}
170
171pub struct ServerDispatcher<T: Transport> {
179 inner: Arc<ServerInner<T>>,
180 exclusive: T::Exclusive,
181 is_terminated: bool,
182}
183
184impl<T: Transport> Drop for ServerDispatcher<T> {
185 fn drop(&mut self) {
186 if !self.is_terminated {
187 unsafe {
189 self.inner.connection.terminate(ProtocolError::Stopped);
190 }
191 }
192 }
193}
194
195impl<T: Transport> ServerDispatcher<T> {
196 pub fn new(transport: T) -> Self {
198 let (shared, exclusive) = transport.split();
199 Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
200 }
201
202 pub fn server(&self) -> Server<T> {
204 Server { inner: self.inner.clone() }
205 }
206
207 pub async fn run<H>(self, handler: H) -> Result<H, ProtocolError<T::Error>>
209 where
210 H: ServerHandler<T>,
211 {
212 self.run_local(ServerHandlerToLocalAdapter(handler)).await.map(|adapter| adapter.0)
215 }
216
217 pub async fn run_local<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
219 where
220 H: LocalServerHandler<T>,
221 {
222 let error = loop {
228 let result = unsafe { self.run_one(&mut handler).await };
230 if let Err(error) = result {
231 break error;
232 }
233 };
234
235 if matches!(error, ProtocolError::Stopped)
238 && let Some(epitaph) = self.inner.epitaph()
239 {
240 let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
245 }
246
247 unsafe {
249 self.inner.connection.terminate(error.clone());
250 }
251 self.is_terminated = true;
252
253 match error {
254 ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
257
258 _ => Err(error),
260 }
261 }
262
263 async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
267 where
268 H: LocalServerHandler<T>,
269 {
270 let buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
272 let mut message = Message::decode(buffer).map_err(ProtocolError::InvalidMessageHeader)?;
273
274 if let Some(txid) = NonZeroU32::new(*message.header().txid) {
275 let responder = Responder { server: self.server(), txid };
276 handler.on_two_way(message, responder).await?;
277 } else {
278 handler.on_one_way(message).await?;
279 }
280
281 Ok(())
282 }
283}
284
285#[must_use = "responders close the underlying FIDL connection when dropped"]
287pub struct Responder<T: Transport> {
288 server: Server<T>,
289 txid: NonZeroU32,
290}
291
292impl<T: Transport> Drop for Responder<T> {
293 fn drop(&mut self) {
294 self.server.close();
295 }
296}
297
298impl<T: Transport> Responder<T> {
299 pub fn respond<W>(
301 self,
302 ordinal: u64,
303 flexibility: Flexibility,
304 response: impl Encode<W, T::SendBuffer>,
305 ) -> Result<RespondFuture<T>, EncodeError>
306 where
307 W: Wire<Constraint = ()>,
308 {
309 let state = self.server.inner.connection.send_message_raw(|buffer| {
310 buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
311 buffer.encode_next(response)
312 })?;
313
314 let this = ManuallyDrop::new(self);
315 let server = unsafe { ptr::read(&this.server) };
318
319 Ok(RespondFuture { server, state })
320 }
321
322 pub fn respond_framework_error(
324 self,
325 ordinal: u64,
326 framework_error: FrameworkError,
327 ) -> Result<RespondFuture<T>, EncodeError> {
328 struct FlexibleResponse {
329 ordinal: u64,
330 framework_error: FrameworkError,
331 }
332
333 unsafe impl<E: InternalHandleEncoder> Encode<wire::Union, E> for FlexibleResponse {
334 fn encode(
335 self,
336 encoder: &mut E,
337 out: &mut MaybeUninit<wire::Union>,
338 _: (),
339 ) -> Result<(), EncodeError> {
340 wire::Union::encode_as_static(
341 self.framework_error as i32,
342 self.ordinal,
343 encoder,
344 out,
345 (),
346 )
347 }
348 }
349
350 self.respond(ordinal, Flexibility::Flexible, FlexibleResponse { ordinal, framework_error })
351 }
352}
353
354#[must_use = "futures do nothing unless polled"]
356#[pin_project]
357pub struct RespondFuture<T: Transport> {
358 server: Server<T>,
359 #[pin]
360 state: SendFutureState<T>,
361}
362
363impl<T: Transport> Future for RespondFuture<T> {
364 type Output = SendFutureOutput<T>;
365
366 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367 let this = self.project();
368
369 this.state.poll_send(cx, &this.server.inner.connection)
370 }
371}
372
373impl<T: NonBlockingTransport> RespondFuture<T> {
374 pub fn send_immediately(self) -> SendFutureOutput<T> {
385 self.state.send_immediately(&self.server.inner.connection)
386 }
387}