reachability_core/telemetry/mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod convert;
6mod inspect;
7
8use self::inspect::{inspect_record_stats, Stats};
9use crate::{IpVersions, State};
10
11use anyhow::{format_err, Context, Error};
12use cobalt_client::traits::AsEventCode;
13use fidl_fuchsia_metrics::MetricEvent;
14use fuchsia_cobalt_builders::MetricEventExt;
15use fuchsia_inspect::Node as InspectNode;
16use fuchsia_sync::Mutex;
17use futures::channel::{mpsc, oneshot};
18use futures::{select, Future, StreamExt};
19use log::{info, warn};
20use static_assertions::const_assert_eq;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use windowed_stats::aggregations::SumAndCount;
24use {fuchsia_async as fasync, network_policy_metrics_registry as metrics};
25
26pub async fn create_metrics_logger(
27    factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
28) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
29    let (cobalt_proxy, cobalt_server) =
30        fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
31
32    let project_spec = fidl_fuchsia_metrics::ProjectSpec {
33        customer_id: None, // defaults to fuchsia.
34        project_id: Some(metrics::PROJECT_ID),
35        ..Default::default()
36    };
37
38    let status = factory_proxy
39        .create_metric_event_logger(&project_spec, cobalt_server)
40        .await
41        .context("create metrics event logger")?;
42
43    match status {
44        Ok(_) => Ok(cobalt_proxy),
45        Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
46    }
47}
48
49#[derive(Clone, Debug)]
50pub struct TelemetrySender {
51    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
52    sender_is_blocked: Arc<AtomicBool>,
53}
54
55impl TelemetrySender {
56    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
57        Self {
58            sender: Arc::new(Mutex::new(sender)),
59            sender_is_blocked: Arc::new(AtomicBool::new(false)),
60        }
61    }
62
63    // Send telemetry event. Log an error if it fails.
64    pub fn send(&self, event: TelemetryEvent) {
65        match self.sender.lock().try_send(event) {
66            Ok(_) => {
67                // If sender has been blocked before, set bool to false and log message.
68                if self
69                    .sender_is_blocked
70                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
71                    .is_ok()
72                {
73                    info!("TelemetrySender recovered and resumed sending");
74                }
75            }
76            Err(_) => {
77                // If sender has not been blocked before, set bool to true and log error message.
78                if self
79                    .sender_is_blocked
80                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
81                    .is_ok()
82                {
83                    warn!("TelemetrySender dropped a msg: either buffer is full or no receiver is waiting");
84                }
85            }
86        }
87    }
88}
89
90#[derive(Debug, Clone, Default, PartialEq)]
91struct SystemStateSummary {
92    system_state: IpVersions<Option<State>>,
93}
94
95impl SystemStateSummary {
96    fn ipv4_link_state_val(&self) -> u32 {
97        match self.system_state.ipv4 {
98            Some(s) => s.link as u32,
99            None => 0u32,
100        }
101    }
102
103    fn ipv6_link_state_val(&self) -> u32 {
104        match self.system_state.ipv6 {
105            Some(s) => s.link as u32,
106            None => 0u32,
107        }
108    }
109}
110
/// The most recently reported default-route configuration, used to attribute durations to
/// the Cobalt default-route metrics.
#[derive(Debug, Clone, Default, PartialEq)]
struct NetworkConfig {
    /// Whether an IPv4 default route was reported.
    has_default_ipv4_route: bool,
    /// Whether an IPv6 default route was reported.
    has_default_ipv6_route: bool,
}
116
117#[derive(Debug, Clone, Default)]
118pub struct SystemStateUpdate {
119    pub(crate) system_state: IpVersions<Option<State>>,
120}
121
122#[derive(Debug)]
123pub enum TelemetryEvent {
124    SystemStateUpdate {
125        update: SystemStateUpdate,
126    },
127    NetworkConfig {
128        has_default_ipv4_route: bool,
129        has_default_ipv6_route: bool,
130    },
131    GatewayProbe {
132        internet_available: bool,
133        gateway_discoverable: bool,
134        gateway_pingable: bool,
135    },
136    /// Get the TimeSeries held by telemetry loop. Intended for test only.
137    GetTimeSeries {
138        sender: oneshot::Sender<Arc<Mutex<Stats>>>,
139    },
140}
141
/// Buffer size passed to `mpsc::channel` for the telemetry channel: the number of
/// first-come, first-served slots shared by all clients of the sender. The value is an
/// arbitrary choice.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
145
146const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
147
148pub fn serve_telemetry(
149    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
150    inspect_node: InspectNode,
151) -> (TelemetrySender, impl Future<Output = ()>) {
152    let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
153    let sender = TelemetrySender::new(sender);
154    let cloned_sender = sender.clone();
155    let fut = async move {
156        let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
157        const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
158        const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
159        const INTERVAL_TICKS_PER_MINUTE: u64 =
160            (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
161        const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
162        let mut interval_tick = 0u64;
163        let mut telemetry = Telemetry::new(cloned_sender, cobalt_proxy, inspect_node);
164        loop {
165            select! {
166                event = receiver.next() => {
167                    if let Some(event) = event {
168                        telemetry.handle_telemetry_event(event).await;
169                    }
170                }
171                _ = report_interval_stream.next() => {
172                    telemetry.handle_ten_secondly_telemetry();
173
174                    interval_tick += 1;
175                    if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
176                        telemetry.handle_hourly_telemetry().await;
177                    }
178                }
179            }
180        }
181    };
182    (sender, fut)
183}
184
// Macro wrapper for logging simple events (occurrence, integer, histogram, string).
// It calls `$cobalt_proxy.$method_name($metric_id, $value, $event_codes)` and logs a warning
// when either the FIDL call fails or the logger returns an error status.
//
// `$metric_id` is evaluated exactly once: it is bound to a local before the call and reused
// in the warning, so passing an expression with side effects is safe.
macro_rules! log_cobalt {
    ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
        let metric_id = $metric_id;
        let status = $cobalt_proxy.$method_name(metric_id, $value, $event_codes).await;
        match status {
            Ok(Ok(())) => (),
            Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", metric_id, e),
            Err(e) => warn!("Failed logging metric: {}, error: {}", metric_id, e),
        }
    }};
}
197
// Sends a batch of Cobalt events through `log_metric_events`. If the FIDL call fails or the
// logger returns an error status, a warning tagged with `$context` is logged.
macro_rules! log_cobalt_batch {
    ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
        match $cobalt_proxy.log_metric_events($events).await {
            Ok(Ok(())) => {}
            Ok(Err(logger_error)) => {
                warn!(
                    "Failed logging batch metrics, context: {}, error: {:?}",
                    $context, logger_error
                )
            }
            Err(fidl_error) => {
                warn!("Failed logging batch metrics, context: {}, error: {}", $context, fidl_error)
            }
        }
    }};
}
210
211fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
212    const MILLIS_PER_SEC: i64 = 1000;
213    let millis = duration.into_millis();
214    let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
215    millis / MILLIS_PER_SEC + rounded_portion
216}
217
218struct Telemetry {
219    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
220    state_summary: Option<SystemStateSummary>,
221    state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
222    state_last_refreshed_for_inspect: fasync::MonotonicInstant,
223    reachability_lost_at: Option<(
224        fasync::MonotonicInstant,
225        metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
226    )>,
227    network_config: Option<NetworkConfig>,
228    network_config_last_refreshed: fasync::MonotonicInstant,
229
230    _inspect_node: InspectNode,
231    stats: Arc<Mutex<Stats>>,
232}
233
234impl Telemetry {
235    pub fn new(
236        _telemetry_sender: TelemetrySender,
237        cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
238        inspect_node: InspectNode,
239    ) -> Self {
240        let stats = Arc::new(Mutex::new(Stats::new()));
241        inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
242        Self {
243            cobalt_proxy,
244            state_summary: None,
245            state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
246            state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
247            reachability_lost_at: None,
248            network_config: None,
249            network_config_last_refreshed: fasync::MonotonicInstant::now(),
250            _inspect_node: inspect_node,
251            stats,
252        }
253    }
254
255    pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
256        let now = fasync::MonotonicInstant::now();
257        match event {
258            TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
259                let new_state = Some(match &self.state_summary {
260                    Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
261                    None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
262                });
263                if self.state_summary != new_state {
264                    // Only log if system state has changed to prevent spamming Cobalt,
265                    // since this telemetry event may happen in quick succession.
266                    self.log_system_state_metrics(
267                        new_state,
268                        now,
269                        "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
270                    )
271                    .await
272                }
273            }
274            TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
275                let new_config =
276                    Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
277                self.log_network_config_metrics(
278                    new_config,
279                    now,
280                    "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
281                )
282                .await;
283            }
284            TelemetryEvent::GatewayProbe {
285                internet_available,
286                gateway_discoverable,
287                gateway_pingable,
288            } => {
289                if internet_available {
290                    let metric_id = match (gateway_discoverable, gateway_pingable) {
291                        (true, true) => None,
292                        (true, false) => {
293                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
294                        }
295                        (false, true) => {
296                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
297                        }
298                        (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
299                    };
300                    if let Some(metric_id) = metric_id {
301                        log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
302                    }
303                }
304            }
305            TelemetryEvent::GetTimeSeries { sender } => {
306                let _result = sender.send(Arc::clone(&self.stats));
307            }
308        }
309    }
310
311    async fn log_system_state_metrics(
312        &mut self,
313        new_state: Option<SystemStateSummary>,
314        now: fasync::MonotonicInstant,
315        ctx: &'static str,
316    ) {
317        let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
318        let duration_sec_inspect =
319            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
320        let mut metric_events = vec![];
321
322        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
323
324        if let Some(prev) = &self.state_summary {
325            if prev.system_state.has_interface_up() {
326                metric_events.push(
327                    MetricEvent::builder(
328                        metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
329                    )
330                    .as_integer(duration_cobalt.into_micros()),
331                );
332
333                let route_config_dim = convert::convert_route_config(&prev.system_state);
334                let internet_available =
335                    convert::convert_yes_no_dim(prev.system_state.has_internet());
336                let gateway_reachable =
337                    convert::convert_yes_no_dim(prev.system_state.has_gateway());
338                let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
339                let http_status = if prev.system_state.has_http() {
340                    metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
341                } else {
342                    metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
343                };
344                // We only log metric when at least one of IPv4 or IPv6 is in the SystemState.
345                if let Some(route_config) = route_config_dim {
346                    metric_events.push(
347                        MetricEvent::builder(
348                            metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
349                        )
350                        .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
351                            route_config,
352                            internet_available,
353                            gateway_reachable,
354                            dns_active,
355                            http_status,
356                        })
357                        .as_integer(duration_cobalt.into_micros()),
358                    );
359                }
360
361                if prev.system_state.has_internet() {
362                    self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
363                }
364                if prev.system_state.has_dns() {
365                    self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
366                }
367                if prev.system_state.has_http() {
368                    self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
369                }
370            }
371        }
372
373        if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
374            let previously_reachable =
375                prev.system_state.has_internet() && prev.system_state.has_dns();
376            let now_reachable =
377                new_state.system_state.has_internet() && new_state.system_state.has_dns();
378            if previously_reachable && !now_reachable {
379                let route_config_dim = convert::convert_route_config(&prev.system_state);
380                if let Some(route_config_dim) = route_config_dim {
381                    metric_events.push(
382                        MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
383                            .with_event_code(route_config_dim.as_event_code())
384                            .as_occurrence(1),
385                    );
386                    self.reachability_lost_at = Some((now, route_config_dim));
387                }
388                self.stats.lock().reachability_lost_count.log_value(&1);
389            }
390
391            if !previously_reachable && now_reachable {
392                if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
393                    metric_events.push(
394                        MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
395                            .with_event_code(route_config_dim.as_event_code())
396                            .as_integer((now - reachability_lost_at).into_micros()),
397                    );
398                }
399                self.reachability_lost_at = None;
400            }
401        }
402
403        if let Some(new_state) = &new_state {
404            self.stats
405                .lock()
406                .ipv4_state
407                .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
408            self.stats
409                .lock()
410                .ipv6_state
411                .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
412        }
413
414        if !metric_events.is_empty() {
415            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
416        }
417
418        if let Some(new_state) = new_state {
419            self.state_summary = Some(new_state);
420        }
421        self.state_last_refreshed_for_cobalt = now;
422        self.state_last_refreshed_for_inspect = now;
423    }
424
425    async fn log_network_config_metrics(
426        &mut self,
427        new_config: Option<NetworkConfig>,
428        now: fasync::MonotonicInstant,
429        ctx: &'static str,
430    ) {
431        let duration = now - self.network_config_last_refreshed;
432        let mut metric_events = vec![];
433
434        if let Some(prev) = &self.network_config {
435            let default_route_dim = convert::convert_default_route(
436                prev.has_default_ipv4_route,
437                prev.has_default_ipv6_route,
438            );
439            // We only log metric when at least one of IPv4 or IPv6 has default route.
440            if let Some(default_route_dim) = default_route_dim {
441                metric_events.push(
442                    MetricEvent::builder(
443                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
444                    )
445                    .with_event_code(default_route_dim.as_event_code())
446                    .as_integer(duration.into_micros()),
447                );
448                metric_events.push(
449                    MetricEvent::builder(
450                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
451                    )
452                    .with_event_code(default_route_dim.as_event_code())
453                    .as_occurrence(1),
454                );
455            }
456        }
457
458        if !metric_events.is_empty() {
459            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
460        }
461
462        if let Some(new_config) = new_config {
463            self.network_config = Some(new_config);
464        }
465        self.network_config_last_refreshed = now;
466    }
467
468    pub fn handle_ten_secondly_telemetry(&mut self) {
469        let now = fasync::MonotonicInstant::now();
470        let duration_sec_inspect =
471            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
472
473        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
474
475        if let Some(current) = &self.state_summary {
476            if current.system_state.has_internet() {
477                self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
478            }
479            if current.system_state.has_dns() {
480                self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
481            }
482            if current.system_state.has_http() {
483                self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
484            }
485
486            self.stats
487                .lock()
488                .ipv4_state
489                .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
490            self.stats
491                .lock()
492                .ipv6_state
493                .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
494        }
495        self.state_last_refreshed_for_inspect = now;
496    }
497
498    pub async fn handle_hourly_telemetry(&mut self) {
499        let now = fasync::MonotonicInstant::now();
500        self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
501        self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
502    }
503}
504
505#[cfg(test)]
506mod tests {
507    use crate::{ApplicationState, LinkState};
508
509    use super::*;
510    use fidl::endpoints::create_proxy_and_stream;
511    use fidl_fuchsia_metrics::MetricEventPayload;
512    use fuchsia_inspect::Inspector;
513
514    use futures::task::Poll;
515    use std::pin::Pin;
516    use test_case::test_case;
517
518    const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
519
520    #[test]
521    fn test_log_state_change() {
522        let (mut test_helper, mut test_fut) = setup_test();
523
524        let mut update = SystemStateUpdate {
525            system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
526        };
527        test_helper
528            .telemetry_sender
529            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
530
531        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
532
533        update.system_state = IpVersions {
534            ipv4: Some(State {
535                link: LinkState::Internet,
536                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
537            }),
538            ipv6: None,
539        };
540        test_helper
541            .telemetry_sender
542            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
543
544        test_helper.advance_test_fut(&mut test_fut);
545
546        let logged_metrics = test_helper
547            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
548        assert_eq!(logged_metrics.len(), 1);
549        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
550
551        let logged_metrics = test_helper
552            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
553        assert_eq!(logged_metrics.len(), 1);
554        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
555        assert_eq!(
556            logged_metrics[0].event_codes,
557            &[
558                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
559                    as u32,
560                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
561                    as u32,
562                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
563                    as u32,
564                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
565                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
566                    as u32,
567            ]
568        );
569
570        test_helper.cobalt_events.clear();
571        test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);
572
573        // At the 1 hour mark, the new state is logged via periodic telemetry.
574        // All metrics are the same as before, except for the elapsed duration and the
575        // `dns_active` flag is now true.
576        let logged_metrics = test_helper
577            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
578        assert_eq!(logged_metrics.len(), 1);
579        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
580
581        let logged_metrics = test_helper
582            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
583        assert_eq!(logged_metrics.len(), 1);
584        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
585        assert_eq!(
586            logged_metrics[0].event_codes,
587            &[
588                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
589                    as u32,
590                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
591                    as u32,
592                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
593                    as u32,
594                // This time dns_active is Yes, and http_status is HttpOnly
595                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
596                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
597                    as u32,
598            ]
599        );
600    }
601
602    #[test]
603    fn test_log_reachability_lost() {
604        let (mut test_helper, mut test_fut) = setup_test();
605
606        let mut update = SystemStateUpdate {
607            system_state: IpVersions {
608                ipv4: None,
609                ipv6: Some(State {
610                    link: LinkState::Internet,
611                    application: ApplicationState { dns_resolved: true, ..Default::default() },
612                }),
613            },
614            ..SystemStateUpdate::default()
615        };
616        test_helper
617            .telemetry_sender
618            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
619        test_helper.advance_test_fut(&mut test_fut);
620
621        update.system_state = IpVersions {
622            ipv4: None,
623            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
624        };
625        test_helper
626            .telemetry_sender
627            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
628        test_helper.advance_test_fut(&mut test_fut);
629
630        // Reachability lost metric is lost because previously both `internet_available` and
631        // `dns_active` were true, and now the latter is false.
632        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
633        assert_eq!(logged_metrics.len(), 1);
634        assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
635        assert_eq!(
636            logged_metrics[0].event_codes,
637            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
638        );
639
640        test_helper.cobalt_events.clear();
641
642        update.system_state = IpVersions {
643            ipv4: None,
644            ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
645        };
646        test_helper
647            .telemetry_sender
648            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
649        test_helper.advance_test_fut(&mut test_fut);
650
651        // Reachability lost metric is not logged again even when `internet_available`
652        // becomes false, because it was already considered lost previously.
653        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
654        assert_eq!(logged_metrics.len(), 0);
655
656        test_helper.cobalt_events.clear();
657
658        test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
659        update.system_state = IpVersions {
660            ipv4: Some(State {
661                link: LinkState::Internet,
662                application: ApplicationState { dns_resolved: true, ..Default::default() },
663            }),
664            ipv6: None,
665        };
666        test_helper
667            .telemetry_sender
668            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
669        test_helper.advance_test_fut(&mut test_fut);
670
671        // When reachability is recovered, the duration that reachability was lost is logged.
672        let logged_metrics =
673            test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
674        assert_eq!(logged_metrics.len(), 1);
675        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
676        assert_eq!(
677            logged_metrics[0].event_codes,
678            // Reachability is recovered on ipv4, but we still log the ipv6 dimension because
679            // that was the config when reachability was lost.
680            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
681        );
682    }
683
684    #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
685    #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
686    #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
687    #[test_case(false, false, None; "no default routes")]
688    #[fuchsia::test(add_test_attr = false)]
689    fn test_log_default_route(
690        has_default_ipv4_route: bool,
691        has_default_ipv6_route: bool,
692        expected_dim: Option<
693            metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
694        >,
695    ) {
696        let (mut test_helper, mut test_fut) = setup_test();
697
698        test_helper
699            .telemetry_sender
700            .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
701        test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
702
703        let logged_metrics = test_helper
704            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
705        match expected_dim {
706            Some(dim) => {
707                assert_eq!(logged_metrics.len(), 1);
708                assert_eq!(
709                    logged_metrics[0].payload,
710                    MetricEventPayload::IntegerValue(3600_000_000)
711                );
712                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
713            }
714            None => {
715                assert_eq!(logged_metrics.len(), 0);
716            }
717        }
718
719        let logged_metrics = test_helper
720            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
721        match expected_dim {
722            Some(dim) => {
723                assert_eq!(logged_metrics.len(), 1);
724                assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
725                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
726            }
727            None => {
728                assert_eq!(logged_metrics.len(), 0);
729            }
730        }
731    }
732
733    #[test_case(true, true, vec![]; "negative")]
734    #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
735    #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
736    #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
737    #[fuchsia::test(add_test_attr = false)]
738    fn test_log_abnormal_state_situation(
739        gateway_discoverable: bool,
740        gateway_pingable: bool,
741        expected_metrics: Vec<u32>,
742    ) {
743        let (mut test_helper, mut test_fut) = setup_test();
744
745        test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
746            internet_available: true,
747            gateway_discoverable,
748            gateway_pingable,
749        });
750        test_helper.advance_test_fut(&mut test_fut);
751
752        let logged_metrics: Vec<u32> = [
753            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
754            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
755            metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
756        ]
757        .into_iter()
758        .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
759        .collect();
760
761        assert_eq!(logged_metrics, expected_metrics);
762    }
763
764    #[test]
765    fn test_state_snapshot_duration_inspect_stats() {
766        let (mut test_helper, mut test_fut) = setup_test();
767
768        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
769
770        let time_series = test_helper.get_time_series(&mut test_fut);
771        let internet_available_sec: Vec<_> =
772            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
773        let dns_active_sec: Vec<_> =
774            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
775        let http_active_sec: Vec<_> =
776            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
777        let total_duration_sec: Vec<_> =
778            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
779        assert_eq!(internet_available_sec, vec![0]);
780        assert_eq!(dns_active_sec, vec![0]);
781        assert_eq!(http_active_sec, vec![0]);
782        assert_eq!(total_duration_sec, vec![20]);
783
784        let mut update = SystemStateUpdate {
785            system_state: IpVersions {
786                ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
787                ipv6: None,
788            },
789        };
790        test_helper
791            .telemetry_sender
792            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
793        test_helper.advance_test_fut(&mut test_fut);
794        let time_series = test_helper.get_time_series(&mut test_fut);
795        let total_duration_sec: Vec<_> =
796            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
797        assert_eq!(total_duration_sec, vec![25]);
798
799        test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
800
801        // Now 40 seconds mark
802
803        let time_series = test_helper.get_time_series(&mut test_fut);
804        let internet_available_sec: Vec<_> =
805            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
806        let dns_active_sec: Vec<_> =
807            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
808        let http_active_sec: Vec<_> =
809            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
810        let total_duration_sec: Vec<_> =
811            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
812        assert_eq!(internet_available_sec, vec![15]);
813        assert_eq!(dns_active_sec, vec![0]);
814        assert_eq!(http_active_sec, vec![0]);
815        assert_eq!(total_duration_sec, vec![40]);
816
817        update.system_state = IpVersions {
818            ipv4: Some(State {
819                link: LinkState::Internet,
820                application: ApplicationState { dns_resolved: true, ..Default::default() },
821            }),
822            ipv6: None,
823        };
824        test_helper
825            .telemetry_sender
826            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
827
828        test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
829
830        // Now 90 seconds mark
831
832        let time_series = test_helper.get_time_series(&mut test_fut);
833        let internet_available_sec: Vec<_> =
834            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
835        let dns_active_sec: Vec<_> =
836            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
837        let http_active_sec: Vec<_> =
838            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
839        let total_duration_sec: Vec<_> =
840            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
841        assert_eq!(internet_available_sec, vec![25, 40]);
842        assert_eq!(dns_active_sec, vec![10, 40]);
843        assert_eq!(http_active_sec, vec![0]);
844        assert_eq!(total_duration_sec, vec![40]);
845
846        update.system_state = IpVersions {
847            ipv4: Some(State {
848                link: LinkState::Internet,
849                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
850            }),
851            ipv6: None,
852        };
853        test_helper
854            .telemetry_sender
855            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
856
857        test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
858
859        // Now 120 seconds mark
860
861        let time_series = test_helper.get_time_series(&mut test_fut);
862        let internet_available_sec: Vec<_> =
863            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
864        let dns_active_sec: Vec<_> =
865            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
866        let http_active_sec: Vec<_> =
867            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
868        let total_duration_sec: Vec<_> =
869            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
870        assert_eq!(internet_available_sec, vec![25, 60, 40]);
871        assert_eq!(dns_active_sec, vec![10, 60, 40]);
872        assert_eq!(http_active_sec, vec![0, 20, 40]);
873        assert_eq!(total_duration_sec, vec![40]);
874    }
875
876    #[test]
877    fn test_reachability_lost_count_inspect_stats() {
878        let (mut test_helper, mut test_fut) = setup_test();
879
880        let mut update = SystemStateUpdate {
881            system_state: IpVersions {
882                ipv4: None,
883                ipv6: Some(State {
884                    link: LinkState::Internet,
885                    application: ApplicationState { dns_resolved: true, ..Default::default() },
886                }),
887            },
888            ..SystemStateUpdate::default()
889        };
890        test_helper
891            .telemetry_sender
892            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
893        test_helper.advance_test_fut(&mut test_fut);
894
895        let time_series = test_helper.get_time_series(&mut test_fut);
896        let reachability_lost_count: Vec<_> =
897            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
898        assert_eq!(reachability_lost_count, vec![0]);
899
900        update.system_state = IpVersions {
901            ipv4: None,
902            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
903        };
904        test_helper
905            .telemetry_sender
906            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
907        test_helper.advance_test_fut(&mut test_fut);
908
909        let time_series = test_helper.get_time_series(&mut test_fut);
910        let reachability_lost_count: Vec<_> =
911            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
912        assert_eq!(reachability_lost_count, vec![1]);
913    }
914
915    #[test]
916    fn test_avg_state_inspect_stats() {
917        let (mut test_helper, mut test_fut) = setup_test();
918
919        let mut update = SystemStateUpdate {
920            system_state: IpVersions {
921                ipv4: None,
922                ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
923            },
924            ..SystemStateUpdate::default()
925        };
926        test_helper
927            .telemetry_sender
928            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
929        test_helper.advance_test_fut(&mut test_fut);
930
931        let time_series = test_helper.get_time_series(&mut test_fut);
932        let ipv4_state: Vec<_> =
933            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
934        let ipv6_state: Vec<_> =
935            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
936        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
937        assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
938
939        update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
940        test_helper
941            .telemetry_sender
942            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
943        test_helper.advance_test_fut(&mut test_fut);
944
945        let time_series = test_helper.get_time_series(&mut test_fut);
946        let ipv4_state: Vec<_> =
947            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
948        let ipv6_state: Vec<_> =
949            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
950        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
951        assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
952    }
953
954    struct TestHelper {
955        telemetry_sender: TelemetrySender,
956        _inspector: Inspector,
957        cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
958        /// As requests to Cobalt are responded to via `self.drain_cobalt_events()`,
959        /// their payloads are drained to this HashMap.
960        cobalt_events: Vec<MetricEvent>,
961
962        // Note: keep the executor field last in the struct so it gets dropped last.
963        exec: fasync::TestExecutor,
964    }
965
966    impl TestHelper {
967        /// Advance executor until stalled, taking care of any blocking requests
968        fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
969            let mut made_progress = true;
970            while made_progress {
971                let _result = self.exec.run_until_stalled(test_fut);
972                made_progress = false;
973                while let Poll::Ready(Some(Ok(req))) =
974                    self.exec.run_until_stalled(&mut self.cobalt_stream.next())
975                {
976                    self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
977                    made_progress = true;
978                }
979            }
980
981            match self.exec.run_until_stalled(test_fut) {
982                Poll::Pending => (),
983                _ => panic!("expect test_fut to resolve to Poll::Pending"),
984            }
985        }
986
987        /// Advance executor by `duration`.
988        /// This function repeatedly advances the executor by 1 second, triggering
989        /// any expired timers and running the test_fut, until `duration` is reached.
990        fn advance_by<T>(
991            &mut self,
992            duration: zx::MonotonicDuration,
993            test_fut: &mut (impl Future<Output = T> + Unpin),
994        ) {
995            assert_eq!(
996                duration.into_nanos() % STEP_INCREMENT.into_nanos(),
997                0,
998                "duration {:?} is not divisible by STEP_INCREMENT",
999                duration,
1000            );
1001            const_assert_eq!(
1002                TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1003                0
1004            );
1005
1006            self.advance_test_fut(test_fut);
1007
1008            for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1009                self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1010                let _ = self.exec.wake_expired_timers();
1011                self.advance_test_fut(test_fut);
1012            }
1013        }
1014
1015        fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1016            self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1017        }
1018
1019        fn get_time_series<T>(
1020            &mut self,
1021            test_fut: &mut (impl Future<Output = T> + Unpin),
1022        ) -> Arc<Mutex<Stats>> {
1023            let (sender, mut receiver) = oneshot::channel();
1024            self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1025            self.advance_test_fut(test_fut);
1026            match receiver.try_recv() {
1027                Ok(Some(stats)) => stats,
1028                _ => panic!("Expect Stats to be returned"),
1029            }
1030        }
1031    }
1032
    /// Test-only extension for answering `MetricEventLoggerRequest`s.
    trait CobaltExt {
        /// Respond to the MetricEventLoggerRequest and extract its MetricEvent(s).
        ///
        /// Sends `result` back through the request's responder and returns the metric
        /// event(s) the request carried.
        fn respond_to_metric_req(
            self,
            result: Result<(), fidl_fuchsia_metrics::Error>,
        ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
    }
1040
1041    impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1042        fn respond_to_metric_req(
1043            self,
1044            result: Result<(), fidl_fuchsia_metrics::Error>,
1045        ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1046            match self {
1047                Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1048                    assert!(responder.send(result).is_ok());
1049                    vec![MetricEvent {
1050                        metric_id,
1051                        event_codes,
1052                        payload: MetricEventPayload::Count(count),
1053                    }]
1054                }
1055                Self::LogInteger { metric_id, value, event_codes, responder } => {
1056                    assert!(responder.send(result).is_ok());
1057                    vec![MetricEvent {
1058                        metric_id,
1059                        event_codes,
1060                        payload: MetricEventPayload::IntegerValue(value),
1061                    }]
1062                }
1063                Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1064                    assert!(responder.send(result).is_ok());
1065                    vec![MetricEvent {
1066                        metric_id,
1067                        event_codes,
1068                        payload: MetricEventPayload::Histogram(histogram),
1069                    }]
1070                }
1071                Self::LogString { metric_id, string_value, event_codes, responder } => {
1072                    assert!(responder.send(result).is_ok());
1073                    vec![MetricEvent {
1074                        metric_id,
1075                        event_codes,
1076                        payload: MetricEventPayload::StringValue(string_value),
1077                    }]
1078                }
1079                Self::LogMetricEvents { events, responder } => {
1080                    assert!(responder.send(result).is_ok());
1081                    events
1082                }
1083            }
1084        }
1085    }
1086
1087    fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = ()>>>) {
1088        let mut exec = fasync::TestExecutor::new_with_fake_time();
1089        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1090
1091        let (cobalt_proxy, cobalt_stream) =
1092            create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1093
1094        let inspector = Inspector::default();
1095        let inspect_node = inspector.root().create_child("telemetry");
1096
1097        let (telemetry_sender, test_fut) = serve_telemetry(cobalt_proxy, inspect_node);
1098        let mut test_fut = Box::pin(test_fut);
1099
1100        assert_eq!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1101
1102        let test_helper = TestHelper {
1103            telemetry_sender,
1104            _inspector: inspector,
1105            cobalt_stream,
1106            cobalt_events: vec![],
1107            exec,
1108        };
1109        (test_helper, test_fut)
1110    }
1111}