input_pipeline/
dispatcher.rs1use core::task::Context;
6use fidl_next::ClientEnd;
7use futures::prelude::*;
8use futures::task::Poll;
9use pin_project_lite::pin_project;
10use std::pin::Pin;
11
12#[cfg(feature = "dso")]
13pub use dso::*;
14
15#[cfg(not(feature = "dso"))]
16pub use elf::*;
17
pin_project! {
    /// Future returned by [`TimeoutExt::on_timeout`].
    ///
    /// Wraps a future together with a timer future and a fallback closure.
    /// The `Future` implementation resolves with the wrapped future's output,
    /// or with the closure's return value once the timer completes.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnTimeout<F, T, OT> {
        // Timer future; its completion triggers the fallback closure.
        #[pin]
        timer: T,
        // The wrapped future whose output is normally returned.
        #[pin]
        future: F,
        // Fallback closure; constructed as `Some` and taken when the timer fires.
        on_timeout: Option<OT>,
    }
}
29
30impl<F: Future, T, OT> Future for OnTimeout<F, T, OT>
31where
32 T: Future<Output = ()> + 'static,
33 OT: FnOnce() -> F::Output,
34{
35 type Output = F::Output;
36
37 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38 let this = self.project();
39 if let Poll::Ready(item) = this.future.poll(cx) {
40 return Poll::Ready(item);
41 }
42 if let Poll::Ready(()) = this.timer.poll(cx) {
43 let ot = this.on_timeout.take().expect("polled with timeout after completion");
44 let item = (ot)();
45 return Poll::Ready(item);
46 }
47 Poll::Pending
48 }
49}
50
51pub trait TimeoutExt: Future + Sized {
55 fn on_timeout<T, OT>(self, timer: T, on_timeout: OT) -> OnTimeout<Self, T, OT>
56 where
57 T: Future<Output = ()> + 'static,
58 OT: FnOnce() -> Self::Output,
59 {
60 OnTimeout { timer, future: self, on_timeout: Some(on_timeout) }
61 }
62}
63
64impl<F: Future + Sized> TimeoutExt for F {}
65
/// Field-less handle type whose associated functions spawn local tasks, create
/// deadline timers and adapt FIDL client ends.
///
/// The associated functions are provided by either the `dso` or the `elf`
/// module, selected by the `dso` cargo feature.
#[derive(Debug, Clone, Default)]
pub struct Dispatcher {}
68
/// Backend used when the `dso` feature is enabled: tasks and timers go through
/// `fdf::CurrentDispatcher`, and FIDL transports are the `libasync_fidl` /
/// `fdf_fidl` channel types.
mod dso {
    #![cfg(feature = "dso")]

    pub use super::*;

    use fdf::{OnDispatcher, OnDriverDispatcher};

    /// Transparent newtype over `zx::MonotonicInstant`.
    #[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
    #[repr(transparent)]
    pub struct MonotonicInstant(zx::MonotonicInstant);

    impl From<zx::MonotonicInstant> for MonotonicInstant {
        /// Wraps a raw zircon instant.
        fn from(o: zx::MonotonicInstant) -> Self {
            Self(o)
        }
    }

    impl From<MonotonicInstant> for zx::MonotonicInstant {
        /// Unwraps back to the raw zircon instant.
        fn from(o: MonotonicInstant) -> Self {
            o.0
        }
    }

    impl MonotonicInstant {
        /// Returns the instant reported by `zx::MonotonicInstant::get()`.
        pub fn now() -> Self {
            Self(zx::MonotonicInstant::get())
        }

        /// Returns the wrapped instant's `into_nanos()` value.
        ///
        /// Takes `&self` despite the `into_` prefix; the type is `Copy`, so
        /// the receiver is left as-is to keep existing callers compiling.
        pub fn into_nanos(&self) -> i64 {
            self.0.into_nanos()
        }

        /// Consumes the wrapper and returns the inner `zx::MonotonicInstant`.
        pub fn into_zx(self) -> zx::MonotonicInstant {
            self.0
        }

        /// Wraps the result of `zx::MonotonicInstant::after(duration)`.
        pub fn after(duration: zx::MonotonicDuration) -> Self {
            Self(zx::MonotonicInstant::after(duration))
        }
    }

    /// FIDL transport type used with [`Dispatcher`] in this backend.
    pub type Transport = libasync_fidl::AsyncChannel<Dispatcher>;
    /// Driver-channel transport bound to `fdf::CurrentDispatcher`.
    pub type DriverTransport = fdf_fidl::DriverChannel<fdf::CurrentDispatcher>;

