1use anyhow::{Context, Error, Result};
6use fidl::endpoints::{ProtocolMarker, Request, RequestStream};
7use fuchsia_async as fasync;
8use futures::TryStreamExt;
9use log::error;
10
11pub trait RequestHandler<P: ProtocolMarker>: Send {
13 fn handle_request(&self, request: Request<P>) -> Result<(), Error>;
15}
16
17#[async_trait::async_trait]
19pub trait AsyncRequestHandler<P: ProtocolMarker>: Send + Sync {
20 async fn handle_request(&self, request: Request<P>) -> Result<(), Error>;
22}
23
24impl<P, F> RequestHandler<P> for F
25where
26 P: ProtocolMarker,
27 F: Fn(Request<P>) -> Result<(), Error> + Send,
28{
29 fn handle_request(&self, request: Request<P>) -> Result<(), Error> {
30 self(request)
31 }
32}
33
34pub async fn serve<S, H>(mut stream: S, handler: H) -> Result<(), Error>
39where
40 S: RequestStream,
41 H: RequestHandler<S::Protocol>,
42{
43 while let Some(request) = stream
44 .try_next()
45 .await
46 .with_context(|| format!("error reading {} request", S::Protocol::DEBUG_NAME))?
47 {
48 handler
49 .handle_request(request)
50 .with_context(|| format!("error handling {} request", S::Protocol::DEBUG_NAME))?;
51 }
52 Ok(())
53}
54
55pub async fn serve_async<S, H>(mut stream: S, handler: H) -> Result<(), Error>
60where
61 S: RequestStream,
62 S::Ok: Send,
63 H: AsyncRequestHandler<S::Protocol>,
64{
65 while let Some(request) = stream
66 .try_next()
67 .await
68 .with_context(|| format!("error reading {} request", S::Protocol::DEBUG_NAME))?
69 {
70 handler
71 .handle_request(request)
72 .await
73 .with_context(|| format!("error handling {} request", S::Protocol::DEBUG_NAME))?;
74 }
75 Ok(())
76}
77
78pub async fn serve_async_concurrent<S, H>(
83 stream: S,
84 limit: impl Into<Option<usize>>,
85 handler: H,
86) -> Result<(), Error>
87where
88 S: RequestStream,
89 S::Ok: 'static + Send,
90 H: AsyncRequestHandler<S::Protocol> + 'static,
91{
92 let handler = std::sync::Arc::new(handler);
93
94 let fut = stream.try_for_each_concurrent(limit, |request| async {
95 handler
96 .clone()
97 .handle_request(request)
98 .await
99 .with_context(|| format!("error handling {} request", S::Protocol::DEBUG_NAME))
100 .unwrap();
101
102 Ok(())
103 });
104
105 fut.await.with_context(|| format!("error reading {} request", S::Protocol::DEBUG_NAME))?;
106
107 Ok(())
108}
109
110pub fn serve_detached<S, H>(stream: S, handler: H)
123where
124 S: RequestStream + 'static,
125 H: RequestHandler<S::Protocol> + 'static,
126{
127 fasync::Task::spawn(async move {
128 if let Err(err) = serve(stream, handler).await {
129 error!("{:?}", err);
130 }
131 })
132 .detach();
133}
134
135pub fn serve_async_detached<S, H>(stream: S, handler: H)
148where
149 S: RequestStream + 'static,
150 S::Ok: Send,
151 H: AsyncRequestHandler<S::Protocol> + 'static,
152{
153 fasync::Task::spawn(async move {
154 if let Err(err) = serve_async(stream, handler).await {
155 error!("{:?}", err);
156 }
157 })
158 .detach();
159}