openthread_fuchsia/backing/
trel.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::*;
6use crate::to_escaped_string::*;
7use anyhow::Context as _;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_net_mdns::*;
10use fuchsia_async::Task;
11use futures::stream::FusedStream;
12use openthread_sys::*;
13use ot::{PlatTrel as _, TrelCounters};
14use std::collections::HashMap;
15use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
16use std::task::{Context, Poll};
17
/// State backing the TREL platform interface while TREL is enabled.
///
/// Holds the UDP socket used for TREL traffic, the mDNS publication and
/// subscription plumbing, and the packet counters reported to OpenThread.
pub(crate) struct TrelInstance {
    /// UDP socket used to send and receive TREL packets.
    socket: fasync::net::UdpSocket,
    /// Task driving publication of the local service over mDNS. `None` until
    /// `register_service` is called, and reset to `None` once the task finishes.
    publication_responder: Option<Task<Result<(), anyhow::Error>>>,
    /// Instance name under which the local service is published.
    instance_name: String,
    /// Socket address last recorded for each discovered peer, keyed by the peer's
    /// mDNS instance name.
    peer_instance_sockaddr_map: HashMap<String, ot::SockAddr>,

    #[allow(dead_code)] // This field must be kept around for <https://fxbug.dev/42182233>
    subscriber: ServiceSubscriber2Proxy,

    /// Incoming listener requests for the subscription made through `subscriber`.
    subscriber_request_stream: ServiceSubscriptionListenerRequestStream,

