dns_server_watcher/
stream.rs1use fidl_fuchsia_net_name::{
8 DnsServerSource, DnsServerWatcherProxy, DnsServer_, SocketProxyDnsServerSource,
9};
10
11use async_utils::stream::WithTag as _;
12use fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy;
13use futures::future::TryFutureExt as _;
14use futures::stream::Stream;
15
/// Tag identifying where a list of DNS servers came from.
///
/// The DNS server streams built in this module pair every item they yield
/// with one of these values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DnsServersUpdateSource {
    /// The default source (presumably statically configured servers;
    /// confirm against callers).
    Default,
    /// Servers reported by the netstack (inferred from the variant name).
    Netstack,
    /// Servers learned through DHCPv4 (inferred from the variant name).
    Dhcpv4 {
        /// Identifier of the interface the servers are associated with.
        interface_id: u64,
    },
    /// Servers learned through DHCPv6 (inferred from the variant name).
    Dhcpv6 {
        /// Identifier of the interface the servers are associated with.
        interface_id: u64,
    },
    /// Servers learned through NDP (inferred from the variant name).
    Ndp {
        /// Identifier of the interface the servers are associated with.
        interface_id: u64,
    },
    /// Servers reported by the socket proxy's DNS server watcher.
    SocketProxy,
}
26
27pub fn new_dns_server_stream(
30 source: DnsServersUpdateSource,
31 proxy: DnsServerWatcherProxy,
32) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
33 futures::stream::try_unfold(proxy, move |proxy| {
34 proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
35 })
36 .tagged(source)
37}
38
39pub fn new_dns_server_stream_socketproxy(
42 proxy: fnp_socketproxy::DnsServerWatcherProxy,
43) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
44 futures::stream::try_unfold(proxy, move |proxy| {
45 proxy.watch_servers().map_ok(move |lists| {
46 let dns_list = lists
47 .into_iter()
48 .map(move |dns_server_list| dns_servers_from_dns_server_list(dns_server_list))
49 .flatten()
50 .collect::<Vec<_>>();
51
52 Some((dns_list, proxy))
53 })
54 })
55 .tagged(DnsServersUpdateSource::SocketProxy)
56}
57
58fn dns_servers_from_dns_server_list(
62 fnp_socketproxy::DnsServerList { addresses, source_network_id, ..}: fnp_socketproxy::DnsServerList,
63) -> Vec<DnsServer_> {
64 let id: u64 = match source_network_id {
65 Some(id) => id.into(),
66 None => return vec![],
68 };
69 addresses
70 .unwrap_or_default()
71 .into_iter()
72 .map(|addr| DnsServer_ {
73 address: Some(addr),
74 source: Some(DnsServerSource::SocketProxy(SocketProxyDnsServerSource {
75 source_interface: Some(id),
76 ..Default::default()
77 })),
78 ..Default::default()
79 })
80 .collect()
81}
82
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use fidl_fuchsia_net_name::{
        DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
        DnsServerWatcherWatchServersResponder,
    };

    use fuchsia_async as fasync;
    use futures::lock::Mutex;
    use futures::{FutureExt, StreamExt, TryStreamExt};

    use super::*;
    use crate::test_util::constants::*;

    /// Fake `DnsServerWatcher` server used to drive the stream under test.
    ///
    /// Configurations pushed while no request is waiting are queued in
    /// `configs`; a request that arrives while the queue is empty is parked
    /// in `pending_request` until the next push.
    struct MockDnsServerWatcher {
        configs: VecDeque<Vec<DnsServer_>>,
        pending_request: Option<DnsServerWatcherWatchServersResponder>,
    }

    impl MockDnsServerWatcher {
        /// Creates a mock with no queued configurations and no parked request.
        fn new() -> Self {
            Self { configs: VecDeque::new(), pending_request: None }
        }

        /// Delivers `config` to the parked request if there is one, otherwise
        /// queues it for a future request.
        fn push_config(&mut self, config: Vec<DnsServer_>) {
            if let Some(responder) = self.pending_request.take() {
                responder.send(&config).expect("Failed to fulfill FIDL request");
            } else {
                self.configs.push_back(config);
            }
        }

        /// Serves `WatchServers` requests from `rs` against the shared mock
        /// state until the request stream ends or fails.
        ///
        /// Panics if a request arrives while another one is still parked.
        async fn serve(
            watcher: Arc<Mutex<Self>>,
            rs: DnsServerWatcherRequestStream,
        ) -> Result<(), fidl::Error> {
            rs.try_for_each(move |request| {
                let watcher = Arc::clone(&watcher);
                async move {
                    let DnsServerWatcherRequest::WatchServers { responder } = request;
                    let mut state = watcher.lock().await;
                    assert!(
                        state.pending_request.is_none(),
                        "No more than 1 pending requests allowed"
                    );

                    // Answer right away when a configuration is queued;
                    // otherwise park the responder until `push_config` runs.
                    match state.configs.pop_front() {
                        Some(config) => {
                            responder.send(&config).expect("Failed to fulfill FIDL request")
                        }
                        None => state.pending_request = Some(responder),
                    }
                    Ok(())
                }
            })
            .await
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_dns_server_stream() {
        let watcher = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
        let (proxy, rs) = fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
        let (serve_fut, abort_handle) =
            futures::future::abortable(MockDnsServerWatcher::serve(watcher.clone(), rs));

        // Client side of the test; it is driven concurrently with the mock
        // server by the `join` below.
        let client_fut = async move {
            let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);

            // Nothing has been pushed yet, so the stream must not be ready.
            assert!(stream.next().now_or_never().is_none());
            assert!(stream.next().now_or_never().is_none());
            {
                let mut mock = watcher.lock().await;
                mock.push_config(vec![ndp_server()]);
                mock.push_config(vec![static_server()]);
            }

            // The two configurations must come out in the order they were pushed.
            let (first_source, first_res) =
                stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(first_source, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![ndp_server()], first_res.expect("FIDL error occurred"));

            let (second_source, second_res) =
                stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(second_source, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![static_server()], second_res.expect("FIDL error occurred"));

            // Stop the mock server; the checks after the join expect the
            // stream to then report a FIDL error and terminate.
            abort_handle.abort();
            stream
        };

        let (serve_result, mut stream) = futures::future::join(serve_fut, client_fut).await;
        let _aborted = serve_result.expect_err("Future must've been aborted");

        let (last_source, last_res) = stream.next().await.expect("Stream must yield a final value");
        assert_eq!(last_source, DnsServersUpdateSource::Netstack);
        let _fidl_error: fidl::Error = last_res.expect_err("Stream must yield an error");
        assert!(stream.next().await.is_none(), "Stream must end after error");
    }
}