1use async_utils::hanging_get::server as hanging_get;
6use fidl::endpoints::ControlHandle;
7use fuchsia_component::server::{ServiceFsDir, ServiceObjLocal};
8use futures::lock::Mutex;
9use futures::{TryFutureExt as _, TryStreamExt as _};
10use log::error;
11use std::sync::Arc;
12use {fidl_fuchsia_net_reachability as freachability, fuchsia_async as fasync};
13
/// Responder that completes a pending `Monitor.Watch` request.
type WatchResponder = freachability::MonitorWatchResponder;
/// Callback handed to the hanging-get broker to deliver a snapshot to a responder.
/// The `bool` it returns reports whether the snapshot was sent successfully
/// (see the closure built in `ReachabilityHandler::new`).
type NotifyFn = Box<dyn Fn(&freachability::Snapshot, WatchResponder) -> bool>;
/// Hanging-get broker specialised for reachability snapshots.
type ReachabilityBroker =
    hanging_get::HangingGet<freachability::Snapshot, WatchResponder, NotifyFn>;
/// Publisher obtained from a [`ReachabilityBroker`]; used to set the current snapshot.
type ReachabilityPublisher =
    hanging_get::Publisher<freachability::Snapshot, WatchResponder, NotifyFn>;
20
/// Tracks the current reachability state and serves it to clients of the `Monitor`
/// protocol from `fidl_fuchsia_net_reachability` using the hanging-get pattern.
pub struct ReachabilityHandler {
    /// Most recently recorded reachability state.
    state: Arc<Mutex<ReachabilityState>>,
    /// Broker from which one subscriber is created per client connection.
    broker: Arc<Mutex<ReachabilityBroker>>,
    /// Publisher used to push a fresh snapshot whenever `state` changes.
    publisher: Arc<Mutex<ReachabilityPublisher>>,
}
26
/// Reachability flags tracked by the handler.
///
/// Each field is published under the same name in the FIDL `Snapshot` table.
/// The [`Default`] value has every flag cleared, which is also the state a newly
/// created handler starts from.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ReachabilityState {
    /// Published as `Snapshot.internet_available`.
    pub internet_available: bool,
    /// Published as `Snapshot.gateway_reachable`.
    pub gateway_reachable: bool,
    /// Published as `Snapshot.dns_active`.
    pub dns_active: bool,
    /// Published as `Snapshot.http_active`.
    pub http_active: bool,
}
34
35impl From<ReachabilityState> for freachability::Snapshot {
36 fn from(state: ReachabilityState) -> Self {
37 Self {
38 internet_available: Some(state.internet_available),
39 gateway_reachable: Some(state.gateway_reachable),
40 dns_active: Some(state.dns_active),
41 http_active: Some(state.http_active),
42 ..Default::default()
43 }
44 }
45}
46
47impl ReachabilityHandler {
48 pub fn new() -> Self {
49 let notify_fn: NotifyFn = Box::new(|state, responder| match responder.send(&state) {
50 Ok(()) => true,
51 Err(e) => {
52 error!("Failed to send reachability state to client: {}", e);
53 false
54 }
55 });
56 let state = ReachabilityState {
57 internet_available: false,
58 gateway_reachable: false,
59 dns_active: false,
60 http_active: false,
61 };
62 let broker = hanging_get::HangingGet::new(state.clone().into(), notify_fn);
63 let publisher = broker.new_publisher();
64 Self {
65 state: Arc::new(Mutex::new(state)),
66 broker: Arc::new(Mutex::new(broker)),
67 publisher: Arc::new(Mutex::new(publisher)),
68 }
69 }
70
71 pub async fn replace_state(&mut self, new_state: ReachabilityState) {
72 self.update_state(|state| *state = new_state).await;
73 }
74
75 async fn update_state(&mut self, update_callback: impl FnOnce(&mut ReachabilityState)) {
76 let mut current_state_guard = self.state.lock().await;
77 let previous_state = current_state_guard.clone();
78
79 update_callback(&mut current_state_guard);
80
81 if *current_state_guard != previous_state {
82 self.publisher
83 .lock()
84 .await
85 .set(freachability::Snapshot::from(current_state_guard.clone()));
86 }
87 }
88
89 pub fn publish_service<'a, 'b>(
90 &mut self,
91 mut svc_dir: ServiceFsDir<'a, ServiceObjLocal<'b, ()>>,
92 ) {
93 let _ = svc_dir.add_fidl_service({
94 let broker = self.broker.clone();
95 move |mut stream: freachability::MonitorRequestStream| {
96 let broker = broker.clone();
97 fasync::Task::local(
98 async move {
99 let subscriber = broker.lock().await.new_subscriber();
100 let mut set_options_called = false;
104 let mut watch_called = false;
105 while let Some(req) = stream.try_next().await? {
106 match req {
107 freachability::MonitorRequest::Watch { responder } => {
108 watch_called = true;
109 subscriber.register(responder)?
110 }
111 freachability::MonitorRequest::SetOptions {
112 payload: _,
113 control_handle,
114 } => {
115 if watch_called || set_options_called {
116 control_handle.shutdown_with_epitaph(
117 fidl::Status::CONNECTION_ABORTED,
118 );
119 break;
120 }
121 set_options_called = true;
122 }
123 }
124 }
125
126 Ok(())
127 }
128 .unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
129 )
130 .detach()
131 }
132 });
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Error;
    use assert_matches::assert_matches;
    use fidl::endpoints::Proxy;
    use fuchsia_component::server::ServiceFs;
    use futures::StreamExt as _;
    use std::cell::RefCell;
    use std::task::Poll;

    /// Runs a `ServiceFs` on a detached local task and hands out client connections.
    struct TestEnv {
        connector: fuchsia_component::server::ProtocolConnector,
    }

    impl TestEnv {
        /// Takes ownership of `service_fs` and starts serving it on a local task.
        fn new(mut service_fs: ServiceFs<ServiceObjLocal<'static, ()>>) -> Self {
            let connector = service_fs.create_protocol_connector().unwrap();
            fasync::Task::local(service_fs.collect()).detach();
            Self { connector }
        }

        /// Opens a new `Monitor` connection wrapped in a [`FakeClient`].
        fn connect_client(&self) -> FakeClient {
            let watcher_proxy =
                self.connector.connect_to_protocol::<freachability::MonitorMarker>().unwrap();
            FakeClient { watcher_proxy, hanging_watcher_request: RefCell::new(None) }
        }
    }

    /// Client-side helper that remembers an outstanding `Watch` call between polls.
    struct FakeClient {
        watcher_proxy: freachability::MonitorProxy,
        /// A `Watch` call that was issued but had not completed at the last poll.
        hanging_watcher_request:
            RefCell<Option<fidl::client::QueryResponseFut<freachability::Snapshot>>>,
    }

    impl FakeClient {
        /// Polls the pending `Watch` call, issuing a new one if none is outstanding.
        ///
        /// Returns `Ok(Some(snapshot))` when the call completed, `Ok(None)` when it is
        /// still pending (the call is kept for the next invocation), and `Err` when the
        /// call failed.
        fn get_reachability_state(
            &self,
            executor: &mut fasync::TestExecutor,
        ) -> Result<Option<freachability::Snapshot>, Error> {
            let mut watch_request = self
                .hanging_watcher_request
                .take()
                .unwrap_or_else(|| self.watcher_proxy.watch());

            match executor.run_until_stalled(&mut watch_request) {
                Poll::Pending => {
                    let _: Option<fidl::client::QueryResponseFut<freachability::Snapshot>> =
                        self.hanging_watcher_request.replace(Some(watch_request));
                    Ok(None)
                }
                Poll::Ready(Ok(state)) => Ok(Some(state)),
                Poll::Ready(Err(e)) => Err(e.into()),
            }
        }
    }

    /// Builds a handler, publishes it into a fresh `ServiceFs` and starts serving it.
    ///
    /// Must be called after the test executor has been created, because
    /// [`TestEnv::new`] spawns a local task.
    fn new_handler_and_env() -> (ReachabilityHandler, TestEnv) {
        let mut service_fs = ServiceFs::new_local();
        let mut handler = ReachabilityHandler::new();
        handler.publish_service(service_fs.root_dir());
        (handler, TestEnv::new(service_fs))
    }

    #[test]
    fn test_hanging_get() {
        let mut executor = fasync::TestExecutor::new();
        let (mut handler, test_env) = new_handler_and_env();
        let client = test_env.connect_client();

        // The first watch resolves immediately with the initial state.
        assert_matches!(
            client.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(false),
                gateway_reachable: Some(false),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );

        // With no state change, the next watch hangs.
        assert_matches!(client.get_reachability_state(&mut executor), Ok(None));

        executor.run_singlethreaded(handler.replace_state(ReachabilityState {
            internet_available: true,
            gateway_reachable: true,
            dns_active: true,
            http_active: true,
        }));

        // The hanging watch now resolves with the updated state.
        assert_matches!(
            client.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(true),
                gateway_reachable: Some(true),
                dns_active: Some(true),
                http_active: Some(true),
                ..
            }))
        );
    }

    #[test]
    fn test_hanging_get_multiple_clients() {
        let mut executor = fasync::TestExecutor::new();
        let (mut handler, test_env) = new_handler_and_env();

        let client1 = test_env.connect_client();
        let client2 = test_env.connect_client();

        // Each client independently receives the initial state.
        assert_matches!(
            client1.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(false),
                gateway_reachable: Some(false),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );
        assert_matches!(
            client2.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(false),
                gateway_reachable: Some(false),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );

        // Both clients hang until the state changes.
        assert_matches!(client1.get_reachability_state(&mut executor), Ok(None));
        assert_matches!(client2.get_reachability_state(&mut executor), Ok(None));

        executor.run_singlethreaded(handler.update_state(|state| {
            state.internet_available = true;
            state.gateway_reachable = true;
        }));

        // Both hanging watches resolve with the same updated state.
        assert_matches!(
            client1.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(true),
                gateway_reachable: Some(true),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );
        assert_matches!(
            client2.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(true),
                gateway_reachable: Some(true),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );

        // Writing the values the state already holds is a no-op and must not wake anyone.
        executor.run_singlethreaded(handler.update_state(|state| {
            state.internet_available = true;
            state.gateway_reachable = true;
            state.dns_active = false;
        }));

        assert_matches!(client1.get_reachability_state(&mut executor), Ok(None));
        assert_matches!(client2.get_reachability_state(&mut executor), Ok(None));
    }

    #[test]
    fn test_cannot_call_set_options_after_watch() {
        let mut executor = fasync::TestExecutor::new();
        // `_handler` is bound (not `_`) so the handler stays alive for the whole test.
        let (_handler, test_env) = new_handler_and_env();
        let client = test_env.connect_client();

        assert_matches!(client.get_reachability_state(&mut executor), Ok(_));
        // The one-way call itself succeeds; the server reacts by closing the channel.
        assert_matches!(
            client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
            Ok(())
        );
        assert_matches!(executor.run_singlethreaded(client.watcher_proxy.on_closed()), Ok(_));
    }

    #[test]
    fn test_cannot_call_set_options_twice() {
        let mut executor = fasync::TestExecutor::new();
        // `_handler` is bound (not `_`) so the handler stays alive for the whole test.
        let (_handler, test_env) = new_handler_and_env();
        let client = test_env.connect_client();

        assert_matches!(
            client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
            Ok(())
        );
        // The second call is sent successfully but makes the server close the channel.
        assert_matches!(
            client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
            Ok(())
        );
        assert_matches!(executor.run_singlethreaded(client.watcher_proxy.on_closed()), Ok(_));
    }
}