openthread/ot/
tasklets.rs1use crate::prelude_internal::*;
6
7use std::pin::Pin;
8use std::task::{Context, Poll, Waker};
9
/// Interface for an object that owns a queue of tasklets and a [`Waker`]
/// used to signal that the queue needs attention.
///
/// The contract below is inferred from how this file uses the trait
/// (`process_poll` and `otTaskletsSignalPending`); implementors live elsewhere.
/// The `Unpin` supertrait means every implementor can be moved after pinning.
pub trait Tasklets: Unpin {
    /// Registers `waker` as the waker to be used by [`Tasklets::wake_waker`].
    ///
    /// `process_poll` calls this on every poll with a clone of the current
    /// task's waker.
    fn set_waker(&self, waker: Waker);

    /// Wakes the most recently registered waker.
    ///
    /// Called from `otTaskletsSignalPending`.
    // NOTE(review): behavior when no waker has been registered yet is up to
    // the implementor — confirm against the implementation.
    fn wake_waker(&self);

    /// Runs the pending tasklets.
    ///
    /// `process_poll` only calls this after [`Tasklets::has_pending`] has
    /// returned `true`.
    fn process(&self);

    /// Returns `true` when there are tasklets waiting to be processed.
    fn has_pending(&self) -> bool;
}
28
29impl<T: Tasklets + ot::Boxable> Tasklets for ot::Box<T> {
30 fn set_waker(&self, waker: Waker) {
31 self.as_ref().set_waker(waker)
32 }
33
34 fn wake_waker(&self) {
35 self.as_ref().wake_waker()
36 }
37
38 fn process(&self) {
39 self.as_ref().process()
40 }
41
42 fn has_pending(&self) -> bool {
43 self.as_ref().has_pending()
44 }
45}
46
/// Poll-style entry point for driving an instance from an async task.
pub trait ProcessPollAsync {
    /// Performs one round of processing using the task context `cx`.
    ///
    /// The return value follows the shape of a stream's `poll_next`, and
    /// `TaskletsStream` in this file forwards it unchanged:
    /// `Poll::Ready(Some(()))` after work was done, `Poll::Ready(None)` when
    /// processing has terminated, and `Poll::Pending` when there is nothing
    /// to do right now.
    fn process_poll(&self, cx: &mut Context<'_>) -> Poll<Option<()>>;
}
53
54impl ProcessPollAsync for ot::Instance {
55 fn process_poll(&self, cx: &mut Context<'_>) -> std::task::Poll<Option<()>> {
56 if let Err(err) = self.platform_poll(cx) {
57 warn!("process_poll terminating: {:?}", err);
58 return std::task::Poll::Ready(None);
59 };
60 self.set_waker(cx.waker().clone());
61 if self.has_pending() {
62 std::task::Poll::Ready({
63 self.process();
64 Some(())
65 })
66 } else {
67 std::task::Poll::Pending
68 }
69 }
70}
71
72#[no_mangle]
74unsafe extern "C" fn otTaskletsSignalPending(instance: *mut otInstance) {
75 trace!("otTaskletsSignalPending");
76 Instance::ref_from_ot_ptr(instance).unwrap().wake_waker();
77}
78
/// Stream adapter that borrows a value and yields `()` each time polling that
/// value reports that tasklet processing took place.
///
/// Instances are created by `TaskletsStreamExt::tasklets_stream`. The wrapped
/// reference is polled through `TaskletsStreamExt::tasklets_poll`; `T` may be
/// unsized.
///
/// Like other stream adapters, this type is lazy: nothing happens until it is
/// polled, hence the `#[must_use]`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TaskletsStream<'a, T: ?Sized>(&'a T);
84impl<T: TaskletsStreamExt + ?Sized> Stream for TaskletsStream<'_, T> {
85 type Item = ();
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 self.0.tasklets_poll(cx)
88 }
89}
90
91pub trait TaskletsStreamExt {
94 fn tasklets_poll(&self, cx: &mut Context<'_>) -> Poll<Option<()>>;
96
97 fn tasklets_stream(&self) -> TaskletsStream<'_, Self> {
99 TaskletsStream(self)
100 }
101}
102
103impl<T: AsRef<ot::Instance>> TaskletsStreamExt for fuchsia_sync::Mutex<T> {
104 fn tasklets_poll(&self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
105 use std::ops::Deref;
106 let guard = self.lock();
107 guard.deref().as_ref().process_poll(cx)
108 }
109}
110
111impl<T: AsRef<ot::Instance>> TaskletsStreamExt for std::sync::Mutex<T> {
112 fn tasklets_poll(&self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
113 use std::ops::Deref;
114 let guard = self.lock().expect("Lock is poisoned");
115 guard.deref().as_ref().process_poll(cx)
116 }
117}