Skip to main content

input_pipeline/
dispatcher.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::task::Context;
6use fidl_next::ClientEnd;
7use futures::prelude::*;
8use futures::task::Poll;
9use pin_project_lite::pin_project;
10use std::pin::Pin;
11
12#[cfg(feature = "dso")]
13pub use dso::*;
14
15#[cfg(not(feature = "dso"))]
16pub use elf::*;
17
pin_project! {
    /// Future returned by [`TimeoutExt::on_timeout`].
    ///
    /// On each poll the wrapped `future` is polled first. If it is still pending and `timer`
    /// has completed, this future resolves to the value returned by the `on_timeout` closure.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnTimeout<F, T, OT> {
        // Future whose completion means the timeout has elapsed.
        #[pin]
        timer: T,
        // The wrapped future; its output is returned whenever it is ready.
        #[pin]
        future: F,
        // Fallback closure. It is taken out (leaving `None`) when the timer fires.
        on_timeout: Option<OT>,
    }
}
29
30impl<F: Future, T, OT> Future for OnTimeout<F, T, OT>
31where
32    T: Future<Output = ()> + 'static,
33    OT: FnOnce() -> F::Output,
34{
35    type Output = F::Output;
36
37    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        let this = self.project();
39        if let Poll::Ready(item) = this.future.poll(cx) {
40            return Poll::Ready(item);
41        }
42        if let Poll::Ready(()) = this.timer.poll(cx) {
43            let ot = this.on_timeout.take().expect("polled with timeout after completion");
44            let item = (ot)();
45            return Poll::Ready(item);
46        }
47        Poll::Pending
48    }
49}
50
51/// A wrapper for a future which will complete with a provided closure when a timeout occurs. This
52/// is forked from [`fuchsia_async::OnTimeout`] because that has a fixed dependency on
53/// [`fuchsia_async::Timer`] which driver dispatcher does not support.
54pub trait TimeoutExt: Future + Sized {
55    fn on_timeout<T, OT>(self, timer: T, on_timeout: OT) -> OnTimeout<Self, T, OT>
56    where
57        T: Future<Output = ()> + 'static,
58        OT: FnOnce() -> Self::Output,
59    {
60        OnTimeout { timer, future: self, on_timeout: Some(on_timeout) }
61    }
62}
63
64impl<F: Future + Sized> TimeoutExt for F {}
65
/// Field-less type that carries the executor-specific operations of this crate.
///
/// Its associated functions (`spawn_local`, `after_deadline`, `client_from_zx_channel`) are
/// implemented in the `dso` module when the `dso` feature is enabled and in the `elf` module
/// otherwise.
///
/// `Debug` is derived so that this type matches the other public types in this file and can be
/// embedded in types that derive `Debug` themselves.
#[derive(Clone, Debug, Default)]
pub struct Dispatcher {}
68
mod dso {
    #![cfg(feature = "dso")]
    //! Implementation used when the `dso` feature is enabled.
    //!
    //! Tasks, timers and FIDL transports are routed through `fdf::CurrentDispatcher` and
    //! `libasync` instead of `fuchsia_async`.

    pub use super::*;

    // Traits that provide `spawn_local`, `after_deadline` and `on_dispatcher` on
    // `fdf::CurrentDispatcher`.
    use fdf::{OnDispatcher, OnDriverDispatcher};

    /// Monotonic timestamp: a transparent newtype over [`zx::MonotonicInstant`].
    #[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
    #[repr(transparent)]
    pub struct MonotonicInstant(zx::MonotonicInstant);

    impl From<zx::MonotonicInstant> for MonotonicInstant {
        fn from(o: zx::MonotonicInstant) -> Self {
            Self(o)
        }
    }

    impl From<MonotonicInstant> for zx::MonotonicInstant {
        fn from(o: MonotonicInstant) -> Self {
            o.0
        }
    }

    impl MonotonicInstant {
        /// Returns the value of `zx::MonotonicInstant::get()` wrapped in this type.
        pub fn now() -> Self {
            Self(zx::MonotonicInstant::get())
        }

        /// Returns the wrapped instant's `into_nanos()` value.
        ///
        /// Takes `&self` even though an `into_` prefix conventionally consumes `self`; the
        /// receiver is left as-is so existing callers keep compiling.
        pub fn into_nanos(&self) -> i64 {
            self.0.into_nanos()
        }

        /// Unwraps the underlying [`zx::MonotonicInstant`].
        pub fn into_zx(self) -> zx::MonotonicInstant {
            self.0
        }

        /// Returns `zx::MonotonicInstant::after(duration)` wrapped in this type.
        pub fn after(duration: zx::MonotonicDuration) -> Self {
            Self(zx::MonotonicInstant::after(duration))
        }
    }

    /// Alias for `libasync_fidl::AsyncChannel` parameterized by [`Dispatcher`].
    pub type Transport = libasync_fidl::AsyncChannel<Dispatcher>;
    /// Alias for `fdf_fidl::DriverChannel` parameterized by `fdf::CurrentDispatcher`.
    pub type DriverTransport = fdf_fidl::DriverChannel<fdf::CurrentDispatcher>;

