reachability_handler/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use async_utils::hanging_get::server as hanging_get;
6use fidl::endpoints::ControlHandle;
7use fuchsia_component::server::{ServiceFsDir, ServiceObjLocal};
8use futures::lock::Mutex;
9use futures::{TryFutureExt as _, TryStreamExt as _};
10use log::error;
11use std::sync::Arc;
12use {fidl_fuchsia_net_reachability as freachability, fuchsia_async as fasync};
13
14type WatchResponder = freachability::MonitorWatchResponder;
15type NotifyFn = Box<dyn Fn(&freachability::Snapshot, WatchResponder) -> bool>;
16type ReachabilityBroker =
17    hanging_get::HangingGet<freachability::Snapshot, WatchResponder, NotifyFn>;
18type ReachabilityPublisher =
19    hanging_get::Publisher<freachability::Snapshot, WatchResponder, NotifyFn>;
20
21pub struct ReachabilityHandler {
22    state: Arc<Mutex<ReachabilityState>>,
23    broker: Arc<Mutex<ReachabilityBroker>>,
24    publisher: Arc<Mutex<ReachabilityPublisher>>,
25}
26
/// Plain-data view of the reachability signals reported to clients.
///
/// Each field is copied into the same-named field of `freachability::Snapshot` when the state
/// is converted for publication. The derived [`Default`] value has every signal set to `false`,
/// which is also the state a freshly constructed handler starts from.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ReachabilityState {
    /// Value reported as `Snapshot::internet_available`.
    pub internet_available: bool,
    /// Value reported as `Snapshot::gateway_reachable`.
    pub gateway_reachable: bool,
    /// Value reported as `Snapshot::dns_active`.
    pub dns_active: bool,
    /// Value reported as `Snapshot::http_active`.
    pub http_active: bool,
}
34
35impl From<ReachabilityState> for freachability::Snapshot {
36    fn from(state: ReachabilityState) -> Self {
37        Self {
38            internet_available: Some(state.internet_available),
39            gateway_reachable: Some(state.gateway_reachable),
40            dns_active: Some(state.dns_active),
41            http_active: Some(state.http_active),
42            ..Default::default()
43        }
44    }
45}
46
47impl ReachabilityHandler {
48    pub fn new() -> Self {
49        let notify_fn: NotifyFn = Box::new(|state, responder| match responder.send(&state) {
50            Ok(()) => true,
51            Err(e) => {
52                error!("Failed to send reachability state to client: {}", e);
53                false
54            }
55        });
56        let state = ReachabilityState {
57            internet_available: false,
58            gateway_reachable: false,
59            dns_active: false,
60            http_active: false,
61        };
62        let broker = hanging_get::HangingGet::new(state.clone().into(), notify_fn);
63        let publisher = broker.new_publisher();
64        Self {
65            state: Arc::new(Mutex::new(state)),
66            broker: Arc::new(Mutex::new(broker)),
67            publisher: Arc::new(Mutex::new(publisher)),
68        }
69    }
70
71    pub async fn replace_state(&mut self, new_state: ReachabilityState) {
72        self.update_state(|state| *state = new_state).await;
73    }
74
75    async fn update_state(&mut self, update_callback: impl FnOnce(&mut ReachabilityState)) {
76        let mut current_state_guard = self.state.lock().await;
77        let previous_state = current_state_guard.clone();
78
79        update_callback(&mut current_state_guard);
80
81        if *current_state_guard != previous_state {
82            self.publisher
83                .lock()
84                .await
85                .set(freachability::Snapshot::from(current_state_guard.clone()));
86        }
87    }
88
89    pub fn publish_service<'a, 'b>(
90        &mut self,
91        mut svc_dir: ServiceFsDir<'a, ServiceObjLocal<'b, ()>>,
92    ) {
93        let _ = svc_dir.add_fidl_service({
94            let broker = self.broker.clone();
95            move |mut stream: freachability::MonitorRequestStream| {
96                let broker = broker.clone();
97                fasync::Task::local(
98                    async move {
99                        let subscriber = broker.lock().await.new_subscriber();
100                        // Keep track of whether SetOptions or Watch were already called. Calling
101                        // SetOptions after either it or Watch have already been called will result in us
102                        // closing the request stream.
103                        let mut set_options_called = false;
104                        let mut watch_called = false;
105                        while let Some(req) = stream.try_next().await? {
106                            match req {
107                                freachability::MonitorRequest::Watch { responder } => {
108                                    watch_called = true;
109                                    subscriber.register(responder)?
110                                }
111                                freachability::MonitorRequest::SetOptions {
112                                    payload: _,
113                                    control_handle,
114                                } => {
115                                    if watch_called || set_options_called {
116                                        control_handle.shutdown_with_epitaph(
117                                            fidl::Status::CONNECTION_ABORTED,
118                                        );
119                                        break;
120                                    }
121                                    set_options_called = true;
122                                }
123                            }
124                        }
125
126                        Ok(())
127                    }
128                    .unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
129                )
130                .detach()
131            }
132        });
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Error;
    use assert_matches::assert_matches;
    use fidl::endpoints::Proxy;
    use fuchsia_component::server::ServiceFs;
    use futures::StreamExt as _;
    use std::cell::RefCell;
    use std::task::Poll;

    /// Test fixture that owns the connector used to reach the published `Monitor` service.
    struct TestEnv {
        connector: fuchsia_component::server::ProtocolConnector,
    }

    impl TestEnv {
        /// Creates a protocol connector for `service_fs` and then drives the `ServiceFs` on a
        /// detached local task.
        fn new(mut service_fs: ServiceFs<ServiceObjLocal<'static, ()>>) -> Self {
            let connector = service_fs.create_protocol_connector().unwrap();
            fasync::Task::local(service_fs.collect()).detach();
            Self { connector }
        }

        /// Opens a new `Monitor` connection wrapped in a [`FakeClient`].
        fn connect_client(&self) -> FakeClient {
            let watcher_proxy =
                self.connector.connect_to_protocol::<freachability::MonitorMarker>().unwrap();
            FakeClient { watcher_proxy, hanging_watcher_request: RefCell::new(None) }
        }
    }

    /// `Monitor` client that remembers an unanswered `Watch` call between polls.
    struct FakeClient {
        watcher_proxy: freachability::MonitorProxy,
        /// `Watch` request that was still pending the last time it was polled, if any.
        hanging_watcher_request:
            RefCell<Option<fidl::client::QueryResponseFut<freachability::Snapshot>>>,
    }

    impl FakeClient {
        /// Polls for a reachability snapshot without blocking.
        ///
        /// Reuses the pending `Watch` request if one is stored, otherwise issues a new one.
        /// Returns `Ok(None)` when the request is still pending (it is stored for the next
        /// call), `Ok(Some(_))` when the server replied, and `Err(_)` when the call failed.
        fn get_reachability_state(
            &self,
            executor: &mut fasync::TestExecutor,
        ) -> Result<Option<freachability::Snapshot>, Error> {
            // `RefCell::take` already moves the stored request out and leaves `None` behind.
            let mut watch_request = self
                .hanging_watcher_request
                .take()
                .unwrap_or_else(|| self.watcher_proxy.watch());

            match executor.run_until_stalled(&mut watch_request) {
                Poll::Pending => {
                    let _: Option<fidl::client::QueryResponseFut<freachability::Snapshot>> =
                        self.hanging_watcher_request.replace(Some(watch_request));
                    Ok(None)
                }
                Poll::Ready(Ok(state)) => Ok(Some(state)),
                Poll::Ready(Err(e)) => Err(e.into()),
            }
        }
    }

    // Tests that the handler correctly implements the hanging-get pattern.
    #[test]
    fn test_hanging_get() {
        let mut executor = fasync::TestExecutor::new();
        let mut service_fs = ServiceFs::new_local();
        let mut handler = ReachabilityHandler::new();
        handler.publish_service(service_fs.root_dir());
        let test_env = TestEnv::new(service_fs);
        let client = test_env.connect_client();

        assert_matches!(
            client.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(false),
                gateway_reachable: Some(false),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );

        // Verify no response as state hasn't changed.
        assert_matches!(client.get_reachability_state(&mut executor), Ok(None));

        executor.run_singlethreaded(handler.replace_state(ReachabilityState {
            internet_available: true,
            gateway_reachable: true,
            dns_active: true,
            http_active: true,
        }));

        assert_matches!(
            client.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(true),
                gateway_reachable: Some(true),
                dns_active: Some(true),
                http_active: Some(true),
                ..
            }))
        );
    }

    // Tests that every connected client observes the initial state and each subsequent change,
    // and that an update which leaves the state unchanged is not delivered to any client.
    #[test]
    fn test_hanging_get_multiple_clients() {
        let mut executor = fasync::TestExecutor::new();
        let mut service_fs = ServiceFs::new_local();
        let mut handler = ReachabilityHandler::new();
        handler.publish_service(service_fs.root_dir());
        let test_env = TestEnv::new(service_fs);

        let client1 = test_env.connect_client();
        let client2 = test_env.connect_client();

        assert_matches!(
            client1.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(false),
                gateway_reachable: Some(false),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );
        assert_matches!(
            client2.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(false),
                gateway_reachable: Some(false),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );

        assert_matches!(client1.get_reachability_state(&mut executor), Ok(None));
        assert_matches!(client2.get_reachability_state(&mut executor), Ok(None));

        executor.run_singlethreaded(handler.update_state(|state| {
            state.internet_available = true;
            state.gateway_reachable = true;
        }));

        assert_matches!(
            client1.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(true),
                gateway_reachable: Some(true),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );
        assert_matches!(
            client2.get_reachability_state(&mut executor),
            Ok(Some(freachability::Snapshot {
                internet_available: Some(true),
                gateway_reachable: Some(true),
                dns_active: Some(false),
                http_active: Some(false),
                ..
            }))
        );

        // An update that does not change the current state should not be published.
        executor.run_singlethreaded(handler.update_state(|state| {
            state.internet_available = true;
            state.gateway_reachable = true;
            state.dns_active = false;
        }));

        assert_matches!(client1.get_reachability_state(&mut executor), Ok(None));
        assert_matches!(client2.get_reachability_state(&mut executor), Ok(None));
    }

    // Tests that the handler closes the request stream if the client calls SetOptions after having
    // already called Watch.
    #[test]
    fn test_cannot_call_set_options_after_watch() {
        let mut executor = fasync::TestExecutor::new();
        let mut service_fs = ServiceFs::new_local();
        let mut handler = ReachabilityHandler::new();
        handler.publish_service(service_fs.root_dir());
        let test_env = TestEnv::new(service_fs);
        let client = test_env.connect_client();

        assert_matches!(client.get_reachability_state(&mut executor), Ok(_));
        assert_matches!(
            client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
            Ok(())
        );
        assert_matches!(executor.run_singlethreaded(client.watcher_proxy.on_closed()), Ok(_));
    }

    // Tests that the handler closes the request stream if the client calls SetOptions after having
    // already called it before.
    #[test]
    fn test_cannot_call_set_options_twice() {
        let mut executor = fasync::TestExecutor::new();
        let mut service_fs = ServiceFs::new_local();
        let mut handler = ReachabilityHandler::new();
        handler.publish_service(service_fs.root_dir());
        let test_env = TestEnv::new(service_fs);
        let client = test_env.connect_client();

        assert_matches!(
            client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
            Ok(())
        );
        assert_matches!(
            client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
            Ok(())
        );
        assert_matches!(executor.run_singlethreaded(client.watcher_proxy.on_closed()), Ok(_));
    }
}