openthread/ot/
tasklets.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::prelude_internal::*;
6
7use std::pin::Pin;
8use std::task::{Context, Poll, Waker};
9
10/// Methods from the
11/// [OpenThread "Tasklets" Module](https://openthread.io/reference/group/api-tasklets).
12///
13/// This trait has additional methods to allow for efficient use in asynchronous rust.
14pub trait Tasklets: Unpin {
15    /// Sets the waker to be used to wake up the tasklet future.
16    fn set_waker(&self, waker: Waker);
17
18    /// Wakes the waker previously passed to [`set_waker`].
19    fn wake_waker(&self);
20
21    /// Functional equivalent to [`otsys::otTaskletsProcess`](crate::otsys::otTaskletsProcess).
22    fn process(&self);
23
24    /// Functional equivalent to
25    /// [`otsys::otTaskletsHasPending`](crate::otsys::otTaskletsHasPending).
26    fn has_pending(&self) -> bool;
27}
28
29impl<T: Tasklets + ot::Boxable> Tasklets for ot::Box<T> {
30    fn set_waker(&self, waker: Waker) {
31        self.as_ref().set_waker(waker)
32    }
33
34    fn wake_waker(&self) {
35        self.as_ref().wake_waker()
36    }
37
38    fn process(&self) {
39        self.as_ref().process()
40    }
41
42    fn has_pending(&self) -> bool {
43        self.as_ref().has_pending()
44    }
45}
46
47/// Trait that provides the [`process_poll()`] method.
48pub trait ProcessPollAsync {
49    /// Processes all tasks that need to be handled for this instance,
50    /// including those from the platform implementation.
51    fn process_poll(&self, cx: &mut Context<'_>) -> std::task::Poll<Option<()>>;
52}
53
54impl ProcessPollAsync for ot::Instance {
55    fn process_poll(&self, cx: &mut Context<'_>) -> std::task::Poll<Option<()>> {
56        if let Err(err) = self.platform_poll(cx) {
57            warn!("process_poll terminating: {:?}", err);
58            return std::task::Poll::Ready(None);
59        };
60        self.set_waker(cx.waker().clone());
61        if self.has_pending() {
62            std::task::Poll::Ready({
63                self.process();
64                Some(())
65            })
66        } else {
67            std::task::Poll::Pending
68        }
69    }
70}
71
72/// Hook for signal from OpenThread that there are tasklets that need processing.
73#[no_mangle]
74unsafe extern "C" fn otTaskletsSignalPending(instance: *mut otInstance) {
75    trace!("otTaskletsSignalPending");
76    Instance::ref_from_ot_ptr(instance).unwrap().wake_waker();
77}
78
79/// Provides an asynchronous interface to [`ot::Tasklets`](Tasklets).
80///
81/// Created by [`TaskletsStreamExt::tasklets_stream()`].
82#[derive(Debug)]
83pub struct TaskletsStream<'a, T: ?Sized>(&'a T);
84impl<T: TaskletsStreamExt + ?Sized> Stream for TaskletsStream<'_, T> {
85    type Item = ();
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        self.0.tasklets_poll(cx)
88    }
89}
90
91/// Trait used to wrap around a mutex that polls internally without needing to externally
92/// unlock the mutex.
93pub trait TaskletsStreamExt {
94    /// Method which polls the tasklets for processing updates.
95    fn tasklets_poll(&self, cx: &mut Context<'_>) -> Poll<Option<()>>;
96
97    /// Returns an asynchronous stream which can be used for processing tasklets.
98    fn tasklets_stream(&self) -> TaskletsStream<'_, Self> {
99        TaskletsStream(self)
100    }
101}
102
103impl<T: AsRef<ot::Instance>> TaskletsStreamExt for fuchsia_sync::Mutex<T> {
104    fn tasklets_poll(&self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
105        use std::ops::Deref;
106        let guard = self.lock();
107        guard.deref().as_ref().process_poll(cx)
108    }
109}
110
111impl<T: AsRef<ot::Instance>> TaskletsStreamExt for std::sync::Mutex<T> {
112    fn tasklets_poll(&self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
113        use std::ops::Deref;
114        let guard = self.lock().expect("Lock is poisoned");
115        guard.deref().as_ref().process_poll(cx)
116    }
117}