1use std::collections::HashMap;
6use std::pin::Pin;
7
8use {
9 fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name, fidl_fuchsia_net_ndp as fnet_ndp,
10 fidl_fuchsia_net_ndp_ext as fnet_ndp_ext,
11};
12
13use anyhow::Context;
14use async_utils::stream::{Tagged, WithTag as _};
15use dns_server_watcher::{DnsServers, DnsServersUpdateSource};
16use fidl::endpoints::{ControlHandle as _, Responder as _};
17use futures::stream::BoxStream;
18use futures::{Stream, StreamExt};
19use log::{error, info, trace, warn};
20use net_types::{Scope, ScopeableAddress};
21use packet_formats::icmp::ndp as packet_formats_ndp;
22
23const DNS_PORT: u16 = 53;
24
25use crate::network;
26
27pub(super) async fn update_servers(
29 lookup_admin: &fnet_name::LookupAdminProxy,
30 dns_servers: &mut DnsServers,
31 dns_server_watch_responders: &mut DnsServerWatchResponders,
32 networks_service: &mut network::NetpolNetworksService,
33 source: DnsServersUpdateSource,
34 servers: Vec<fnet_name::DnsServer_>,
35) {
36 trace!("updating DNS servers obtained from {:?} to {:?}", source, servers);
37
38 let servers_before = dns_servers.consolidated();
39 dns_servers.set_servers_from_source(source, servers);
40 let servers = dns_servers.consolidated();
41 if servers_before == servers {
42 trace!("Update skipped because dns server list has not changed");
43 return;
44 }
45 trace!("updating LookupAdmin with DNS servers = {:?}", servers);
46
47 match lookup_admin.set_dns_servers(&servers).await {
48 Ok(Ok(())) => {}
49 Ok(Err(e)) => warn!("error setting DNS servers: {:?}", zx::Status::from_raw(e)),
50 Err(e) => warn!("error sending set DNS servers request: {:?}", e),
51 }
52
53 dns_server_watch_responders.send(dns_servers.consolidated_dns_servers());
54
55 networks_service.update(network::PropertyUpdate::default().dns(dns_servers)).await;
56}
57
58pub(super) async fn create_rdnss_stream(
62 watcher_provider: &fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
63 source: DnsServersUpdateSource,
64 interface_id: u64,
65) -> Option<
66 Result<
67 impl Stream<Item = (DnsServersUpdateSource, Result<Vec<fnet_name::DnsServer_>, fidl::Error>)>,
68 fidl::Error,
69 >,
70> {
71 let watcher_result = fnet_ndp_ext::create_watcher_stream(
72 &watcher_provider,
73 &fnet_ndp::RouterAdvertisementOptionWatcherParams {
74 interest_types: Some(vec![
75 packet_formats_ndp::options::NdpOptionType::RecursiveDnsServer.into(),
76 ]),
77 interest_interface_id: Some(interface_id),
78 ..Default::default()
79 },
80 )
81 .await?;
82
83 let watcher = match watcher_result {
86 Ok(res) => res,
87 Err(e) => return Some(Err(e)),
88 };
89
90 Some(Ok(watcher
91 .filter_map(move |entry_res| async move {
92 let entry = match entry_res {
93 Ok(entry) => entry,
94 Err(fnet_ndp_ext::OptionWatchStreamError::Fidl(e)) => {
95 return Some(Err(e));
96 }
97 Err(fnet_ndp_ext::OptionWatchStreamError::Conversion(e)) => {
98 error!("Failed to convert OptionWatchStream item: {e:?}");
101 return None;
102 }
103 };
104 match entry {
105 fnet_ndp_ext::OptionWatchStreamItem::Entry(entry) => {
106 match entry.try_parse_as_rdnss() {
107 fnet_ndp_ext::TryParseAsOptionResult::Parsed(option) => Some(Ok(option
108 .iter_addresses()
109 .into_iter()
110 .map(|addr| fnet_name::DnsServer_ {
111 address: Some(fnet::SocketAddress::Ipv6(fnet::Ipv6SocketAddress {
112 address: fnet::Ipv6Address { addr: addr.ipv6_bytes() },
113 port: DNS_PORT,
114 zone_index: addr
117 .scope()
118 .can_have_zone()
119 .then_some(interface_id)
120 .unwrap_or_default(),
121 })),
122 source: Some(fnet_name::DnsServerSource::Ndp(
123 fnet_name::NdpDnsServerSource {
124 source_interface: Some(interface_id),
125 ..Default::default()
126 },
127 )),
128 ..Default::default()
129 })
130 .collect::<Vec<_>>())),
131 fnet_ndp_ext::TryParseAsOptionResult::OptionTypeMismatch => {
132 error!("Option type provided did not match RDNSS option type");
134 None
135 }
136 fnet_ndp_ext::TryParseAsOptionResult::ParseErr(err) => {
137 warn!("Error while parsing as OptionResult: {err:?}");
139 None
140 }
141 }
142 }
143 fnet_ndp_ext::OptionWatchStreamItem::Dropped(num) => {
144 warn!(
145 "The server dropped ({num}) NDP options \
146 due to the HangingGet falling behind"
147 );
148 None
149 }
150 }
151 })
152 .tagged(source)))
153}
154
155pub(super) async fn add_rdnss_watcher(
156 watcher_provider: &fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
157 interface_id: crate::InterfaceId,
158 watchers: &mut crate::DnsServerWatchers<'_>,
159) -> Result<(), anyhow::Error> {
160 let source = DnsServersUpdateSource::Ndp { interface_id: interface_id.get() };
161
162 let stream = create_rdnss_stream(watcher_provider, source, interface_id.get()).await;
164
165 match stream {
166 Some(result) => {
167 if let Some(o) =
168 watchers.insert(source, result.context("failed to create watcher stream")?.boxed())
169 {
170 let _: Pin<Box<BoxStream<'_, _>>> = o;
171 unreachable!("DNS server watchers must not contain key {:?}", source);
172 }
173 info!("started NDP watcher on host interface (id={interface_id})");
174 }
175 None => {
176 info!(
177 "NDP protocol unavailable: not starting watcher for interface (id={interface_id})"
178 );
179 }
180 }
181 Ok(())
182}
183
184pub(super) async fn remove_rdnss_watcher(
185 lookup_admin: &fnet_name::LookupAdminProxy,
186 dns_servers: &mut DnsServers,
187 dns_server_watch_responders: &mut DnsServerWatchResponders,
188 netpol_networks_service: &mut network::NetpolNetworksService,
189 interface_id: crate::InterfaceId,
190 watchers: &mut crate::DnsServerWatchers<'_>,
191) {
192 let source = DnsServersUpdateSource::Ndp { interface_id: interface_id.get() };
193
194 if let None = watchers.remove(&source) {
195 warn!(
199 "DNS Watcher for key not present; multiple futures stopped NDP \
200 watcher for key {:?}; interface_id={}",
201 source, interface_id
202 );
203 }
204
205 update_servers(
206 lookup_admin,
207 dns_servers,
208 dns_server_watch_responders,
209 netpol_networks_service,
210 source,
211 vec![],
212 )
213 .await
214}
215
216#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
217pub(crate) struct ConnectionId(usize);
218
219#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
220pub(crate) struct UpdateGeneration(usize);
221
222#[derive(Default)]
227pub(crate) struct DnsServerWatchResponders {
228 generation: UpdateGeneration,
231
232 generations: HashMap<ConnectionId, UpdateGeneration>,
234
235 responders: HashMap<ConnectionId, fnet_name::DnsServerWatcherWatchServersResponder>,
237}
238
239impl DnsServerWatchResponders {
240 fn send(&mut self, next_servers: Vec<fnet_name::DnsServer_>) {
241 let responders = std::mem::take(&mut self.responders);
242 self.generation.0 += 1;
243 for (id, responder) in responders {
244 match responder.send(&next_servers) {
245 Ok(()) => {
246 let _: Option<UpdateGeneration> = self.generations.insert(id, self.generation);
247 }
248 Err(e) => warn!("Error responding to DnsServerWatcher request: {e:?}"),
249 }
250 }
251 }
252
253 pub(crate) fn handle_request(
256 &mut self,
257 id: ConnectionId,
258 request: Result<fnet_name::DnsServerWatcherRequest, fidl::Error>,
259 servers: &DnsServers,
260 ) -> Result<(), fidl::Error> {
261 use std::collections::hash_map::Entry;
262 match request {
263 Ok(fnet_name::DnsServerWatcherRequest::WatchServers { responder }) => {
264 match self.responders.entry(id) {
265 Entry::Occupied(_) => {
266 warn!(
267 "Only one call to fuchsia.net.name/DnsServerWatcher.WatchServers \
268 may be active at once"
269 );
270 responder.control_handle().shutdown()
271 }
272 Entry::Vacant(vacant_entry) => {
273 if self.generations.get(&id) < Some(&self.generation) {
276 let _: Option<_> = self.generations.insert(id, self.generation);
277 responder.send(&servers.consolidated_dns_servers())?;
278 } else {
279 let _: &fnet_name::DnsServerWatcherWatchServersResponder =
280 vacant_entry.insert(responder);
281 }
282 }
283 }
284 }
285 Err(e) => {
286 error!("fuchsia.net.name/DnsServerWatcher request error: {:?}", e)
287 }
288 }
289
290 Ok(())
291 }
292}
293
294#[derive(Default)]
297pub(crate) struct DnsServerWatcherRequestStreams {
298 next_id: ConnectionId,
300
301 request_streams:
303 futures::stream::SelectAll<Tagged<ConnectionId, fnet_name::DnsServerWatcherRequestStream>>,
304}
305
306impl DnsServerWatcherRequestStreams {
307 pub fn handle_request_stream(&mut self, req_stream: fnet_name::DnsServerWatcherRequestStream) {
308 self.request_streams.push(req_stream.tagged(self.next_id));
309 self.next_id.0 += 1;
310 }
311}
312
313impl futures::Stream for DnsServerWatcherRequestStreams {
314 type Item = (ConnectionId, Result<fnet_name::DnsServerWatcherRequest, fidl::Error>);
315
316 fn poll_next(
317 mut self: std::pin::Pin<&mut Self>,
318 cx: &mut std::task::Context<'_>,
319 ) -> std::task::Poll<Option<Self::Item>> {
320 std::pin::Pin::new(&mut self.request_streams).poll_next(cx)
321 }
322}
323
324impl futures::stream::FusedStream for DnsServerWatcherRequestStreams {
325 fn is_terminated(&self) -> bool {
326 self.request_streams.is_terminated()
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use anyhow::{anyhow, Context as _};
333 use fuchsia_component::server::{ServiceFs, ServiceFsDir};
334 use fuchsia_component_test::{
335 Capability, ChildOptions, LocalComponentHandles, RealmBuilder, RealmInstance, Ref, Route,
336 };
337 use futures::channel::mpsc;
338 use futures::{
339 FutureExt as _, SinkExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _,
340 };
341 use net_declare::fidl_socket_addr;
342 use pretty_assertions::assert_eq;
343
344 use super::*;
345
346 enum StubbedServices {
347 LookupAdmin(fnet_name::LookupAdminRequestStream),
348 }
349
350 async fn run_lookup_admin(handles: LocalComponentHandles) -> Result<(), anyhow::Error> {
351 let mut fs = ServiceFs::new();
352 let _: &mut ServiceFsDir<'_, _> =
353 fs.dir("svc").add_fidl_service(StubbedServices::LookupAdmin);
354 let _: &mut ServiceFs<_> = fs.serve_connection(handles.outgoing_dir)?;
355
356 fs.for_each_concurrent(0, move |StubbedServices::LookupAdmin(stream)| async move {
357 stream
358 .try_for_each(|request| async move {
359 match request {
360 fidl_fuchsia_net_name::LookupAdminRequest::SetDnsServers { .. } => {
361 }
363 fidl_fuchsia_net_name::LookupAdminRequest::GetDnsServers { .. } => {
364 unimplemented!("Unused in this test")
365 }
366 }
367 Ok(())
368 })
369 .await
370 .context("Failed to serve request stream")
371 .unwrap_or_else(|e| warn!("Error encountered: {:?}", e))
372 })
373 .await;
374
375 Ok(())
376 }
377
378 enum IncomingService {
379 DnsServerWatcher(fnet_name::DnsServerWatcherRequestStream),
380 }
381
382 async fn run_dns_server_watcher(
383 handles: LocalComponentHandles,
384 mut receiver: mpsc::Receiver<(crate::DnsServersUpdateSource, Vec<fnet_name::DnsServer_>)>,
385 ) -> Result<(), anyhow::Error> {
386 let connection = handles.connect_to_protocol::<fnet_name::LookupAdminMarker>()?;
387
388 let mut fs = ServiceFs::new();
389 let _: &mut ServiceFsDir<'_, _> =
390 fs.dir("svc").add_fidl_service(IncomingService::DnsServerWatcher);
391 let _: &mut ServiceFs<_> = fs.serve_connection(handles.outgoing_dir)?;
392
393 let mut dns_server_watcher_incoming_requests = DnsServerWatcherRequestStreams::default();
394 let mut dns_servers = DnsServers::default();
395 let mut dns_server_watch_responders = DnsServerWatchResponders::default();
396 let mut netpol_networks_service = network::NetpolNetworksService::default();
397
398 let mut fs = futures::StreamExt::fuse(fs);
399
400 loop {
401 futures::select! {
402 req_stream = fs.select_next_some() => {
403 match req_stream {
404 IncomingService::DnsServerWatcher(stream) => {
405 dns_server_watcher_incoming_requests.handle_request_stream(stream)
406 }
407 }
408 }
409 req = dns_server_watcher_incoming_requests.select_next_some() => {
410 let (id, req) = req;
411 dns_server_watch_responders.handle_request(
412 id,
413 req,
414 &dns_servers,
415 )?;
416 }
417 update = receiver.select_next_some() => {
418 let (source, servers) = update;
419 update_servers(
420 &connection,
421 &mut dns_servers,
422 &mut dns_server_watch_responders,
423 &mut netpol_networks_service,
424 source,
425 servers,
426 ).await
427 }
428 }
429 }
430 }
431
432 async fn setup_test() -> Result<
433 (RealmInstance, mpsc::Sender<(crate::DnsServersUpdateSource, Vec<fnet_name::DnsServer_>)>),
434 anyhow::Error,
435 > {
436 let (tx, rx) = mpsc::channel(1);
437 let builder = RealmBuilder::new().await?;
438 let admin_server = builder
439 .add_local_child(
440 "lookup_admin",
441 move |handles: LocalComponentHandles| Box::pin(run_lookup_admin(handles)),
442 ChildOptions::new(),
443 )
444 .await?;
445
446 let dns_server_watcher = builder
447 .add_local_child(
448 "dns_server_watcher",
449 {
450 let rx = std::sync::Mutex::new(Some(rx));
451 move |handles: LocalComponentHandles| {
452 Box::pin(run_dns_server_watcher(
453 handles,
454 rx.lock()
455 .expect("lock poison")
456 .take()
457 .expect("Only one instance of run_dns_server_watcher should exist"),
458 ))
459 }
460 },
461 ChildOptions::new(),
462 )
463 .await?;
464
465 builder
466 .add_route(
467 Route::new()
468 .capability(Capability::protocol::<fnet_name::DnsServerWatcherMarker>())
469 .from(&dns_server_watcher)
470 .to(Ref::parent()),
471 )
472 .await?;
473 builder
474 .add_route(
475 Route::new()
476 .capability(Capability::protocol::<fnet_name::LookupAdminMarker>())
477 .from(&admin_server)
478 .to(&dns_server_watcher),
479 )
480 .await?;
481
482 let realm = builder.build().await?;
483
484 Ok((realm, tx))
485 }
486
487 fn server(address: fidl_fuchsia_net::SocketAddress) -> fnet_name::DnsServer_ {
488 fnet_name::DnsServer_ { address: Some(address), ..fnet_name::DnsServer_::default() }
489 }
490
491 #[fuchsia::test]
492 async fn test_dns_server_watcher() -> Result<(), anyhow::Error> {
493 let (realm, mut tx) = setup_test().await?;
494
495 let watcher1 = realm
496 .root
497 .connect_to_protocol_at_exposed_dir::<fnet_name::DnsServerWatcherMarker>()
498 .context("While connecting to DnsServerWatcher")?;
499 let watcher2 = realm
500 .root
501 .connect_to_protocol_at_exposed_dir::<fnet_name::DnsServerWatcherMarker>()
502 .context("While connecting to DnsServerWatcher")?;
503
504 assert_eq!(watcher1.watch_servers().await?, vec![]);
505 assert_eq!(watcher2.watch_servers().await?, vec![]);
506
507 let mut watcher1_call = watcher1.watch_servers().fuse();
509 futures::select! {
510 _ = watcher1_call => {
511 return Err(
512 anyhow!("WatchServers should not respond here, there have been no updates")
513 );
514 },
515 _ = fuchsia_async::Timer::new(std::time::Duration::from_millis(100)).fuse() => {}
516 }
517
518 let (watch1, watch2, _) = futures::try_join!(
520 watcher1_call.map_err(|e| anyhow::Error::from(e)),
522 watcher2.watch_servers().map_err(|e| anyhow::Error::from(e)),
523 tx.send((
524 DnsServersUpdateSource::Default,
525 vec![server(fidl_socket_addr!("203.0.113.1:1"))],
526 ))
527 .map_err(|e| anyhow::Error::from(e)),
528 )?;
529 assert_eq!(watch1, vec![server(fidl_socket_addr!("203.0.113.1:1")),]);
530 assert_eq!(watch2, vec![server(fidl_socket_addr!("203.0.113.1:1")),]);
531
532 let (watch1, watch2, _) = futures::try_join!(
534 watcher1.watch_servers().map_err(|e| anyhow::Error::from(e)),
535 watcher2.watch_servers().map_err(|e| anyhow::Error::from(e)),
536 tx.send((
537 DnsServersUpdateSource::Dhcpv4 { interface_id: 1 },
538 vec![server(fidl_socket_addr!("203.0.113.1:2")),],
539 ))
540 .map_err(|e| anyhow::Error::from(e)),
541 )?;
542 let expectation = vec![
545 server(fidl_socket_addr!("203.0.113.1:2")),
546 server(fidl_socket_addr!("203.0.113.1:1")),
547 ];
548 assert_eq!(watch1, expectation);
549 assert_eq!(watch2, expectation);
550
551 let (watch1, _) = futures::try_join!(
553 watcher1.watch_servers().map_err(|e| anyhow::Error::from(e)),
554 tx.send((
555 DnsServersUpdateSource::Dhcpv6 { interface_id: 1 },
556 vec![server(fidl_socket_addr!("[2001:db8::]:1")),],
557 ))
558 .map_err(|e| anyhow::Error::from(e)),
559 )?;
560 let expectation = vec![
562 server(fidl_socket_addr!("203.0.113.1:2")),
563 server(fidl_socket_addr!("[2001:db8::]:1")),
564 server(fidl_socket_addr!("203.0.113.1:1")),
565 ];
566 assert_eq!(watch1, expectation);
567
568 tx.send((
572 DnsServersUpdateSource::Default,
573 vec![fnet_name::DnsServer_ {
574 address: Some(fidl_socket_addr!("203.0.113.1:5")),
575 ..fnet_name::DnsServer_::default()
576 }],
577 ))
578 .await?;
579 let (watch1, watch2) = futures::try_join!(
580 watcher1.watch_servers().map_err(|e| anyhow::Error::from(e)),
581 watcher2.watch_servers().map_err(|e| anyhow::Error::from(e)),
582 )?;
583 let expectation = vec![
585 server(fidl_socket_addr!("203.0.113.1:2")),
586 server(fidl_socket_addr!("[2001:db8::]:1")),
587 server(fidl_socket_addr!("203.0.113.1:5")),
588 ];
589 assert_eq!(watch1, expectation);
590
591 assert_eq!(watch2, expectation);
593
594 Ok(())
595 }
596}