fuchsia_async/handle/zircon/
on_interrupt.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::Stream;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::task::Poll;
9use zx::{sys, AsHandleRef, Interrupt, InterruptKind};
10
11use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
12use futures::task::{AtomicWaker, Context};
13
14struct OnInterruptReceiver {
15    maybe_timestamp: AtomicUsize,
16    task: AtomicWaker,
17}
18
19impl OnInterruptReceiver {
20    fn get_interrupt(&self, cx: &mut Context<'_>) -> Poll<sys::zx_time_t> {
21        let mut timestamp = self.maybe_timestamp.swap(0, Ordering::Relaxed);
22        if timestamp == 0 {
23            // The interrupt did not fire -- register to receive a wakeup when it does.
24            self.task.register(cx.waker());
25            // Check again for a timestamp after registering for a wakeup in case it fired
26            // between registering and the initial load.
27            // NOTE: We might be able to use a weaker ordering because we use AtomicWaker.
28            timestamp = self.maybe_timestamp.swap(0, Ordering::SeqCst);
29        }
30        if timestamp == 0 {
31            Poll::Pending
32        } else {
33            Poll::Ready(timestamp as i64)
34        }
35    }
36
37    fn set_timestamp(&self, timestamp: sys::zx_time_t) {
38        self.maybe_timestamp.store(timestamp as usize, Ordering::SeqCst);
39        self.task.wake();
40    }
41}
42
43impl PacketReceiver for OnInterruptReceiver {
44    fn receive_packet(&self, packet: zx::Packet) {
45        let zx::PacketContents::Interrupt(interrupt) = packet.contents() else {
46            return;
47        };
48        self.set_timestamp(interrupt.timestamp());
49    }
50}
51
pin_project_lite::pin_project! {
/// A stream that returns each time an interrupt fires.
///
/// Each item is the timestamp carried by the interrupt packet, or an error if the
/// interrupt could not be bound to the executor's port.
#[must_use = "future streams do nothing unless polled"]
pub struct OnInterrupt<K: InterruptKind> {
    // The interrupt object that gets bound to the port.
    interrupt: Interrupt<K>,
    // Registration of the packet receiver; pinned because the registration API is
    // driven through `Pin<&mut _>`.
    #[pin]
    registration: RawReceiverRegistration<OnInterruptReceiver>,
}

// On drop, undo the registration (see `unregister`) so the receiver is removed and
// the port wait is cancelled.
impl<K: InterruptKind> PinnedDrop for OnInterrupt<K> {
        fn drop(mut this: Pin<&mut Self>) {
        this.unregister()
    }
}

}
68
69impl<K: InterruptKind> OnInterrupt<K> {
70    /// Creates a new OnInterrupt object which will notifications when `interrupt` fires.
71    /// NOTE: This will only work on a port that was created with the BIND_TO_INTERRUPT option.
72    pub fn new(interrupt: Interrupt<K>) -> Self {
73        Self {
74            interrupt,
75            registration: RawReceiverRegistration::new(OnInterruptReceiver {
76                maybe_timestamp: AtomicUsize::new(0),
77                task: AtomicWaker::new(),
78            }),
79        }
80    }
81
82    fn register(
83        mut registration: Pin<&mut RawReceiverRegistration<OnInterruptReceiver>>,
84        interrupt: &Interrupt<K>,
85        cx: Option<&mut Context<'_>>,
86    ) -> Result<(), zx::Status> {
87        registration.as_mut().register(EHandle::local());
88
89        // If a context has been supplied, we must register it now before calling
90        // `bind_port` below to avoid races.
91        if let Some(cx) = cx {
92            registration.receiver().task.register(cx.waker());
93        }
94
95        interrupt.bind_port(registration.port().unwrap(), registration.key().unwrap())?;
96
97        Ok(())
98    }
99
100    fn unregister(self: Pin<&mut Self>) {
101        let mut this = self.project();
102        if let Some((ehandle, key)) = this.registration.as_mut().unregister() {
103            let _ = ehandle.port().cancel(this.interrupt, key);
104        }
105    }
106}
107
108impl<K: InterruptKind> AsHandleRef for OnInterrupt<K> {
109    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
110        self.interrupt.as_handle_ref()
111    }
112}
113
114impl<K: InterruptKind> AsRef<Interrupt<K>> for OnInterrupt<K> {
115    fn as_ref(&self) -> &Interrupt<K> {
116        &self.interrupt
117    }
118}
119
120impl<K: InterruptKind> Stream for OnInterrupt<K> {
121    type Item = Result<zx::BootInstant, zx::Status>;
122    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123        if !self.registration.is_registered() {
124            let mut this = self.project();
125            Self::register(this.registration.as_mut(), this.interrupt, Some(cx))?;
126            Poll::Pending
127        } else {
128            match self.registration.receiver().get_interrupt(cx) {
129                Poll::Ready(timestamp) => {
130                    Poll::Ready(Some(Ok(zx::BootInstant::from_nanos(timestamp))))
131                }
132                Poll::Pending => Poll::Pending,
133            }
134        }
135    }
136}
137
#[cfg(test)]
mod test {
    use super::*;
    use futures::future::pending;