    /// Handle to a task spawned with [`Dispatcher::spawn_local`].
    ///
    /// Dropping the handle calls `abort()` on the underlying join handle unless
    /// [`TaskHandle::detach`] was called first.
    #[derive(Debug)]
    pub struct TaskHandle<T> {
        // Always `Some` in the code visible here: it is set in `spawn_local` and never cleared.
        handle: Option<::libasync::JoinHandle<T>>,
        // Set by `detach`; tells `Drop` to skip the abort.
        detached: bool,
    }

    impl<T> Drop for TaskHandle<T> {
        fn drop(&mut self) {
            if self.detached {
                return;
            }
            // Borrow the join handle in place. The previous `as_mut().take().map(..)` chain only
            // called `take()` on a temporary `Option<&mut _>`, so it never cleared `self.handle`
            // either; this `if let` does the same thing without the misleading `take()`.
            if let Some(handle) = self.handle.as_mut() {
                // The result of `abort()` is deliberately ignored.
                _ = handle.abort();
            }
        }
    }

    impl TaskHandle<()> {
        /// Consumes the handle without aborting the task.
        ///
        /// Marks the handle as detached so that the `Drop` impl, which runs when `self` goes out
        /// of scope at the end of this function, skips the `abort()` call.
        pub fn detach(mut self) {
            self.detached = true;
        }
    }

    impl<T: 'static> Future for TaskHandle<T> {
        type Output = T;

        /// Polls the underlying join handle.
        ///
        /// # Panics
        ///
        /// Panics if the join handle resolves to an error.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let handle =
                self.handle.as_mut().expect("TaskHandle is always constructed with a join handle");
            match handle.poll_unpin(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(t)) => Poll::Ready(t),
                Poll::Ready(Err(e)) => panic!("TaskHandle: polled unexpected error {e:?}"),
            }
        }
    }

    impl Dispatcher {
        /// Spawns `future` via `fdf::CurrentDispatcher` and returns a handle to it.
        ///
        /// # Panics
        ///
        /// Panics if `fdf::CurrentDispatcher.spawn_local` returns an error.
        #[must_use]
        pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
        where
            Self: 'static,
        {
            // This should never panic if the dispatcher is valid.
            TaskHandle {
                handle: Some(
                    fdf::CurrentDispatcher.spawn_local(future).expect("Dispatcher::spawn_local"),
                ),
                detached: false,
            }
        }

        /// Returns a future that awaits `fdf::CurrentDispatcher.after_deadline(deadline)`.
        ///
        /// The dispatcher call is made eagerly, when this function is called; only the wait on
        /// its result is deferred to the returned future.
        ///
        /// # Panics
        ///
        /// The returned future panics if the awaited dispatcher future yields an error.
        pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
            let f = fdf::CurrentDispatcher.after_deadline(deadline.into());
            async move {
                // This should never panic if the dispatcher is valid.
                f.await.expect("Dispatcher::after_deadline");
            }
        }

        /// Converts a client end over a `zx::Channel` into one over [`Transport`] by delegating
        /// to `libasync_fidl::AsyncChannel::client_from_zx_channel`.
        pub fn client_from_zx_channel<P>(
            client_end: ClientEnd<P, zx::Channel>,
        ) -> ClientEnd<P, Transport> {
            libasync_fidl::AsyncChannel::<Dispatcher>::client_from_zx_channel(client_end)
        }
    }

    // `Dispatcher` forwards to `fdf::CurrentDispatcher`.
    impl fdf::OnDispatcher for Dispatcher {
        fn on_dispatcher<R>(&self, f: impl FnOnce(Option<fdf::AsyncDispatcherRef<'_>>) -> R) -> R {
            fdf::CurrentDispatcher.on_dispatcher(f)
        }
    }
}
183
184mod elf {
185    #![cfg(not(feature = "dso"))]
186
187    pub use super::*;
188
189    pub type MonotonicInstant = fuchsia_async::MonotonicInstant;
190
191    pub type Transport = zx::Channel;
192
193    #[derive(Debug)]
194    pub struct TaskHandle<T>(fuchsia_async::Task<T>);
195
196    impl TaskHandle<()> {
197        pub fn detach(self) {
198            self.0.detach();
199        }
200    }
201
202    #[cfg(test)]
203    impl<T: 'static> From<fuchsia_async::Task<T>> for TaskHandle<T> {
204        fn from(task: fuchsia_async::Task<T>) -> Self {
205            Self(task)
206        }
207    }
208
209    impl<T: 'static> Future for TaskHandle<T> {
210        type Output = T;
211
212        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
213            match self.0.poll_unpin(cx) {
214                Poll::Ready(t) => Poll::Ready(t),
215                Poll::Pending => Poll::Pending,
216            }
217        }
218    }
219
220    impl Dispatcher {
221        #[must_use]
222        pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
223        where
224            Self: 'static,
225        {
226            TaskHandle(fuchsia_async::Task::local(future))
227        }
228
229        pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
230            fuchsia_async::Timer::new(deadline)
231        }
232
233        pub fn client_from_zx_channel<P>(
234            client_end: fidl_next::ClientEnd<P, zx::Channel>,
235        ) -> ClientEnd<P, Transport> {
236            client_end
237        }
238    }
239}