1use super::*;
6use crate::to_escaped_string::*;
7use anyhow::Context as _;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_net_mdns::*;
10use fuchsia_async::Task;
11use futures::stream::FusedStream;
12use openthread_sys::*;
13use ot::{PlatTrel as _, TrelCounters};
14use std::collections::HashMap;
15use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
16use std::task::{Context, Poll};
17
/// Platform-side state for TREL: the UDP socket carrying TREL packets, the
/// mDNS publication and subscription plumbing, and the packet counters.
pub(crate) struct TrelInstance {
    /// UDP socket used both to receive TREL packets and to send them to peers.
    socket: fasync::net::UdpSocket,

    /// Task driving the mDNS publication of the local TREL service. `None`
    /// until a service is registered, and reset to `None` if the task finishes.
    publication_responder: Option<Task<Result<(), anyhow::Error>>>,

    /// Instance name under which the local TREL service is published.
    instance_name: String,

    /// Socket address most recently recorded for each discovered peer, keyed
    /// by the peer's mDNS instance name.
    peer_instance_sockaddr_map: HashMap<String, ot::SockAddr>,

    // Never read after construction.
    // NOTE(review): presumably retained so the subscription stays open for the
    // lifetime of this instance — confirm.
    #[allow(dead_code)]
    subscriber: ServiceSubscriber2Proxy,

    /// Incoming requests from the mDNS service subscription listener.
    subscriber_request_stream: ServiceSubscriptionListenerRequestStream,

