fuchsia_async/handle/zircon/
on_interrupt.rs1use futures::Stream;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::task::Poll;
9use zx::{sys, AsHandleRef, Interrupt, InterruptKind};
10
11use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
12use futures::task::{AtomicWaker, Context};
13
14struct OnInterruptReceiver {
15 maybe_timestamp: AtomicUsize,
16 task: AtomicWaker,
17}
18
19impl OnInterruptReceiver {
20 fn get_interrupt(&self, cx: &mut Context<'_>) -> Poll<sys::zx_time_t> {
21 let mut timestamp = self.maybe_timestamp.swap(0, Ordering::Relaxed);
22 if timestamp == 0 {
23 self.task.register(cx.waker());
25 timestamp = self.maybe_timestamp.swap(0, Ordering::SeqCst);
29 }
30 if timestamp == 0 {
31 Poll::Pending
32 } else {
33 Poll::Ready(timestamp as i64)
34 }
35 }
36
37 fn set_timestamp(&self, timestamp: sys::zx_time_t) {
38 self.maybe_timestamp.store(timestamp as usize, Ordering::SeqCst);
39 self.task.wake();
40 }
41}
42
43impl PacketReceiver for OnInterruptReceiver {
44 fn receive_packet(&self, packet: zx::Packet) {
45 let zx::PacketContents::Interrupt(interrupt) = packet.contents() else {
46 return;
47 };
48 self.set_timestamp(interrupt.timestamp());
49 }
50}
51
52pin_project_lite::pin_project! {
53#[must_use = "future streams do nothing unless polled"]
55pub struct OnInterrupt<K: InterruptKind> {
56 interrupt: Interrupt<K>,
57 #[pin]
58 registration: RawReceiverRegistration<OnInterruptReceiver>,
59}
60
61impl<K: InterruptKind> PinnedDrop for OnInterrupt<K> {
62 fn drop(mut this: Pin<&mut Self>) {
63 this.unregister()
64 }
65}
66
67}
68
69impl<K: InterruptKind> OnInterrupt<K> {
70 pub fn new(interrupt: Interrupt<K>) -> Self {
73 Self {
74 interrupt,
75 registration: RawReceiverRegistration::new(OnInterruptReceiver {
76 maybe_timestamp: AtomicUsize::new(0),
77 task: AtomicWaker::new(),
78 }),
79 }
80 }
81
82 fn register(
83 mut registration: Pin<&mut RawReceiverRegistration<OnInterruptReceiver>>,
84 interrupt: &Interrupt<K>,
85 cx: Option<&mut Context<'_>>,
86 ) -> Result<(), zx::Status> {
87 registration.as_mut().register(EHandle::local());
88
89 if let Some(cx) = cx {
92 registration.receiver().task.register(cx.waker());
93 }
94
95 interrupt.bind_port(registration.port().unwrap(), registration.key().unwrap())?;
96
97 Ok(())
98 }
99
100 fn unregister(self: Pin<&mut Self>) {
101 let mut this = self.project();
102 if let Some((ehandle, key)) = this.registration.as_mut().unregister() {
103 let _ = ehandle.port().cancel(this.interrupt, key);
104 }
105 }
106}
107
108impl<K: InterruptKind> AsHandleRef for OnInterrupt<K> {
109 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
110 self.interrupt.as_handle_ref()
111 }
112}
113
114impl<K: InterruptKind> AsRef<Interrupt<K>> for OnInterrupt<K> {
115 fn as_ref(&self) -> &Interrupt<K> {
116 &self.interrupt
117 }
118}
119
120impl<K: InterruptKind> Stream for OnInterrupt<K> {
121 type Item = Result<zx::BootInstant, zx::Status>;
122 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123 if !self.registration.is_registered() {
124 let mut this = self.project();
125 Self::register(this.registration.as_mut(), this.interrupt, Some(cx))?;
126 Poll::Pending
127 } else {
128 match self.registration.receiver().get_interrupt(cx) {
129 Poll::Ready(timestamp) => {
130 Poll::Ready(Some(Ok(zx::BootInstant::from_nanos(timestamp))))
131 }
132 Poll::Pending => Poll::Pending,
133 }
134 }
135 }
136}
137
138#[cfg(test)]
139mod test {
140 use super::*;
141 use futures::future::pending;
142
143 #[test]
144 fn wait_for_event() -> Result<(), zx::Status> {
145 let port = zx::Port::create_with_opts(zx::PortOptions::BIND_TO_INTERRUPT);
146 let mut exec = crate::TestExecutor::new_with_port(port);
147 let mut deliver_events =
148 || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());
149
150 let irq = zx::VirtualInterrupt::create_virtual()?;
151 let mut irq = std::pin::pin!(OnInterrupt::new(irq));
152 let (waker, waker_count) = futures_test::task::new_count_waker();
153 let cx = &mut std::task::Context::from_waker(&waker);
154
155 assert_eq!(irq.as_mut().poll_next(cx), Poll::Pending);
157 deliver_events();
158 assert_eq!(waker_count, 0);
159 assert_eq!(irq.as_mut().poll_next(cx), Poll::Pending);
160
161 let timestamp = zx::BootInstant::from_nanos(10);
163 irq.interrupt.trigger(timestamp)?;
164 deliver_events();
165 assert_eq!(waker_count, 1);
166 let expected: Result<_, zx::Status> = Ok(timestamp);
167 assert_eq!(irq.as_mut().poll_next(cx), Poll::Ready(Some(expected)));
168
169 deliver_events();
171 assert_eq!(irq.as_mut().poll_next(cx), Poll::Pending);
172
173 irq.interrupt.ack()?;
175 let timestamp = zx::BootInstant::from_nanos(20);
176 irq.interrupt.trigger(timestamp)?;
177 deliver_events();
178 let expected: Result<_, zx::Status> = Ok(timestamp);
179 assert_eq!(irq.as_mut().poll_next(cx), Poll::Ready(Some(expected)));
180
181 Ok(())
182 }
183
184 #[test]
185 fn incorrect_port() -> Result<(), zx::Status> {
186 let _exec = crate::TestExecutor::new();
187
188 let irq = zx::VirtualInterrupt::create_virtual()?;
189 let mut irq = std::pin::pin!(OnInterrupt::new(irq));
190 let (waker, _waker_count) = futures_test::task::new_count_waker();
191 let cx = &mut std::task::Context::from_waker(&waker);
192
193 assert_eq!(irq.as_mut().poll_next(cx), Poll::Ready(Some(Err(zx::Status::WRONG_TYPE))));
195
196 Ok(())
197 }
198}