    /// TREL packet/byte counters. Wrapped in a `RefCell` so they can be updated
    /// through a shared reference.
    counters: RefCell<TrelCounters>,
}
31
/// Converts an optional list of TXT strings into a single buffer in DNS TXT wire
/// format: each entry is written as a one-byte length followed by that many bytes.
///
/// `None` and an empty list both produce an empty buffer. Because the length prefix
/// is a single byte, entries longer than 255 bytes are truncated to their first
/// 255 bytes.
fn flatten_txt(txt: Option<Vec<Vec<u8>>>) -> Vec<u8> {
    let mut ret = vec![];

    for entry in txt.iter().flatten() {
        // Saturate the length at 255 so the prefix always fits in one byte, then
        // emit exactly that many bytes so prefix and payload never disagree.
        let len = u8::try_from(entry.len()).unwrap_or(u8::MAX);
        ret.push(len);
        ret.extend_from_slice(&entry[..usize::from(len)]);
    }

    ret
}
47
48/// Converts an iterator over [`fidl_fuchsia_net::SocketAddress`]es to a vector of
49/// [`ot::Ip6Address`]es and a port.
50fn process_addresses_from_socket_addresses<
51    T: IntoIterator<Item = fidl_fuchsia_net::SocketAddress>,
52>(
53    addresses: T,
54) -> (Vec<ot::Ip6Address>, Option<u16>) {
55    let mut ret_port: Option<u16> = None;
56    let mut addresses =
57        addresses
58            .into_iter()
59            .flat_map(|x| {
60                if let fidl_fuchsia_net::SocketAddress::Ipv6(
61                    fidl_fuchsia_net::Ipv6SocketAddress { address, port, .. },
62                ) = x
63                {
64                    let addr = ot::Ip6Address::from(address.addr);
65                    if ret_port.is_none() {
66                        ret_port = Some(port);
67                    } else if ret_port != Some(port) {
68                        warn!(
69                            tag = "trel";
70                            "mDNS service has multiple ports for the same service, {:?} != {:?}",
71                            ret_port.unwrap(),
72                            port
73                        );
74                    }
75                    // Require link-local. Thread requires TREL peers to advertise link-local via mDNS.
76                    if ipv6addr_is_unicast_link_local(&addr) {
77                        return Some(addr);
78                    }
79                }
80                None
81            })
82            .collect::<Vec<_>>();
83    addresses.sort();
84    (addresses, ret_port)
85}
86
/// Returns `true` if the address lies in `fe80::/10`, i.e. it is a unicast address
/// with link-local scope.
///
/// This mirrors [`std::net::Ipv6Addr::is_unicast_link_local()`], which was
/// [still experimental](https://github.com/rust-lang/rust/issues/27709) when this
/// helper was written.
fn ipv6addr_is_unicast_link_local(addr: &std::net::Ipv6Addr) -> bool {
    // The /10 prefix covers the whole first octet and the top two bits of the second.
    let [first, second, ..] = addr.octets();
    first == 0xfe && (second & 0xc0) == 0x80
}
94
95// Splits the TXT record into individual values.
96fn split_txt(txt: &[u8]) -> Vec<Vec<u8>> {
97    info!(tag = "trel"; "trel:split_txt: Splitting TXT record: {:?}", hex::encode(txt));
98    let txt =
99        ot::DnsTxtEntryIterator::try_new(txt).expect("can't parse TXT records from OpenThread");
100    txt.map(|x| x.expect("can't parse TXT records from OpenThread").to_vec()).collect::<Vec<_>>()
101}
102
103impl TrelInstance {
104    fn new(instance_name: String) -> Result<TrelInstance, anyhow::Error> {
105        let (client, server) = create_endpoints::<ServiceSubscriptionListenerMarker>();
106
107        let subscriber =
108            fuchsia_component::client::connect_to_protocol::<ServiceSubscriber2Marker>().unwrap();
109
110        subscriber
111            .subscribe_to_service(
112                ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
113                &ServiceSubscriptionOptions { exclude_local: Some(true), ..Default::default() },
114                client,
115            )
116            .context("Unable to subscribe to TREL services")?;
117
118        Ok(TrelInstance {
119            socket: fasync::net::UdpSocket::bind(&SocketAddr::V6(SocketAddrV6::new(
120                Ipv6Addr::UNSPECIFIED,
121                0,
122                0,
123                0,
124            )))
125            .context("Unable to open TREL UDP socket")?,
126            publication_responder: None,
127            instance_name,
128            peer_instance_sockaddr_map: HashMap::default(),
129            subscriber,
130            subscriber_request_stream: server.into_stream(),
131            counters: RefCell::new(TrelCounters::default()),
132        })
133    }
134
135    fn port(&self) -> u16 {
136        self.socket.local_addr().unwrap().port()
137    }
138
139    fn register_service(&mut self, port: u16, txt: &[u8]) {
140        let txt = split_txt(txt);
141
142        let (client, server) = create_endpoints::<ServiceInstancePublicationResponder_Marker>();
143
144        let publisher =
145            fuchsia_component::client::connect_to_protocol::<ServiceInstancePublisherMarker>()
146                .unwrap();
147
148        let publish_init_future = publisher
149            .publish_service_instance(
150                ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
151                self.instance_name.as_str(),
152                &ServiceInstancePublicationOptions::default(),
153                client,
154            )
155            .map(|x| -> Result<(), anyhow::Error> {
156                match x {
157                    Ok(Ok(x)) => Ok(x),
158                    Ok(Err(err)) => Err(anyhow::format_err!("{:?}", err)),
159                    Err(zx_err) => Err(zx_err.into()),
160                }
161            });
162
163        let publish_responder_future = server.into_stream().map_err(Into::into).try_for_each(
164            move |ServiceInstancePublicationResponder_Request::OnPublication {
165                      responder, ..
166                  }| {
167                let txt = txt.clone();
168                let _publisher = publisher.clone();
169                async move {
170                    responder
171                        .send(Ok(&ServiceInstancePublication {
172                            port: Some(port),
173                            text: Some(txt),
174                            ..Default::default()
175                        }))
176                        .map_err(Into::into)
177                }
178            },
179        );
180
181        let future =
182            futures::future::try_join(publish_init_future, publish_responder_future).map_ok(|_| ());
183
184        self.publication_responder = Some(fuchsia_async::Task::spawn(future));
185    }
186
187    pub fn handle_service_subscriber_request(
188        &mut self,
189        ot_instance: &ot::Instance,
190        service_subscriber_request: ServiceSubscriptionListenerRequest,
191    ) -> Result<(), anyhow::Error> {
192        match service_subscriber_request {
193            // A DNS-SD IPv6 service instance has been discovered.
194            ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
195                instance:
196                    ServiceInstance {
197                        instance: Some(instance_name),
198                        addresses: Some(addresses),
199                        text_strings,
200                        ..
201                    },
202                responder,
203            } => {
204                let txt = flatten_txt(text_strings);
205
206                let (addresses, port) = process_addresses_from_socket_addresses(addresses);
207
208                info!(
209                    tag = "trel";
210                    "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
211                );
212
213                // Pick the smallest link-local to be robust to reorderings.
214                if let Some(address) = addresses.first() {
215                    let sockaddr = ot::SockAddr::new(*address, port.unwrap());
216
217                    self.peer_instance_sockaddr_map.insert(instance_name, sockaddr);
218
219                    let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
220                    info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Adding {:?}", info);
221                    ot_instance.plat_trel_handle_discovered_peer_info(&info);
222                } else {
223                    warn!(
224                        tag = "trel";
225                        "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
226                    );
227                }
228
229                responder.send().context("Unable to respond to OnInstanceDiscovered")?;
230            }
231
232            // A DNS-SD IPv6 service instance has changed.
233            ServiceSubscriptionListenerRequest::OnInstanceChanged {
234                instance:
235                    ServiceInstance {
236                        instance: Some(instance_name),
237                        addresses: Some(addresses),
238                        text_strings,
239                        ..
240                    },
241                responder,
242            } => {
243                let txt = flatten_txt(text_strings);
244                let (addresses, port) = process_addresses_from_socket_addresses(addresses);
245
246                info!(
247                    tag = "trel";
248                    "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
249                );
250
251                // Pick the smallest link-local to be robust to reorderings.
252                if let Some(address) = addresses.first() {
253                    let sockaddr = ot::SockAddr::new(*address, port.unwrap());
254
255                    if let Some(old_sockaddr) =
256                        self.peer_instance_sockaddr_map.insert(instance_name, sockaddr)
257                    {
258                        if old_sockaddr != sockaddr {
259                            // Remove old sockaddr with the same instance name
260                            let info_old = ot::PlatTrelPeerInfo::new(true, &[], old_sockaddr);
261                            info!(
262                                tag = "trel";
263                                "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info_old
264                            );
265                            ot_instance.plat_trel_handle_discovered_peer_info(&info_old);
266                        }
267
268                        let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
269                        info!(
270                            tag = "trel";
271                            "otPlatTrelHandleDiscoveredPeerInfo: Updating {:?}", info
272                        );
273                        ot_instance.plat_trel_handle_discovered_peer_info(&info);
274                    }
275                } else {
276                    warn!(
277                        tag = "trel";
278                        "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
279                    );
280                }
281
282                responder.send().context("Unable to respond to OnInstanceChanged")?;
283            }
284
285            // A DNS-SD IPv6 service instance has been lost.
286            ServiceSubscriptionListenerRequest::OnInstanceLost { instance, responder, .. } => {
287                info!(
288                    tag = "trel";
289                    "ServiceSubscriptionListenerRequest::OnInstanceLost [PII]({instance:?})"
290                );
291                if let Some(sockaddr) = self.peer_instance_sockaddr_map.remove(&instance) {
292                    let info = ot::PlatTrelPeerInfo::new(true, &[], sockaddr);
293                    info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info);
294                    ot_instance.plat_trel_handle_discovered_peer_info(&info);
295                }
296
297                responder.send().context("Unable to respond to OnInstanceLost")?;
298            }
299
300            ServiceSubscriptionListenerRequest::OnInstanceChanged { instance, responder } => {
301                warn!(
302                    tag = "trel";
303                    "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance:?})"
304                );
305                // Skip changes without an IPv6 address.
306                responder.send().context("Unable to respond to OnInstanceChanged")?;
307            }
308
309            ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
310                instance,
311                responder,
312                ..
313            } => {
314                warn!(
315                    tag = "trel";
316                    "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance:?})"
317                );
318                // Skip discoveries without an IPv6 address.
319                responder.send().context("Unable to respond to OnInstanceDiscovered")?;
320            }
321
322            ServiceSubscriptionListenerRequest::OnQuery { resource_type, responder, .. } => {
323                info!(
324                    tag = "trel";
325                    "ServiceSubscriptionListenerRequest::OnQuery: {resource_type:?}"
326                );
327
328                // We don't care about queries.
329                responder.send().context("Unable to respond to OnQuery")?;
330            }
331        }
332        Ok(())
333    }
334
335    pub fn get_trel_counters(&self) -> *const otPlatTrelCounters {
336        self.counters.borrow().as_ot_ptr()
337    }
338
339    pub fn reset_trel_counters(&self) {
340        self.counters.borrow_mut().reset_counters()
341    }
342
343    /// Async entrypoint for I/O.
344    ///
345    /// This is explicitly not `mut` so that `on_trel_send` can be called reentrantly from here.
346    pub fn poll_io(&self, instance: &ot::Instance, cx: &mut Context<'_>) {
347        let mut buffer = [0u8; crate::UDP_PACKET_MAX_LENGTH];
348        loop {
349            match self.socket.async_recv_from(&mut buffer, cx) {
350                Poll::Ready(Ok((len, sockaddr))) => {
351                    let sockaddr: ot::SockAddr = sockaddr.as_socket_ipv6().unwrap().into();
352                    debug!(tag = "trel"; "Incoming {} byte TREL packet from {:?}", len, sockaddr);
353                    {
354                        let mut counters = self.counters.borrow_mut();
355                        counters.update_rx_bytes(len.try_into().unwrap());
356                        counters.update_rx_packets(1);
357                    }
358                    instance.plat_trel_handle_received(&buffer[..len], &sockaddr)
359                }
360                Poll::Ready(Err(err)) => {
361                    warn!(tag = "trel"; "Error receiving packet: {:?}", err);
362                    break;
363                }
364                _ => {
365                    break;
366                }
367            }
368        }
369    }
370
371    /// Async entrypoint for non-I/O
372    pub fn poll(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
373        if let Some(task) = &mut self.publication_responder {
374            if let Poll::Ready(x) = task.poll_unpin(cx) {
375                warn!(
376                    tag = "trel";
377                    "TrelInstance: publication_responder finished unexpectedly: {:?}", x
378                );
379                self.publication_responder = None;
380            }
381        }
382
383        if !self.subscriber_request_stream.is_terminated() {
384            while let Poll::Ready(Some(event)) = self.subscriber_request_stream.poll_next_unpin(cx)
385            {
386                match event {
387                    Ok(event) => {
388                        if let Err(err) = self.handle_service_subscriber_request(instance, event) {
389                            error!(
390                                tag = "trel";
391                                "Error handling service subscriber request: {err:?}"
392                            );
393                        }
394                    }
395                    Err(err) => {
396                        error!(tag = "trel"; "subscriber_request_stream FIDL error: {:?}", err);
397                    }
398                }
399            }
400        }
401    }
402}
403
404impl PlatformBacking {
405    fn on_trel_enable(&self, instance: &ot::Instance) -> Result<u16, anyhow::Error> {
406        let mut trel = self.trel.borrow_mut();
407        if let Some(trel) = trel.as_ref() {
408            Ok(trel.port())
409        } else {
410            let instance_name = hex::encode(instance.get_extended_address().as_slice());
411            let trel_instance = TrelInstance::new(instance_name)?;
412            let port = trel_instance.port();
413            trel.replace(trel_instance);
414            Ok(port)
415        }
416    }
417
418    fn on_trel_disable(&self, _instance: &ot::Instance) {
419        self.trel.replace(None);
420    }
421
422    fn on_trel_register_service(&self, _instance: &ot::Instance, port: u16, txt: &[u8]) {
423        let mut trel = self.trel.borrow_mut();
424        if let Some(trel) = trel.as_mut() {
425            info!(
426                tag = "trel";
427                "otPlatTrelRegisterService: port:{} txt:{:?}",
428                port,
429                txt.to_escaped_string()
430            );
431            trel.register_service(port, txt);
432        } else {
433            debug!(tag = "trel"; "otPlatTrelRegisterService: TREL is disabled, cannot register.");
434        }
435    }
436
437    fn on_trel_send(&self, _instance: &ot::Instance, payload: &[u8], sockaddr: &ot::SockAddr) {
438        let trel = self.trel.borrow();
439        if let Some(trel) = trel.as_ref() {
440            let mut counters = trel.counters.borrow_mut();
441            debug!(tag = "trel"; "otPlatTrelSend: {:?} -> {}", sockaddr, hex::encode(payload));
442            match trel.socket.send_to(payload, (*sockaddr).into()).now_or_never() {
443                Some(Ok(_)) => {
444                    counters.update_tx_bytes(payload.len().try_into().unwrap());
445                    counters.update_tx_packets(1);
446                }
447                Some(Err(err)) => {
448                    counters.update_tx_failure(1);
449                    warn!(tag = "trel"; "otPlatTrelSend: send_to failed: {:?}", err);
450                }
451                None => {
452                    warn!(tag = "trel"; "otPlatTrelSend: send_to didn't finish immediately");
453                }
454            }
455        } else {
456            debug!(tag = "trel"; "otPlatTrelSend: TREL is disabled, cannot send.");
457        }
458    }
459}
460
461#[no_mangle]
462unsafe extern "C" fn otPlatTrelEnable(instance: *mut otInstance, port_ptr: *mut u16) {
463    match PlatformBacking::on_trel_enable(
464        // SAFETY: Must only be called from OpenThread thread,
465        PlatformBacking::as_ref(),
466        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
467        //         which is guaranteed by the caller.
468        ot::Instance::ref_from_ot_ptr(instance).unwrap(),
469    ) {
470        Ok(port) => {
471            info!(tag = "trel"; "otPlatTrelEnable: Ready on port {}", port);
472            *port_ptr = port;
473        }
474        Err(err) => {
475            warn!(tag = "trel"; "otPlatTrelEnable: Unable to start TREL: {:?}", err);
476        }
477    }
478}
479
480#[no_mangle]
481unsafe extern "C" fn otPlatTrelDisable(instance: *mut otInstance) {
482    PlatformBacking::on_trel_disable(
483        // SAFETY: Must only be called from OpenThread thread,
484        PlatformBacking::as_ref(),
485        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
486        //         which is guaranteed by the caller.
487        ot::Instance::ref_from_ot_ptr(instance).unwrap(),
488    );
489    info!(tag = "trel"; "otPlatTrelDisable: Closed.");
490}
491
492#[no_mangle]
493unsafe extern "C" fn otPlatTrelRegisterService(
494    instance: *mut otInstance,
495    port: u16,
496    txt_data: *const u8,
497    txt_len: u8,
498) {
499    PlatformBacking::on_trel_register_service(
500        // SAFETY: Must only be called from OpenThread thread,
501        PlatformBacking::as_ref(),
502        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
503        //         which is guaranteed by the caller.
504        ot::Instance::ref_from_ot_ptr(instance).unwrap(),
505        port,
506        // SAFETY: Caller guarantees either txt_data is valid or txt_len is zero.
507        std::slice::from_raw_parts(txt_data, txt_len.into()),
508    );
509}
510
511#[no_mangle]
512unsafe extern "C" fn otPlatTrelSend(
513    instance: *mut otInstance,
514    payload_data: *const u8,
515    payload_len: u16,
516    dest: *const otSockAddr,
517) {
518    PlatformBacking::on_trel_send(
519        // SAFETY: Must only be called from OpenThread thread,
520        PlatformBacking::as_ref(),
521        // SAFETY: `instance` must be a pointer to a valid `otInstance`,
522        //         which is guaranteed by the caller.
523        ot::Instance::ref_from_ot_ptr(instance).unwrap(),
524        // SAFETY: Caller guarantees either payload_data is valid or payload_len is zero.
525        std::slice::from_raw_parts(payload_data, payload_len.into()),
526        // SAFETY: Caller guarantees dest points to a valid otSockAddr.
527        ot::SockAddr::ref_from_ot_ptr(dest).unwrap(),
528    );
529}
530
531#[no_mangle]
532unsafe extern "C" fn otPlatTrelGetCounters(
533    _instance: *mut otInstance,
534) -> *const otPlatTrelCounters {
535    if let Some(trel) = PlatformBacking::as_ref().trel.borrow().as_ref() {
536        trel.get_trel_counters()
537    } else {
538        std::ptr::null()
539    }
540}
541
542#[no_mangle]
543unsafe extern "C" fn otPlatTrelNotifyPeerSocketAddressDifference(
544    _instance: *mut otsys::otInstance,
545    peer_sock_addr: &ot::SockAddr,
546    rx_sock_addr: &ot::SockAddr,
547) {
548    info!(tag = "trel"; "otPlatTrelNotifyPeerSocketAddressDifference: Not Implemented. peer_sock_addr {}, rx_sock_addr {}", peer_sock_addr, rx_sock_addr);
549}
550
551#[no_mangle]
552unsafe extern "C" fn otPlatTrelResetCounters(_instance: *mut otInstance) {
553    if let Some(trel) = PlatformBacking::as_ref().trel.borrow().as_ref() {
554        trel.reset_trel_counters()
555    }
556}
557
558#[cfg(test)]
559mod test {
560    use super::*;
561
562    #[test]
563    fn test_split_txt() {
564        assert_eq!(
565            split_txt(b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb"),
566            vec![b"xa=a7bfc4981f4e4d22".to_vec(), b"xp=029c6f4dbae059cb".to_vec()]
567        );
568    }
569
570    #[test]
571    fn test_flatten_txt() {
572        assert_eq!(flatten_txt(None), vec![]);
573        assert_eq!(flatten_txt(Some(vec![])), vec![]);
574        assert_eq!(
575            flatten_txt(Some(vec![b"xa=a7bfc4981f4e4d22".to_vec()])),
576            b"\x13xa=a7bfc4981f4e4d22".to_vec()
577        );
578        assert_eq!(
579            flatten_txt(Some(vec![
580                b"xa=a7bfc4981f4e4d22".to_vec(),
581                b"xp=029c6f4dbae059cb".to_vec()
582            ])),
583            b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb".to_vec()
584        );
585    }
586}