    /// Drives a virtual interrupt through two trigger cycles and checks the stream
    /// output and wake-ups at every step.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let port = zx::Port::create_with_opts(zx::PortOptions::BIND_TO_INTERRUPT);
        let mut exec = crate::TestExecutor::new_with_port(port);
        // Runs the executor until it stalls so that queued port packets get dispatched.
        let mut deliver_events = || {
            let stalled = exec.run_until_stalled(&mut pending::<()>());
            assert!(stalled.is_pending());
        };

        let virtual_irq = zx::VirtualInterrupt::create_virtual()?;
        let mut stream = std::pin::pin!(OnInterrupt::new(virtual_irq));
        let (waker, wake_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Nothing has fired yet: the stream stays pending and nobody is woken.
        assert_eq!(stream.as_mut().poll_next(cx), Poll::Pending);
        deliver_events();
        assert_eq!(wake_count, 0);
        assert_eq!(stream.as_mut().poll_next(cx), Poll::Pending);

        // Fire the interrupt; the stream must wake us and report the same timestamp.
        let first = zx::BootInstant::from_nanos(10);
        stream.interrupt.trigger(first)?;
        deliver_events();
        assert_eq!(wake_count, 1);
        assert_eq!(
            stream.as_mut().poll_next(cx),
            Poll::Ready(Some(Ok::<_, zx::Status>(first)))
        );

        // The timestamp has been consumed, so the stream is pending again.
        deliver_events();
        assert_eq!(stream.as_mut().poll_next(cx), Poll::Pending);

        // Acknowledge and fire again to confirm the stream keeps producing items.
        stream.interrupt.ack()?;
        let second = zx::BootInstant::from_nanos(20);
        stream.interrupt.trigger(second)?;
        deliver_events();
        assert_eq!(
            stream.as_mut().poll_next(cx),
            Poll::Ready(Some(Ok::<_, zx::Status>(second)))
        );

        Ok(())
    }

    /// Polling on an executor whose port lacks BIND_TO_INTERRUPT must surface an error.
    #[test]
    fn incorrect_port() -> Result<(), zx::Status> {
        let _exec = crate::TestExecutor::new();

        let virtual_irq = zx::VirtualInterrupt::create_virtual()?;
        let mut stream = std::pin::pin!(OnInterrupt::new(virtual_irq));
        let (waker, _wake_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Binding fails on the first poll, and the failure is reported as a stream item.
        assert_eq!(
            stream.as_mut().poll_next(cx),
            Poll::Ready(Some(Err(zx::Status::WRONG_TYPE)))
        );

        Ok(())
    }
}