dns_server_watcher/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! DNS Server watcher stream.
6
7use fidl_fuchsia_net_name::{
8    DnsServerSource, DnsServerWatcherProxy, DnsServer_, SocketProxyDnsServerSource,
9};
10
11use async_utils::stream::WithTag as _;
12use fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy;
13use futures::future::TryFutureExt as _;
14use futures::stream::Stream;
15
/// Identifies where a set of DNS servers came from.
///
/// The streams built in this module pair every item they yield with one of
/// these values, so a consumer merging several watchers can tell the
/// originating source apart.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DnsServersUpdateSource {
    /// The default source.
    Default,
    /// The netstack.
    Netstack,
    /// DHCPv4; `interface_id` identifies the interface the update is tied to.
    Dhcpv4 { interface_id: u64 },
    /// DHCPv6; `interface_id` identifies the interface the update is tied to.
    Dhcpv6 { interface_id: u64 },
    /// NDP; `interface_id` identifies the interface the update is tied to.
    Ndp { interface_id: u64 },
    /// The socket proxy; this is the tag applied by
    /// `new_dns_server_stream_socketproxy`.
    SocketProxy,
}
26
27/// Returns a `Stream` of [`DnsServerWatcherEvent`]s from watching the server configuration
28/// provided by `proxy`.
29pub fn new_dns_server_stream(
30    source: DnsServersUpdateSource,
31    proxy: DnsServerWatcherProxy,
32) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
33    futures::stream::try_unfold(proxy, move |proxy| {
34        proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
35    })
36    .tagged(source)
37}
38
39/// Returns a `Stream` of [`DnsServerWatcherEvent`]s from watching the server configuration
40/// provided by fnp_socketproxy's `DnsServerWatcher`.
41pub fn new_dns_server_stream_socketproxy(
42    proxy: fnp_socketproxy::DnsServerWatcherProxy,
43) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
44    futures::stream::try_unfold(proxy, move |proxy| {
45        proxy.watch_servers().map_ok(move |lists| {
46            let dns_list = lists
47                .into_iter()
48                .map(move |dns_server_list| dns_servers_from_dns_server_list(dns_server_list))
49                .flatten()
50                .collect::<Vec<_>>();
51
52            Some((dns_list, proxy))
53        })
54    })
55    .tagged(DnsServersUpdateSource::SocketProxy)
56}
57
58/// Returns a `Vec` of [`DnsServer_`] from a `fnp_socketproxy::DnsServerList`.
59/// Assumption: all DNS servers retrieved from the socketproxy are from interfaces
60/// that have been provisioned by an agent other than Fuchsia.
61fn dns_servers_from_dns_server_list(
62    fnp_socketproxy::DnsServerList { addresses, source_network_id, ..}: fnp_socketproxy::DnsServerList,
63) -> Vec<DnsServer_> {
64    let id: u64 = match source_network_id {
65        Some(id) => id.into(),
66        // When a network id is not specified, return an empty list.
67        None => return vec![],
68    };
69    addresses
70        .unwrap_or_default()
71        .into_iter()
72        .map(|addr| DnsServer_ {
73            address: Some(addr),
74            source: Some(DnsServerSource::SocketProxy(SocketProxyDnsServerSource {
75                source_interface: Some(id),
76                ..Default::default()
77            })),
78            ..Default::default()
79        })
80        .collect()
81}
82
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use fidl_fuchsia_net_name::{
        DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
        DnsServerWatcherWatchServersResponder,
    };

    use fuchsia_async as fasync;
    use futures::lock::Mutex;
    use futures::{FutureExt, StreamExt, TryStreamExt};

    use super::*;
    use crate::test_util::constants::*;

    /// In-process stand-in for a `DnsServerWatcher` server.
    ///
    /// It holds either a backlog of configurations that no request has asked
    /// for yet, or a single parked request that is waiting for one.
    struct MockDnsServerWatcher {
        /// Configurations pushed while no request was waiting, oldest first.
        configs: VecDeque<Vec<DnsServer_>>,
        /// Responder of a `WatchServers` call that arrived with an empty backlog.
        pending_request: Option<DnsServerWatcherWatchServersResponder>,
    }

    impl MockDnsServerWatcher {
        /// Creates a mock with no queued configurations and no parked request.
        fn new() -> Self {
            Self { configs: VecDeque::new(), pending_request: None }
        }

        /// Delivers `config` to the parked request if there is one; otherwise
        /// queues it for a later `WatchServers` call.
        fn push_config(&mut self, config: Vec<DnsServer_>) {
            if let Some(responder) = self.pending_request.take() {
                responder.send(&config).expect("Failed to fulfill FIDL request");
            } else {
                self.configs.push_back(config);
            }
        }

        /// Serves `rs` until it ends or fails, answering each `WatchServers`
        /// call from the backlog or parking it when the backlog is empty.
        ///
        /// Panics if a second request arrives while one is already parked.
        async fn serve(
            watcher: Arc<Mutex<Self>>,
            mut rs: DnsServerWatcherRequestStream,
        ) -> Result<(), fidl::Error> {
            while let Some(request) = rs.try_next().await? {
                let DnsServerWatcherRequest::WatchServers { responder } = request;

                let mut mock = watcher.lock().await;
                assert!(
                    mock.pending_request.is_none(),
                    "No more than 1 pending requests allowed"
                );

                match mock.configs.pop_front() {
                    Some(config) => {
                        responder.send(&config).expect("Failed to fulfill FIDL request");
                    }
                    None => {
                        mock.pending_request = Some(responder);
                    }
                }
            }
            Ok(())
        }
    }

    /// Checks that the stream is pending until configurations are pushed,
    /// yields them in order tagged with the source, and then yields a single
    /// error and ends once the serving side is aborted.
    #[fasync::run_singlethreaded(test)]
    async fn test_dns_server_stream() {
        let mock = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
        let (proxy, request_stream) =
            fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
        let (server, server_abort) = futures::future::abortable(MockDnsServerWatcher::serve(
            mock.clone(),
            request_stream,
        ));

        let client = async move {
            let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);

            // Nothing has been pushed yet, so polling must not produce an item,
            // no matter how often it is retried.
            for _ in 0..2 {
                assert!(stream.next().now_or_never().is_none());
            }

            {
                let mut mock = mock.lock().await;
                mock.push_config(vec![ndp_server()]);
                mock.push_config(vec![static_server()]);
            }

            // The pushed configurations come out in push order.
            for expected in [vec![ndp_server()], vec![static_server()]] {
                let (source, res) = stream.next().await.expect("stream ended unexpectedly");
                assert_eq!(source, DnsServersUpdateSource::Netstack);
                assert_eq!(expected, res.expect("FIDL error occurred"));
            }

            // Abort the serving future so the join below can complete.
            server_abort.abort();
            stream
        };

        let (serve_result, mut stream) = futures::future::join(server, client).await;
        let _aborted = serve_result.expect_err("Future must've been aborted");

        // The stream must surface exactly one error and then terminate.
        let (source, res) = stream.next().await.expect("Stream must yield a final value");
        assert_eq!(source, DnsServersUpdateSource::Netstack);
        let _fidl_error: fidl::Error = res.expect_err("Stream must yield an error");
        assert!(stream.next().await.is_none(), "Stream must end after error");
    }
}
182}