openthread_fuchsia/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This crate contains the rust OpenThread platform implementation for Fuchsia.
6
7#![warn(rust_2018_idioms)]
8
9mod backing;
10mod binding;
11pub mod logging;
12pub(crate) mod to_escaped_string;
13
14use futures::prelude::*;
15use openthread::prelude::*;
16
17use backing::*;
18use binding::*;
19
20use anyhow::Error;
21use fuchsia_async as fasync;
22use futures::channel::mpsc as fmpsc;
23use lowpan_driver_common::spinel::*;
24use std::cell::RefCell;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::mpsc;
27use std::task::{Context, Poll};
28
29#[allow(unused_imports)]
30use log::{debug, error, info, trace, warn};
31
32// Number of entries in the frame-ready channel.
33// This length doesn't need to be very large, as it
34// is effectively just a wakeup flag.
35const FRAME_READY_BUFFER_LEN: usize = 2;
36
37const UDP_PACKET_MAX_LENGTH: usize = 1280usize;
38
39/// Builder type for the OpenThread platform. Create using [`Platform::builder()`].
40pub struct PlatformBuilder {
41    pub(crate) thread_netif_index: Option<u32>,
42    pub(crate) backbone_netif_index: Option<u32>,
43}
44
45impl PlatformBuilder {
46    #[must_use]
47    pub fn thread_netif_index(mut self, index: u32) -> Self {
48        self.thread_netif_index = Some(index);
49        self
50    }
51
52    #[must_use]
53    pub fn backbone_netif_index(mut self, index: u32) -> Self {
54        self.backbone_netif_index = Some(index);
55        self
56    }
57
58    /// Initializes the OpenThread platform.
59    ///
60    /// The instance returned by this method must be passed to
61    /// [`ot::Instance::new()`](openthread_rust::ot::Instance::new).
62    ///
63    /// The returned object is a singleton. Attempting to have more than one instance
64    /// around at a time will cause a panic.
65    pub fn init<SpinelSink, SpinelStream>(
66        self,
67        spinel_sink: SpinelSink,
68        spinel_stream: SpinelStream,
69    ) -> Platform
70    where
71        SpinelSink: SpinelDeviceClient + 'static,
72        SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
73    {
74        Platform::init(self, spinel_sink, spinel_stream)
75    }
76}
77
78/// OpenThread Singleton Platform Implementation.
79///
80/// An instance of this type must be passed to
81/// [`ot::Instance::new()`](openthread_rust::ot::Instance::new).
82///
83/// This type is a singleton. Attempting to init more than a
84/// single instance of `Platform` at a time will cause a panic.
85pub struct Platform {
86    timer_receiver: fmpsc::Receiver<usize>,
87    rcp_to_ot_frame_ready_receiver: fmpsc::Receiver<()>,
88    nat64_platform_instance: Nat64PlatformInstance,
89    ot_to_rcp_task: fasync::Task<()>,
90    rcp_to_ot_task: fasync::Task<()>,
91}
92
93impl Platform {
94    #[must_use]
95    pub fn build() -> PlatformBuilder {
96        PlatformBuilder { thread_netif_index: None, backbone_netif_index: None }
97    }
98
99    /// This method is called by `PlatformBuilder::init`.
100    fn init<SpinelSink, SpinelStream>(
101        builder: PlatformBuilder,
102        mut spinel_sink: SpinelSink,
103        mut spinel_stream: SpinelStream,
104    ) -> Self
105    where
106        SpinelSink: SpinelDeviceClient + 'static,
107        SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
108    {
109        // OpenThread to RCP data-pump and related machinery.
110        let (alarm, timer_receiver) = backing::AlarmInstance::new();
111        let (ot_to_rcp_sender, ot_to_rcp_receiver) = mpsc::channel::<Vec<u8>>();
112        let ot_to_rcp_task = fasync::Task::spawn(async move {
113            spinel_sink.open().await.expect("Unable to open spinel stream");
114            loop {
115                trace!(tag = "ot_to_rcp_task"; "waiting on frame from OpenThread");
116
117                let frame = match ot_to_rcp_receiver.recv() {
118                    Ok(frame) => frame,
119                    Err(e) => {
120                        warn!(
121                            tag = "ot_to_rcp_task";
122                            "ot_to_rcp_receiver.recv() failed with {:?}", e
123                        );
124                        break;
125                    }
126                };
127
128                trace!(tag = "ot_to_rcp_task"; "sending frame from OpenThread to RCP");
129                if let Err(e) = spinel_sink.send(frame.as_slice()).await {
130                    warn!(tag = "ot_to_rcp_task"; "spinel_sink.send() failed with {:?}", e);
131                    break;
132                }
133            }
134        });
135
136        // RCP to OpenThread data-pump and related machinery.
137        let (mut rcp_to_ot_frame_ready_sender, rcp_to_ot_frame_ready_receiver) =
138            fmpsc::channel(FRAME_READY_BUFFER_LEN);
139        let (rcp_to_ot_sender, rcp_to_ot_receiver) = mpsc::channel::<Vec<u8>>();
140        let rcp_to_ot_task = fasync::Task::spawn(async move {
141            while let Some(frame_result) = spinel_stream.next().await {
142                match frame_result {
143                    Ok(frame) => {
144                        trace!(tag = "rcp_to_ot_task"; "sending frame from RCP to OpenThread");
145
146                        if let Err(e) = rcp_to_ot_sender.send(frame) {
147                            warn!(
148                                tag = "rcp_to_ot_task";
149                                "rcp_to_ot_sender.send() failed with {:?}", e
150                            );
151                            break;
152                        }
153
154                        // Notify our `process_poll` that it needs to call `platformRadioProcess`.
155                        match rcp_to_ot_frame_ready_sender.try_send(()) {
156                            Ok(()) => {}
157                            Err(e) if e.is_full() => {}
158                            Err(e) => {
159                                warn!(
160                                    tag = "rcp_to_ot_task";
161                                    "rcp_to_ot_frame_ready_sender.send() failed with {:?}", e
162                                );
163                                break;
164                            }
165                        }
166                    }
167                    Err(e) => {
168                        warn!(tag = "rcp_to_ot_task"; "spinel_stream.next() failed with {:?}", e);
169                        break;
170                    }
171                }
172            }
173            trace!(tag = "rcp_to_ot_task"; "Stream ended.");
174        });
175
176        let (nat64_prefix_req_sender, nat64_prefix_req_receiver) = fmpsc::unbounded();
177
178        unsafe {
179            // Initialize our singleton
180            PlatformBacking::set_singleton(PlatformBacking {
181                ot_to_rcp_sender: RefCell::new(ot_to_rcp_sender),
182                rcp_to_ot_receiver: RefCell::new(rcp_to_ot_receiver),
183                alarm,
184                netif_index_thread: builder.thread_netif_index,
185                netif_index_backbone: builder.backbone_netif_index,
186                trel: RefCell::new(None),
187                infra_if: InfraIfInstance::new(builder.backbone_netif_index.unwrap_or(0)),
188                is_platform_reset_requested: AtomicBool::new(false),
189                nat64: Nat64Instance::new(nat64_prefix_req_sender),
190                resolver: Resolver::new(),
191            });
192
193            // Initialize the lower-level platform implementation
194            otSysInit(&mut otPlatformConfig { reset_rcp: false } as *mut otPlatformConfig);
195        };
196
197        Platform {
198            timer_receiver,
199            rcp_to_ot_frame_ready_receiver,
200            ot_to_rcp_task,
201            rcp_to_ot_task,
202            nat64_platform_instance: Nat64PlatformInstance::new(nat64_prefix_req_receiver),
203        }
204    }
205}
206
207impl Drop for Platform {
208    fn drop(&mut self) {
209        debug!(tag = "openthread_fuchsia"; "Dropping Platform");
210        unsafe {
211            // SAFETY: Both calls below must only be called from Drop.
212            otSysDeinit();
213            PlatformBacking::drop_singleton()
214        }
215    }
216}
217
218impl ot::Platform for Platform {
219    unsafe fn process_poll(
220        &mut self,
221        instance: &ot::Instance,
222        cx: &mut Context<'_>,
223    ) -> Result<(), Error> {
224        self.process_poll_alarm(instance, cx);
225        self.process_poll_radio(instance, cx);
226        self.process_poll_udp(instance, cx);
227        self.process_poll_trel(instance, cx);
228        self.process_poll_infra_if(instance, cx);
229        self.process_poll_nat64(instance, cx);
230        self.process_poll_tasks(cx);
231        PlatformBacking::as_ref().resolver.process_poll_resolver(instance, cx);
232        if PlatformBacking::as_ref().is_platform_reset_requested.load(Ordering::SeqCst) {
233            return Err(PlatformResetRequested {}.into());
234        }
235        Ok(())
236    }
237}
238
239impl Platform {
240    fn process_poll_radio(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
241        let instance_ptr = instance.as_ot_ptr();
242
243        while let Poll::Ready(Some(())) = self.rcp_to_ot_frame_ready_receiver.poll_next_unpin(cx) {
244            trace!(tag = "rcp"; "Firing platformRadioProcess");
245
246            // SAFETY: Must be called with a valid pointer to otInstance,
247            //         must also only be called from the main OpenThread thread,
248            //         which is a guarantee of this method.
249            unsafe {
250                platformSpinelManagerProcess(instance_ptr, std::ptr::null());
251                platformRadioProcess(instance_ptr, std::ptr::null());
252            }
253        }
254    }
255
256    fn process_poll_udp(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
257        for udp_socket in instance.udp_get_sockets() {
258            // This `poll` call comes from the trait `UdpSocketHelpers` in `backing/udp.rs`
259            if let Poll::Ready(Err(err)) = poll_ot_udp_socket(udp_socket, instance, cx) {
260                error!(tag = "udp"; "Error in {:?}: {:?}", udp_socket, err);
261            }
262        }
263    }
264
265    fn process_poll_trel(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
266        // Poll for discovery. This must be `mut` because it calls poll methods internally,
267        // and is not reentrant.
268        {
269            // SAFETY: Guaranteed to only be called from the OpenThread thread.
270            let mut trel = unsafe { PlatformBacking::as_ref() }.trel.borrow_mut();
271            if let Some(trel) = trel.as_mut() {
272                trel.poll(instance, cx);
273            }
274        }
275
276        // Poll for I/O. This is separate from the above poll because it must not be
277        // `mut` so that it can be reentrant.
278        {
279            // SAFETY: Guaranteed to only be called from the OpenThread thread.
280            let trel = unsafe { PlatformBacking::as_ref() }.trel.borrow();
281            if let Some(trel) = trel.as_ref() {
282                trel.poll_io(instance, cx);
283            }
284        }
285    }
286
287    fn process_poll_infra_if(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
288        // SAFETY: Guaranteed to only be called from the OpenThread thread.
289        let infra_if = unsafe { PlatformBacking::as_ref() }.infra_if.as_ref();
290        if let Some(infra_if) = infra_if {
291            infra_if.poll(instance, cx);
292        }
293    }
294
295    fn process_poll_tasks(&mut self, cx: &mut Context<'_>) {
296        if Poll::Ready(()) == self.rcp_to_ot_task.poll_unpin(cx) {
297            panic!("Platform: rcp_to_ot_task finished unexpectedly");
298        }
299
300        if Poll::Ready(()) == self.ot_to_rcp_task.poll_unpin(cx) {
301            panic!("Platform: ot_to_rcp_task finished unexpectedly");
302        }
303    }
304}