    /// TREL packet counters, wrapped in a `RefCell` so they can be updated
    /// through a shared reference.
    counters: RefCell<TrelCounters>,
}
31
/// Flattens a list of TXT strings into a single byte buffer in which every
/// entry is preceded by a one-byte length prefix.
///
/// `None` and an empty list both yield an empty buffer. Because the length
/// prefix is a single byte, entries longer than 255 bytes are truncated to
/// their first 255 bytes.
fn flatten_txt(txt: Option<Vec<Vec<u8>>>) -> Vec<u8> {
    let mut ret = vec![];

    for entry in txt.iter().flatten() {
        // Saturate at `u8::MAX` so the prefix always matches the number of
        // bytes actually written; an oversized entry must never cause a panic.
        let len = u8::try_from(entry.len()).unwrap_or(u8::MAX);
        ret.push(len);
        ret.extend_from_slice(&entry[..usize::from(len)]);
    }

    ret
}
47
48fn process_addresses_from_socket_addresses<
51 T: IntoIterator<Item = fidl_fuchsia_net::SocketAddress>,
52>(
53 addresses: T,
54) -> (Vec<ot::Ip6Address>, Option<u16>) {
55 let mut ret_port: Option<u16> = None;
56 let mut addresses =
57 addresses
58 .into_iter()
59 .flat_map(|x| {
60 if let fidl_fuchsia_net::SocketAddress::Ipv6(
61 fidl_fuchsia_net::Ipv6SocketAddress { address, port, .. },
62 ) = x
63 {
64 let addr = ot::Ip6Address::from(address.addr);
65 if ret_port.is_none() {
66 ret_port = Some(port);
67 } else if ret_port != Some(port) {
68 warn!(
69 tag = "trel";
70 "mDNS service has multiple ports for the same service, {:?} != {:?}",
71 ret_port.unwrap(),
72 port
73 );
74 }
75 if ipv6addr_is_unicast_link_local(&addr) {
77 return Some(addr);
78 }
79 }
80 None
81 })
82 .collect::<Vec<_>>();
83 addresses.sort();
84 (addresses, ret_port)
85}
86
/// Returns `true` when `addr` falls inside `fe80::/10`, i.e. when the top ten
/// bits of the address are `1111 1110 10`.
fn ipv6addr_is_unicast_link_local(addr: &std::net::Ipv6Addr) -> bool {
    const PREFIX_MASK: u16 = 0xffc0;
    const LINK_LOCAL_PREFIX: u16 = 0xfe80;

    let [leading_segment, ..] = addr.segments();
    (leading_segment & PREFIX_MASK) == LINK_LOCAL_PREFIX
}
94
95fn split_txt(txt: &[u8]) -> Vec<Vec<u8>> {
97 info!(tag = "trel"; "trel:split_txt: Splitting TXT record: {:?}", hex::encode(txt));
98 let txt =
99 ot::DnsTxtEntryIterator::try_new(txt).expect("can't parse TXT records from OpenThread");
100 txt.map(|x| x.expect("can't parse TXT records from OpenThread").to_vec()).collect::<Vec<_>>()
101}
102
103impl TrelInstance {
104 fn new(instance_name: String) -> Result<TrelInstance, anyhow::Error> {
105 let (client, server) = create_endpoints::<ServiceSubscriptionListenerMarker>();
106
107 let subscriber =
108 fuchsia_component::client::connect_to_protocol::<ServiceSubscriber2Marker>().unwrap();
109
110 subscriber
111 .subscribe_to_service(
112 ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
113 &ServiceSubscriptionOptions { exclude_local: Some(true), ..Default::default() },
114 client,
115 )
116 .context("Unable to subscribe to TREL services")?;
117
118 Ok(TrelInstance {
119 socket: fasync::net::UdpSocket::bind(&SocketAddr::V6(SocketAddrV6::new(
120 Ipv6Addr::UNSPECIFIED,
121 0,
122 0,
123 0,
124 )))
125 .context("Unable to open TREL UDP socket")?,
126 publication_responder: None,
127 instance_name,
128 peer_instance_sockaddr_map: HashMap::default(),
129 subscriber,
130 subscriber_request_stream: server.into_stream(),
131 counters: RefCell::new(TrelCounters::default()),
132 })
133 }
134
135 fn port(&self) -> u16 {
136 self.socket.local_addr().unwrap().port()
137 }
138
139 fn register_service(&mut self, port: u16, txt: &[u8]) {
140 let txt = split_txt(txt);
141
142 let (client, server) = create_endpoints::<ServiceInstancePublicationResponder_Marker>();
143
144 let publisher =
145 fuchsia_component::client::connect_to_protocol::<ServiceInstancePublisherMarker>()
146 .unwrap();
147
148 let publish_init_future = publisher
149 .publish_service_instance(
150 ot::TREL_DNSSD_SERVICE_NAME_WITH_DOT,
151 self.instance_name.as_str(),
152 &ServiceInstancePublicationOptions::default(),
153 client,
154 )
155 .map(|x| -> Result<(), anyhow::Error> {
156 match x {
157 Ok(Ok(x)) => Ok(x),
158 Ok(Err(err)) => Err(anyhow::format_err!("{:?}", err)),
159 Err(zx_err) => Err(zx_err.into()),
160 }
161 });
162
163 let publish_responder_future = server.into_stream().map_err(Into::into).try_for_each(
164 move |ServiceInstancePublicationResponder_Request::OnPublication {
165 responder, ..
166 }| {
167 let txt = txt.clone();
168 let _publisher = publisher.clone();
169 async move {
170 responder
171 .send(Ok(&ServiceInstancePublication {
172 port: Some(port),
173 text: Some(txt),
174 ..Default::default()
175 }))
176 .map_err(Into::into)
177 }
178 },
179 );
180
181 let future =
182 futures::future::try_join(publish_init_future, publish_responder_future).map_ok(|_| ());
183
184 self.publication_responder = Some(fuchsia_async::Task::spawn(future));
185 }
186
187 pub fn handle_service_subscriber_request(
188 &mut self,
189 ot_instance: &ot::Instance,
190 service_subscriber_request: ServiceSubscriptionListenerRequest,
191 ) -> Result<(), anyhow::Error> {
192 match service_subscriber_request {
193 ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
195 instance:
196 ServiceInstance {
197 instance: Some(instance_name),
198 addresses: Some(addresses),
199 text_strings,
200 ..
201 },
202 responder,
203 } => {
204 let txt = flatten_txt(text_strings);
205
206 let (addresses, port) = process_addresses_from_socket_addresses(addresses);
207
208 info!(
209 tag = "trel";
210 "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
211 );
212
213 if let Some(address) = addresses.first() {
215 let sockaddr = ot::SockAddr::new(*address, port.unwrap());
216
217 self.peer_instance_sockaddr_map.insert(instance_name, sockaddr);
218
219 let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
220 info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Adding {:?}", info);
221 ot_instance.plat_trel_handle_discovered_peer_info(&info);
222 } else {
223 warn!(
224 tag = "trel";
225 "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
226 );
227 }
228
229 responder.send().context("Unable to respond to OnInstanceDiscovered")?;
230 }
231
232 ServiceSubscriptionListenerRequest::OnInstanceChanged {
234 instance:
235 ServiceInstance {
236 instance: Some(instance_name),
237 addresses: Some(addresses),
238 text_strings,
239 ..
240 },
241 responder,
242 } => {
243 let txt = flatten_txt(text_strings);
244 let (addresses, port) = process_addresses_from_socket_addresses(addresses);
245
246 info!(
247 tag = "trel";
248 "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance_name:?}) port:{port:?} addresses:{addresses:?}"
249 );
250
251 if let Some(address) = addresses.first() {
253 let sockaddr = ot::SockAddr::new(*address, port.unwrap());
254
255 if let Some(old_sockaddr) =
256 self.peer_instance_sockaddr_map.insert(instance_name, sockaddr)
257 {
258 if old_sockaddr != sockaddr {
259 let info_old = ot::PlatTrelPeerInfo::new(true, &[], old_sockaddr);
261 info!(
262 tag = "trel";
263 "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info_old
264 );
265 ot_instance.plat_trel_handle_discovered_peer_info(&info_old);
266 }
267
268 let info = ot::PlatTrelPeerInfo::new(false, &txt, sockaddr);
269 info!(
270 tag = "trel";
271 "otPlatTrelHandleDiscoveredPeerInfo: Updating {:?}", info
272 );
273 ot_instance.plat_trel_handle_discovered_peer_info(&info);
274 }
275 } else {
276 warn!(
277 tag = "trel";
278 "Peer {instance_name:?} does not have any IPv6 link-local address, ignored"
279 );
280 }
281
282 responder.send().context("Unable to respond to OnInstanceChanged")?;
283 }
284
285 ServiceSubscriptionListenerRequest::OnInstanceLost { instance, responder, .. } => {
287 info!(
288 tag = "trel";
289 "ServiceSubscriptionListenerRequest::OnInstanceLost [PII]({instance:?})"
290 );
291 if let Some(sockaddr) = self.peer_instance_sockaddr_map.remove(&instance) {
292 let info = ot::PlatTrelPeerInfo::new(true, &[], sockaddr);
293 info!(tag = "trel"; "otPlatTrelHandleDiscoveredPeerInfo: Removing {:?}", info);
294 ot_instance.plat_trel_handle_discovered_peer_info(&info);
295 }
296
297 responder.send().context("Unable to respond to OnInstanceLost")?;
298 }
299
300 ServiceSubscriptionListenerRequest::OnInstanceChanged { instance, responder } => {
301 warn!(
302 tag = "trel";
303 "ServiceSubscriptionListenerRequest::OnInstanceChanged: [PII]({instance:?})"
304 );
305 responder.send().context("Unable to respond to OnInstanceChanged")?;
307 }
308
309 ServiceSubscriptionListenerRequest::OnInstanceDiscovered {
310 instance,
311 responder,
312 ..
313 } => {
314 warn!(
315 tag = "trel";
316 "ServiceSubscriptionListenerRequest::OnInstanceDiscovered: [PII]({instance:?})"
317 );
318 responder.send().context("Unable to respond to OnInstanceDiscovered")?;
320 }
321
322 ServiceSubscriptionListenerRequest::OnQuery { resource_type, responder, .. } => {
323 info!(
324 tag = "trel";
325 "ServiceSubscriptionListenerRequest::OnQuery: {resource_type:?}"
326 );
327
328 responder.send().context("Unable to respond to OnQuery")?;
330 }
331 }
332 Ok(())
333 }
334
335 pub fn get_trel_counters(&self) -> *const otPlatTrelCounters {
336 self.counters.borrow().as_ot_ptr()
337 }
338
339 pub fn reset_trel_counters(&self) {
340 self.counters.borrow_mut().reset_counters()
341 }
342
343 pub fn poll_io(&self, instance: &ot::Instance, cx: &mut Context<'_>) {
347 let mut buffer = [0u8; crate::UDP_PACKET_MAX_LENGTH];
348 loop {
349 match self.socket.async_recv_from(&mut buffer, cx) {
350 Poll::Ready(Ok((len, sockaddr))) => {
351 let sockaddr: ot::SockAddr = sockaddr.as_socket_ipv6().unwrap().into();
352 debug!(tag = "trel"; "Incoming {} byte TREL packet from {:?}", len, sockaddr);
353 {
354 let mut counters = self.counters.borrow_mut();
355 counters.update_rx_bytes(len.try_into().unwrap());
356 counters.update_rx_packets(1);
357 }
358 instance.plat_trel_handle_received(&buffer[..len], &sockaddr)
359 }
360 Poll::Ready(Err(err)) => {
361 warn!(tag = "trel"; "Error receiving packet: {:?}", err);
362 break;
363 }
364 _ => {
365 break;
366 }
367 }
368 }
369 }
370
371 pub fn poll(&mut self, instance: &ot::Instance, cx: &mut Context<'_>) {
373 if let Some(task) = &mut self.publication_responder {
374 if let Poll::Ready(x) = task.poll_unpin(cx) {
375 warn!(
376 tag = "trel";
377 "TrelInstance: publication_responder finished unexpectedly: {:?}", x
378 );
379 self.publication_responder = None;
380 }
381 }
382
383 if !self.subscriber_request_stream.is_terminated() {
384 while let Poll::Ready(Some(event)) = self.subscriber_request_stream.poll_next_unpin(cx)
385 {
386 match event {
387 Ok(event) => {
388 if let Err(err) = self.handle_service_subscriber_request(instance, event) {
389 error!(
390 tag = "trel";
391 "Error handling service subscriber request: {err:?}"
392 );
393 }
394 }
395 Err(err) => {
396 error!(tag = "trel"; "subscriber_request_stream FIDL error: {:?}", err);
397 }
398 }
399 }
400 }
401 }
402}
403
404impl PlatformBacking {
405 fn on_trel_enable(&self, instance: &ot::Instance) -> Result<u16, anyhow::Error> {
406 let mut trel = self.trel.borrow_mut();
407 if let Some(trel) = trel.as_ref() {
408 Ok(trel.port())
409 } else {
410 let instance_name = hex::encode(instance.get_extended_address().as_slice());
411 let trel_instance = TrelInstance::new(instance_name)?;
412 let port = trel_instance.port();
413 trel.replace(trel_instance);
414 Ok(port)
415 }
416 }
417
418 fn on_trel_disable(&self, _instance: &ot::Instance) {
419 self.trel.replace(None);
420 }
421
422 fn on_trel_register_service(&self, _instance: &ot::Instance, port: u16, txt: &[u8]) {
423 let mut trel = self.trel.borrow_mut();
424 if let Some(trel) = trel.as_mut() {
425 info!(
426 tag = "trel";
427 "otPlatTrelRegisterService: port:{} txt:{:?}",
428 port,
429 txt.to_escaped_string()
430 );
431 trel.register_service(port, txt);
432 } else {
433 debug!(tag = "trel"; "otPlatTrelRegisterService: TREL is disabled, cannot register.");
434 }
435 }
436
437 fn on_trel_send(&self, _instance: &ot::Instance, payload: &[u8], sockaddr: &ot::SockAddr) {
438 let trel = self.trel.borrow();
439 if let Some(trel) = trel.as_ref() {
440 let mut counters = trel.counters.borrow_mut();
441 debug!(tag = "trel"; "otPlatTrelSend: {:?} -> {}", sockaddr, hex::encode(payload));
442 match trel.socket.send_to(payload, (*sockaddr).into()).now_or_never() {
443 Some(Ok(_)) => {
444 counters.update_tx_bytes(payload.len().try_into().unwrap());
445 counters.update_tx_packets(1);
446 }
447 Some(Err(err)) => {
448 counters.update_tx_failure(1);
449 warn!(tag = "trel"; "otPlatTrelSend: send_to failed: {:?}", err);
450 }
451 None => {
452 warn!(tag = "trel"; "otPlatTrelSend: send_to didn't finish immediately");
453 }
454 }
455 } else {
456 debug!(tag = "trel"; "otPlatTrelSend: TREL is disabled, cannot send.");
457 }
458 }
459}
460
461#[no_mangle]
462unsafe extern "C" fn otPlatTrelEnable(instance: *mut otInstance, port_ptr: *mut u16) {
463 match PlatformBacking::on_trel_enable(
464 PlatformBacking::as_ref(),
466 ot::Instance::ref_from_ot_ptr(instance).unwrap(),
469 ) {
470 Ok(port) => {
471 info!(tag = "trel"; "otPlatTrelEnable: Ready on port {}", port);
472 *port_ptr = port;
473 }
474 Err(err) => {
475 warn!(tag = "trel"; "otPlatTrelEnable: Unable to start TREL: {:?}", err);
476 }
477 }
478}
479
480#[no_mangle]
481unsafe extern "C" fn otPlatTrelDisable(instance: *mut otInstance) {
482 PlatformBacking::on_trel_disable(
483 PlatformBacking::as_ref(),
485 ot::Instance::ref_from_ot_ptr(instance).unwrap(),
488 );
489 info!(tag = "trel"; "otPlatTrelDisable: Closed.");
490}
491
492#[no_mangle]
493unsafe extern "C" fn otPlatTrelRegisterService(
494 instance: *mut otInstance,
495 port: u16,
496 txt_data: *const u8,
497 txt_len: u8,
498) {
499 PlatformBacking::on_trel_register_service(
500 PlatformBacking::as_ref(),
502 ot::Instance::ref_from_ot_ptr(instance).unwrap(),
505 port,
506 std::slice::from_raw_parts(txt_data, txt_len.into()),
508 );
509}
510
511#[no_mangle]
512unsafe extern "C" fn otPlatTrelSend(
513 instance: *mut otInstance,
514 payload_data: *const u8,
515 payload_len: u16,
516 dest: *const otSockAddr,
517) {
518 PlatformBacking::on_trel_send(
519 PlatformBacking::as_ref(),
521 ot::Instance::ref_from_ot_ptr(instance).unwrap(),
524 std::slice::from_raw_parts(payload_data, payload_len.into()),
526 ot::SockAddr::ref_from_ot_ptr(dest).unwrap(),
528 );
529}
530
531#[no_mangle]
532unsafe extern "C" fn otPlatTrelGetCounters(
533 _instance: *mut otInstance,
534) -> *const otPlatTrelCounters {
535 if let Some(trel) = PlatformBacking::as_ref().trel.borrow().as_ref() {
536 trel.get_trel_counters()
537 } else {
538 std::ptr::null()
539 }
540}
541
/// C entry point invoked with a peer socket address and a receive socket
/// address. Not implemented: it only logs both addresses.
#[no_mangle]
unsafe extern "C" fn otPlatTrelNotifyPeerSocketAddressDifference(
    _instance: *mut otsys::otInstance,
    peer_sock_addr: &ot::SockAddr,
    rx_sock_addr: &ot::SockAddr,
) {
    // Intentionally a no-op apart from logging.
    info!(tag = "trel"; "otPlatTrelNotifyPeerSocketAddressDifference: Not Implemented. peer_sock_addr {}, rx_sock_addr {}", peer_sock_addr, rx_sock_addr);
}
550
551#[no_mangle]
552unsafe extern "C" fn otPlatTrelResetCounters(_instance: *mut otInstance) {
553 if let Some(trel) = PlatformBacking::as_ref().trel.borrow().as_ref() {
554 trel.reset_trel_counters()
555 }
556}
557
/// Unit tests for the TXT record helpers.
#[cfg(test)]
mod test {
    use super::*;

    /// Length-prefixed TXT data is split into its individual entries.
    #[test]
    fn test_split_txt() {
        assert_eq!(
            split_txt(b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb"),
            vec![b"xa=a7bfc4981f4e4d22".to_vec(), b"xp=029c6f4dbae059cb".to_vec()]
        );
    }

    /// Entries are flattened into length-prefixed form; `None` and an empty
    /// list both produce an empty buffer.
    #[test]
    fn test_flatten_txt() {
        assert_eq!(flatten_txt(None), vec![]);
        assert_eq!(flatten_txt(Some(vec![])), vec![]);
        // Single entry: 0x13 is the entry length (19 bytes).
        assert_eq!(
            flatten_txt(Some(vec![b"xa=a7bfc4981f4e4d22".to_vec()])),
            b"\x13xa=a7bfc4981f4e4d22".to_vec()
        );
        // Multiple entries are concatenated in order, each with its own prefix.
        assert_eq!(
            flatten_txt(Some(vec![
                b"xa=a7bfc4981f4e4d22".to_vec(),
                b"xp=029c6f4dbae059cb".to_vec()
            ])),
            b"\x13xa=a7bfc4981f4e4d22\x13xp=029c6f4dbae059cb".to_vec()
        );
    }
}