1#![warn(rust_2018_idioms)]
8
9mod backing;
10mod binding;
11pub mod logging;
12pub(crate) mod to_escaped_string;
13
14use futures::prelude::*;
15use openthread::prelude::*;
16
17use backing::*;
18use binding::*;
19
20use anyhow::Error;
21use fuchsia_async as fasync;
22use futures::channel::mpsc as fmpsc;
23use lowpan_driver_common::spinel::*;
24use std::cell::RefCell;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::mpsc;
27use std::task::{Context, Poll};
28
29#[allow(unused_imports)]
30use log::{debug, error, info, trace, warn};
31
32const FRAME_READY_BUFFER_LEN: usize = 2;
36
37const UDP_PACKET_MAX_LENGTH: usize = 1280usize;
38
39pub struct PlatformBuilder {
41 pub(crate) thread_netif_index: Option<u32>,
42 pub(crate) backbone_netif_index: Option<u32>,
43}
44
45impl PlatformBuilder {
46 #[must_use]
47 pub fn thread_netif_index(mut self, index: u32) -> Self {
48 self.thread_netif_index = Some(index);
49 self
50 }
51
52 #[must_use]
53 pub fn backbone_netif_index(mut self, index: u32) -> Self {
54 self.backbone_netif_index = Some(index);
55 self
56 }
57
58 pub fn init<SpinelSink, SpinelStream>(
66 self,
67 spinel_sink: SpinelSink,
68 spinel_stream: SpinelStream,
69 ) -> Platform
70 where
71 SpinelSink: SpinelDeviceClient + 'static,
72 SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
73 {
74 Platform::init(self, spinel_sink, spinel_stream)
75 }
76}
77
78pub struct Platform {
86 timer_receiver: fmpsc::Receiver<usize>,
87 rcp_to_ot_frame_ready_receiver: fmpsc::Receiver<()>,
88 nat64_platform_instance: Nat64PlatformInstance,
89 ot_to_rcp_task: fasync::Task<()>,
90 rcp_to_ot_task: fasync::Task<()>,
91}
92
93impl Platform {
94 #[must_use]
95 pub fn build() -> PlatformBuilder {
96 PlatformBuilder { thread_netif_index: None, backbone_netif_index: None }
97 }
98
99 fn init<SpinelSink, SpinelStream>(
101 builder: PlatformBuilder,
102 mut spinel_sink: SpinelSink,
103 mut spinel_stream: SpinelStream,
104 ) -> Self
105 where
106 SpinelSink: SpinelDeviceClient + 'static,
107 SpinelStream: Stream<Item = Result<Vec<u8>, anyhow::Error>> + 'static + Unpin + Send,
108 {
109 let (alarm, timer_receiver) = backing::AlarmInstance::new();
111 let (ot_to_rcp_sender, ot_to_rcp_receiver) = mpsc::channel::<Vec<u8>>();
112 let ot_to_rcp_task = fasync::Task::spawn(async move {
113 spinel_sink.open().await.expect("Unable to open spinel stream");
114 loop {
115 trace!(tag = "ot_to_rcp_task"; "waiting on frame from OpenThread");
116
117 let frame = match ot_to_rcp_receiver.recv() {
118 Ok(frame) => frame,
119 Err(e) => {
120 warn!(
121 tag = "ot_to_rcp_task";
122 "ot_to_rcp_receiver.recv() failed with {:?}", e
123 );
124 break;
125 }
126 };
127
128 trace!(tag = "ot_to_rcp_task"; "sending frame from OpenThread to RCP");
129 if let Err(e) = spinel_sink.send(frame.as_slice()).await {
130 warn!(tag = "ot_to_rcp_task"; "spinel_sink.send() failed with {:?}", e);
131 break;
132 }
133 }
134 });
135
136 let (mut rcp_to_ot_frame_ready_sender, rcp_to_ot_frame_ready_receiver) =
138 fmpsc::channel(FRAME_READY_BUFFER_LEN);
139 let (rcp_to_ot_sender, rcp_to_ot_receiver) = mpsc::channel::<Vec<u8>>();
140 let rcp_to_ot_task = fasync::Task::spawn(async move {
141 while let Some(frame_result) = spinel_stream.next().await {
142 match frame_result {
143 Ok(frame) => {
144 trace!(tag = "rcp_to_ot_task"; "sending frame from RCP to OpenThread");
145
146 if let Err(e) = rcp_to_ot_sender.send(frame) {
147 warn!(
148 tag = "rcp_to_ot_task";
149 "rcp_to_ot_sender.send() failed with {:?}", e
150 );
151 break;
152 }
153
154 match rcp_to_ot_frame_ready_sender.try_send(()) {
156 Ok(()) => {}
157 Err(e) if e.is_full() => {}
158 Err(e) => {
159 warn!(
160 tag = "rcp_to_ot_task";
161 "rcp_to_ot_frame_ready_sender.send() failed with {:?}", e
162 );
163 break;
164 }
165 }
166 }
167 Err(e) => {
168 warn!(tag = "rcp_to_ot_task"; "spinel_stream.next() failed with {:?}", e);
169 break;
170 }
171 }
172 }
173 trace!(tag = "rcp_to_ot_task"; "Stream ended.");
174 });
175
176 let (nat64_prefix_req_sender, nat64_prefix_req_receiver) = fmpsc::unbounded();
177
178 unsafe {
179 PlatformBacking::set_singleton(PlatformBacking {
181 ot_to_rcp_sender: RefCell::new(ot_to_rcp_sender),
182 rcp_to_ot_receiver: RefCell::new(rcp_to_ot_receiver),
183 alarm,
184 netif_index_thread: builder.thread_netif_index,
185 netif_index_backbone: builder.backbone_netif_index,
186 trel: RefCell::new(None),
187 infra_if: InfraIfInstance::new(builder.backbone_netif_index.unwrap_or(0)),
188 is_platform_reset_requested: AtomicBool::new(false),
189 nat64: Nat64Instance::new(nat64_prefix_req_sender),
190 resolver: Resolver::new(),
191 });
192
193 otSysInit(&mut otPlatformConfig { reset_rcp: false } as *mut otPlatformConfig);
195 };
196
197 Platform {
198 timer_receiver,
199 rcp_to_ot_frame_ready_receiver,
200 ot_to_rcp_task,
201 rcp_to_ot_task,
202 nat64_platform_instance: Nat64PlatformInstance::new(nat64_prefix_req_receiver),
203 }
204 }
205}
206
207impl Drop for Platform {
208 fn drop(&mut self) {
209 debug!(tag = "openthread_fuchsia"; "Dropping Platform");
210 unsafe {
211 otSysDeinit();
213 PlatformBacking::drop_singleton()
214 }
215 }
216}
217
218impl ot::Platform for Platform {
219 unsafe fn process_poll(
220 &mut self,
221 instance: &ot::Instance,
222 cx: &mut Context<'_>,
223 ) -> Result<(), Error> {
224 self.process_poll_alarm(instance, cx);
225 self.process_poll_radio(instance, cx);
226 self.process_poll_udp(instance, cx);
227 self.process_poll_trel(instance, cx);
228 self.process_poll_infra_if(instance, cx);
229 self.process_poll_nat64(instance, cx);
230 self.process_poll_tasks(cx);
231 PlatformBacking::as_ref().resolver.process_poll_resolver(instance, cx);
232 if PlatformBacking::as_ref().is_platform_reset_requested.load(Ordering::SeqCst) {
233 return Err(PlatformResetRequested {}.into());
234 }
235 Ok(())
236 }
237}
238
239impl Platform {
240 fn process_poll_radio(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
241 let instance_ptr = instance.as_ot_ptr();
242
243 while let Poll::Ready(Some(())) = self.rcp_to_ot_frame_ready_receiver.poll_next_unpin(cx) {
244 trace!(tag = "rcp"; "Firing platformRadioProcess");
245
246 unsafe {
250 platformSpinelManagerProcess(instance_ptr, std::ptr::null());
251 platformRadioProcess(instance_ptr, std::ptr::null());
252 }
253 }
254 }
255
256 fn process_poll_udp(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
257 for udp_socket in instance.udp_get_sockets() {
258 if let Poll::Ready(Err(err)) = poll_ot_udp_socket(udp_socket, instance, cx) {
260 error!(tag = "udp"; "Error in {:?}: {:?}", udp_socket, err);
261 }
262 }
263 }
264
265 fn process_poll_trel(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
266 {
269 let mut trel = unsafe { PlatformBacking::as_ref() }.trel.borrow_mut();
271 if let Some(trel) = trel.as_mut() {
272 trel.poll(instance, cx);
273 }
274 }
275
276 {
279 let trel = unsafe { PlatformBacking::as_ref() }.trel.borrow();
281 if let Some(trel) = trel.as_ref() {
282 trel.poll_io(instance, cx);
283 }
284 }
285 }
286
287 fn process_poll_infra_if(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
288 let infra_if = unsafe { PlatformBacking::as_ref() }.infra_if.as_ref();
290 if let Some(infra_if) = infra_if {
291 infra_if.poll(instance, cx);
292 }
293 }
294
295 fn process_poll_tasks(&mut self, cx: &mut Context<'_>) {
296 if Poll::Ready(()) == self.rcp_to_ot_task.poll_unpin(cx) {
297 panic!("Platform: rcp_to_ot_task finished unexpectedly");
298 }
299
300 if Poll::Ready(()) == self.ot_to_rcp_task.poll_unpin(cx) {
301 panic!("Platform: ot_to_rcp_task finished unexpectedly");
302 }
303 }
304}