    /// Handle to a task started by [`Dispatcher::spawn_local`].
    ///
    /// Dropping the handle calls `abort()` on the underlying join handle
    /// unless [`TaskHandle::detach`] was called first.
    #[derive(Debug)]
    pub struct TaskHandle<T> {
        // Always `Some` after construction; nothing in this module clears it.
        handle: Option<::libasync::JoinHandle<T>>,
        // Set by `detach` so that `Drop` skips the abort call.
        detached: bool,
    }

    impl<T> Drop for TaskHandle<T> {
        /// Aborts the task unless the handle has been detached.
        fn drop(&mut self) {
            if self.detached {
                return;
            }
            // Borrow the join handle in place; the previous
            // `.as_mut().take().map(..)` chain only emptied a temporary
            // `Option<&mut _>` and never cleared `self.handle` either.
            if let Some(handle) = self.handle.as_mut() {
                _ = handle.abort();
            }
        }
    }

    impl TaskHandle<()> {
        /// Consumes the handle and marks it detached, so the drop that follows
        /// does not call `abort()` on the join handle.
        pub fn detach(mut self) {
            self.detached = true
        }
    }

    impl<T: 'static> Future for TaskHandle<T> {
        type Output = T;

        /// Polls the underlying join handle and yields the task's output.
        ///
        /// # Panics
        ///
        /// Panics if the join handle resolves to an error.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let handle =
                self.handle.as_mut().expect("TaskHandle: join handle is set at construction");
            match handle.poll_unpin(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(t)) => Poll::Ready(t),
                Poll::Ready(Err(e)) => panic!("TaskHandle: polled unexpected error {e:?}"),
            }
        }
    }

    impl Dispatcher {
        /// Spawns `future` through `fdf::CurrentDispatcher.spawn_local` and
        /// returns a handle to it.
        ///
        /// # Panics
        ///
        /// Panics if the dispatcher's `spawn_local` call returns an error.
        #[must_use]
        pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
        where
            Self: 'static,
        {
            TaskHandle {
                handle: Some(
                    fdf::CurrentDispatcher.spawn_local(future).expect("Dispatcher::spawn_local"),
                ),
                detached: false,
            }
        }

        /// Returns a future that completes once the dispatcher's
        /// `after_deadline` future for `deadline` resolves.
        ///
        /// The dispatcher call is made eagerly, before the returned future is
        /// first polled.
        ///
        /// # Panics
        ///
        /// The returned future panics when awaited if the dispatcher's future
        /// resolves to an error.
        pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
            let f = fdf::CurrentDispatcher.after_deadline(deadline.into());
            async move {
                f.await.expect("Dispatcher::after_deadline");
            }
        }

        /// Converts a client end over `zx::Channel` into one over [`Transport`]
        /// by delegating to `AsyncChannel::<Dispatcher>::client_from_zx_channel`.
        pub fn client_from_zx_channel<P>(
            client_end: ClientEnd<P, zx::Channel>,
        ) -> ClientEnd<P, Transport> {
            libasync_fidl::AsyncChannel::<Dispatcher>::client_from_zx_channel(client_end)
        }
    }

    impl fdf::OnDispatcher for Dispatcher {
        /// Forwards to `fdf::CurrentDispatcher.on_dispatcher`.
        fn on_dispatcher<R>(&self, f: impl FnOnce(Option<fdf::AsyncDispatcherRef<'_>>) -> R) -> R {
            fdf::CurrentDispatcher.on_dispatcher(f)
        }
    }
}
183
184mod elf {
185 #![cfg(not(feature = "dso"))]
186
187 pub use super::*;
188
189 pub type MonotonicInstant = fuchsia_async::MonotonicInstant;
190
191 pub type Transport = zx::Channel;
192
193 #[derive(Debug)]
194 pub struct TaskHandle<T>(fuchsia_async::Task<T>);
195
196 impl TaskHandle<()> {
197 pub fn detach(self) {
198 self.0.detach();
199 }
200 }
201
202 #[cfg(test)]
203 impl<T: 'static> From<fuchsia_async::Task<T>> for TaskHandle<T> {
204 fn from(task: fuchsia_async::Task<T>) -> Self {
205 Self(task)
206 }
207 }
208
209 impl<T: 'static> Future for TaskHandle<T> {
210 type Output = T;
211
212 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
213 match self.0.poll_unpin(cx) {
214 Poll::Ready(t) => Poll::Ready(t),
215 Poll::Pending => Poll::Pending,
216 }
217 }
218 }
219
220 impl Dispatcher {
221 #[must_use]
222 pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
223 where
224 Self: 'static,
225 {
226 TaskHandle(fuchsia_async::Task::local(future))
227 }
228
229 pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
230 fuchsia_async::Timer::new(deadline)
231 }
232
233 pub fn client_from_zx_channel<P>(
234 client_end: fidl_next::ClientEnd<P, zx::Channel>,
235 ) -> ClientEnd<P, Transport> {
236 client_end
237 }
238 }
239}