Skip to main content

reachability_core/telemetry/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod convert;
6mod inspect;
7pub mod processors;
8
9use self::inspect::{Stats, inspect_record_stats};
10use crate::fetch::FetchError;
11use crate::ping::PingError;
12use crate::telemetry::processors::link_properties_state::{
13    LinkProperties, LinkPropertiesStateLogger,
14};
15use crate::telemetry::processors::{
16    InterfaceIdentifier, InterfaceTimeSeriesGrouping, InterfaceType,
17};
18use crate::{FetchParameters, IpVersions, LinkState, PingParameters, State};
19use processors::interface_aware_logger::InterfaceAwareLogger;
20
21use anyhow::{Context, Error, format_err};
22use cobalt_client::traits::AsEventCode;
23use fidl_fuchsia_metrics::MetricEvent;
24use fuchsia_async as fasync;
25use fuchsia_cobalt_builders::MetricEventExt;
26use fuchsia_inspect::Node as InspectNode;
27use fuchsia_sync::Mutex;
28use futures::channel::{mpsc, oneshot};
29use futures::{Future, StreamExt, select};
30use log::{info, warn};
31use network_policy_metrics_registry as metrics;
32use static_assertions::const_assert_eq;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use windowed_stats::aggregations::SumAndCount;
36use windowed_stats::experimental::inspect::TimeMatrixClient;
37
38#[cfg(test)]
39mod testing;
40
41pub async fn create_metrics_logger(
42    factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
43) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
44    let (cobalt_proxy, cobalt_server) =
45        fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
46
47    let project_spec = fidl_fuchsia_metrics::ProjectSpec {
48        customer_id: None, // defaults to fuchsia.
49        project_id: Some(metrics::PROJECT_ID),
50        ..Default::default()
51    };
52
53    let status = factory_proxy
54        .create_metric_event_logger(&project_spec, cobalt_server)
55        .await
56        .context("create metrics event logger")?;
57
58    match status {
59        Ok(_) => Ok(cobalt_proxy),
60        Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
61    }
62}
63
/// Cloneable handle used to submit [`TelemetryEvent`]s over an mpsc channel.
///
/// Both fields are behind `Arc`, so clones share the same underlying sender
/// and the same blocked flag.
#[derive(Clone, Debug)]
pub struct TelemetrySender {
    // Channel sender; wrapped in a mutex so it can be used through `&self`.
    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
    // Set while the most recent send attempt failed, so that only transitions
    // between the blocked and unblocked states are logged.
    sender_is_blocked: Arc<AtomicBool>,
}
69
70impl TelemetrySender {
71    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
72        Self {
73            sender: Arc::new(Mutex::new(sender)),
74            sender_is_blocked: Arc::new(AtomicBool::new(false)),
75        }
76    }
77
78    // Send telemetry event. Log an error if it fails.
79    pub fn send(&self, event: TelemetryEvent) {
80        match self.sender.lock().try_send(event) {
81            Ok(_) => {
82                // If sender has been blocked before, set bool to false and log message.
83                if self
84                    .sender_is_blocked
85                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
86                    .is_ok()
87                {
88                    info!("TelemetrySender recovered and resumed sending");
89                }
90            }
91            Err(_) => {
92                // If sender has not been blocked before, set bool to true and log error message.
93                if self
94                    .sender_is_blocked
95                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
96                    .is_ok()
97                {
98                    warn!(
99                        "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
100                    );
101                }
102            }
103        }
104    }
105}
106
/// Snapshot of the system state kept by the telemetry loop so that changes
/// between consecutive updates can be detected.
#[derive(Debug, Clone, Default, PartialEq)]
struct SystemStateSummary {
    // State per IP version; `None` for a version with no reported state.
    system_state: IpVersions<Option<State>>,
}
111
112impl SystemStateSummary {
113    fn ipv4_link_state_val(&self) -> u32 {
114        match self.system_state.ipv4 {
115            Some(s) => s.link as u32,
116            None => 0u32,
117        }
118    }
119
120    fn ipv6_link_state_val(&self) -> u32 {
121        match self.system_state.ipv6 {
122            Some(s) => s.link as u32,
123            None => 0u32,
124        }
125    }
126}
127
/// Default-route configuration last reported to the telemetry loop.
#[derive(Debug, Clone, Default, PartialEq)]
struct NetworkConfig {
    // Whether a default IPv4 route was reported.
    has_default_ipv4_route: bool,
    // Whether a default IPv6 route was reported.
    has_default_ipv6_route: bool,
}
133
/// Payload of [`TelemetryEvent::SystemStateUpdate`]: the system state for each
/// IP version at the time of the update.
#[derive(Debug, Clone, Default)]
pub struct SystemStateUpdate {
    // State per IP version; `None` for a version with no reported state.
    pub(crate) system_state: IpVersions<Option<State>>,
}
138
/// Events accepted by the telemetry loop, submitted through [`TelemetrySender`].
#[derive(Debug)]
pub enum TelemetryEvent {
    /// A new snapshot of the per-IP-version system state.
    SystemStateUpdate {
        update: SystemStateUpdate,
    },
    /// Whether a default route is present for IPv4 and for IPv6.
    NetworkConfig {
        has_default_ipv4_route: bool,
        has_default_ipv6_route: bool,
    },
    /// Outcome of a gateway probe. The handler logs an occurrence metric only
    /// when internet is available while the gateway is not discoverable and/or
    /// not pingable.
    GatewayProbe {
        internet_available: bool,
        gateway_discoverable: bool,
        gateway_pingable: bool,
    },
    /// The LinkProperties update corresponding to the interface described by
    /// the vector of interface identifiers.
    LinkPropertiesUpdate {
        interface_identifiers: Vec<InterfaceIdentifier>,
        link_properties: IpVersions<LinkProperties>,
    },
    /// The LinkState update corresponding to the interface described by the
    /// vector of interface identifiers.
    LinkStateUpdate {
        interface_identifiers: Vec<InterfaceIdentifier>,
        link_state: IpVersions<LinkState>,
    },
    /// Result of pinging the gateway on the identified interface.
    GatewayPingResult {
        interface_identifiers: Vec<InterfaceIdentifier>,
        ping_parameters: PingParameters,
        gateway_ping_result: Result<(), PingError>,
    },
    /// Result of pinging an internet target on the identified interface.
    InternetPingResult {
        interface_identifiers: Vec<InterfaceIdentifier>,
        ping_parameters: PingParameters,
        internet_ping_result: Result<(), PingError>,
    },
    /// Result of a fetch on the identified interface.
    // NOTE(review): the `u16` success value is presumably an HTTP status code — confirm.
    FetchResult {
        interface_identifiers: Vec<InterfaceIdentifier>,
        fetch_parameters: FetchParameters,
        fetch_result: Result<u16, FetchError>,
    },
    /// Get the TimeSeries held by telemetry loop. Intended for test only.
    GetTimeSeries {
        sender: oneshot::Sender<Arc<Mutex<Stats>>>,
    },
}
185
/// Capacity of "first come, first serve" slots available to clients of
/// the mpsc::Sender<TelemetryEvent>. This threshold is arbitrary.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;

/// Period of the telemetry loop's interval timer. Each tick runs the
/// ten-secondly handler, and hourly reporting is derived by counting ticks.
const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
/// Name of the Inspect child node that holds metadata.
const METADATA_NODE_NAME: &str = "metadata";
192
193pub fn serve_telemetry(
194    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
195    inspect_node: InspectNode,
196) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
197    // Inspect nodes to hold Inspect events
198    let inspect_events_node = inspect_node.create_child("events");
199
200    // Inspect nodes to hold time series and metadata for other nodes.
201    let inspect_time_series_node = inspect_node.create_child("time_series");
202    let link_properties_state_time_series_node =
203        inspect_time_series_node.create_child("link_properties_state");
204    let time_matrix_client =
205        TimeMatrixClient::new(link_properties_state_time_series_node.clone_weak());
206
207    inspect_node.record(link_properties_state_time_series_node);
208    let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
209    let link_properties_state_logger = LinkPropertiesStateLogger::new(
210        &inspect_metadata_node,
211        &format!("root/telemetry/{METADATA_NODE_NAME}"),
212        InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
213        &time_matrix_client,
214    );
215
216    let interface_aware_events_node = inspect_events_node.create_child("interfaces");
217    let interface_aware_time_series_node = inspect_time_series_node.create_child("interfaces");
218    let interface_aware_logger = InterfaceAwareLogger::new(
219        &inspect_metadata_node,
220        &format!("root/telemetry/{METADATA_NODE_NAME}"),
221        InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
222        interface_aware_events_node,
223        interface_aware_time_series_node,
224    );
225
226    // Record the time series and metadata nodes so they do not get dropped.
227    inspect_node.record(inspect_events_node);
228    inspect_node.record(inspect_time_series_node);
229    inspect_node.record(inspect_metadata_node);
230
231    let (sender, fut) = serve_telemetry_inner(
232        cobalt_proxy,
233        inspect_node,
234        link_properties_state_logger,
235        interface_aware_logger,
236    );
237    (sender, fut)
238}
239
240fn serve_telemetry_inner(
241    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
242    inspect_node: InspectNode,
243    link_properties_state_logger: LinkPropertiesStateLogger,
244    interface_aware_logger: InterfaceAwareLogger,
245) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
246    let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
247    let sender = TelemetrySender::new(sender);
248    let cloned_sender = sender.clone();
249
250    let fut = async move {
251        let link_properties_state_logger = link_properties_state_logger;
252
253        let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
254        const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
255        const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
256        const INTERVAL_TICKS_PER_MINUTE: u64 =
257            (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
258        const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
259        let mut interval_tick = 0u64;
260        let mut telemetry = Telemetry::new(
261            cloned_sender,
262            cobalt_proxy,
263            link_properties_state_logger,
264            interface_aware_logger,
265            inspect_node,
266        );
267        loop {
268            select! {
269                event = receiver.next() => {
270                    if let Some(event) = event {
271                        telemetry.handle_telemetry_event(event).await;
272                    }
273                }
274                _ = report_interval_stream.next() => {
275                    telemetry.handle_ten_secondly_telemetry();
276
277                    interval_tick += 1;
278                    if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
279                        telemetry.handle_hourly_telemetry().await;
280                    }
281                }
282            }
283        }
284    };
285    (sender, fut)
286}
287
// Macro wrapper for logging simple events (occurrence, integer, histogram, string)
// and log a warning when the status is not Ok.
//
// Arguments: the Cobalt proxy expression, the name of the proxy method to call,
// the metric id, the value, and the event codes. The expansion awaits the call,
// so it can only be used inside an async context. Failures are only logged with
// `warn!`; nothing is returned to the caller.
//
// `$metric_id` appears in both the call and the warning arms, so pass an
// expression that is cheap and free of side effects.
macro_rules! log_cobalt {
    ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
        let status = $cobalt_proxy.$method_name($metric_id, $value, $event_codes).await;
        match status {
            // Outer `Ok` is the transport result, inner `Ok` is the logger's reply.
            Ok(Ok(())) => (),
            Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
            Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
        }
    }};
}
300
// Macro wrapper for logging a batch of metric events via `log_metric_events`,
// emitting a warning when the status is not Ok.
//
// Arguments: the Cobalt proxy expression, the events to log, and a context
// value included in the warning so the call site can be identified. The
// expansion awaits the call, so it can only be used inside an async context.
macro_rules! log_cobalt_batch {
    ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
        let status = $cobalt_proxy.log_metric_events($events).await;
        match status {
            // Outer `Ok` is the transport result, inner `Ok` is the logger's reply.
            Ok(Ok(())) => (),
            Ok(Err(e)) => {
                warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
            }
            Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
        }
    }};
}
313
314fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
315    const MILLIS_PER_SEC: i64 = 1000;
316    let millis = duration.into_millis();
317    let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
318    millis / MILLIS_PER_SEC + rounded_portion
319}
320
/// State owned by the telemetry loop: the Cobalt connection, the last-known
/// system state and network config together with their refresh timestamps,
/// the per-interface loggers, and the stats exposed through Inspect.
struct Telemetry {
    // Proxy used to log metrics to Cobalt.
    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
    // Most recently stored system state; `None` until the first update is handled.
    state_summary: Option<SystemStateSummary>,
    // Start of the interval whose duration has not yet been reported to Cobalt.
    state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
    // Start of the interval whose duration has not yet been added to the stats.
    state_last_refreshed_for_inspect: fasync::MonotonicInstant,
    // Time at which reachability was lost and the route config dimension in
    // effect at that moment; reset to `None` when reachability is regained.
    reachability_lost_at: Option<(
        fasync::MonotonicInstant,
        metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
    )>,
    // Most recently stored network config; `None` until the first one is handled.
    network_config: Option<NetworkConfig>,
    // Start of the interval whose default-route duration has not yet been reported.
    network_config_last_refreshed: fasync::MonotonicInstant,
    // Receives link properties and link state updates.
    link_properties_state_logger: LinkPropertiesStateLogger,
    // Receives gateway ping, internet ping and fetch results.
    interface_aware_logger: InterfaceAwareLogger,

    // Not read by the code visible here; presumably held to keep the Inspect
    // node alive — TODO confirm.
    _inspect_node: InspectNode,
    // Stats registered with Inspect in `new` and handed out on `GetTimeSeries`.
    stats: Arc<Mutex<Stats>>,
}
338
339impl Telemetry {
340    pub fn new(
341        _telemetry_sender: TelemetrySender,
342        cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
343        link_properties_state_logger: LinkPropertiesStateLogger,
344        interface_aware_logger: InterfaceAwareLogger,
345        inspect_node: InspectNode,
346    ) -> Self {
347        let stats = Arc::new(Mutex::new(Stats::new()));
348        inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
349        Self {
350            cobalt_proxy,
351            state_summary: None,
352            state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
353            state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
354            reachability_lost_at: None,
355            network_config: None,
356            network_config_last_refreshed: fasync::MonotonicInstant::now(),
357            link_properties_state_logger,
358            interface_aware_logger,
359            _inspect_node: inspect_node,
360            stats,
361        }
362    }
363
364    pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
365        let now = fasync::MonotonicInstant::now();
366        match event {
367            TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
368                let new_state = Some(match &self.state_summary {
369                    Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
370                    None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
371                });
372                if self.state_summary != new_state {
373                    // Only log if system state has changed to prevent spamming Cobalt,
374                    // since this telemetry event may happen in quick succession.
375                    self.log_system_state_metrics(
376                        new_state,
377                        now,
378                        "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
379                    )
380                    .await
381                }
382            }
383            TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
384                let new_config =
385                    Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
386                self.log_network_config_metrics(
387                    new_config,
388                    now,
389                    "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
390                )
391                .await;
392            }
393            TelemetryEvent::GatewayProbe {
394                internet_available,
395                gateway_discoverable,
396                gateway_pingable,
397            } => {
398                if internet_available {
399                    let metric_id = match (gateway_discoverable, gateway_pingable) {
400                        (true, true) => None,
401                        (true, false) => {
402                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
403                        }
404                        (false, true) => {
405                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
406                        }
407                        (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
408                    };
409                    if let Some(metric_id) = metric_id {
410                        log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
411                    }
412                }
413            }
414            TelemetryEvent::LinkPropertiesUpdate { interface_identifiers, link_properties } => {
415                self.link_properties_state_logger
416                    .update_link_properties(interface_identifiers, &link_properties);
417            }
418            TelemetryEvent::LinkStateUpdate { interface_identifiers, link_state } => {
419                self.link_properties_state_logger
420                    .update_link_state(interface_identifiers, &link_state);
421            }
422            TelemetryEvent::GatewayPingResult {
423                interface_identifiers,
424                ping_parameters,
425                gateway_ping_result,
426            } => {
427                self.interface_aware_logger.log_gateway_ping_result(
428                    interface_identifiers,
429                    &ping_parameters,
430                    &gateway_ping_result,
431                );
432            }
433            TelemetryEvent::InternetPingResult {
434                interface_identifiers,
435                ping_parameters,
436                internet_ping_result,
437            } => {
438                self.interface_aware_logger.log_internet_ping_result(
439                    interface_identifiers,
440                    &ping_parameters,
441                    &internet_ping_result,
442                );
443            }
444            TelemetryEvent::FetchResult {
445                interface_identifiers,
446                fetch_parameters,
447                fetch_result,
448            } => {
449                self.interface_aware_logger.log_fetch_result(
450                    interface_identifiers,
451                    &fetch_parameters,
452                    &fetch_result,
453                );
454            }
455            TelemetryEvent::GetTimeSeries { sender } => {
456                let _result = sender.send(Arc::clone(&self.stats));
457            }
458        }
459    }
460
    /// Flushes metrics covering the time spent in the currently stored system
    /// state and, when `new_state` is `Some`, stores it as the current state.
    ///
    /// * `new_state` - the state being transitioned to, or `None` for a
    ///   periodic flush that leaves the stored state unchanged.
    /// * `now` - the instant treated as the end of the interval being reported.
    /// * `ctx` - call-site description included in the warning if the Cobalt
    ///   batch fails.
    ///
    /// Cobalt events are collected and sent as a single batch; Inspect stats
    /// are updated directly. Both refresh timestamps are set to `now` on exit.
    async fn log_system_state_metrics(
        &mut self,
        new_state: Option<SystemStateSummary>,
        now: fasync::MonotonicInstant,
        ctx: &'static str,
    ) {
        // Cobalt and Inspect track separate interval starts: the Inspect one is
        // also advanced by `handle_ten_secondly_telemetry`.
        let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
        let duration_sec_inspect =
            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
        let mut metric_events = vec![];

        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);

        // Durations are attributed to the state that was in effect during the
        // interval, i.e. the previously stored one.
        if let Some(prev) = &self.state_summary {
            if prev.system_state.has_interface_up() {
                metric_events.push(
                    MetricEvent::builder(
                        metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
                    )
                    .as_integer(duration_cobalt.into_micros()),
                );

                let route_config_dim = convert::convert_route_config(&prev.system_state);
                let internet_available =
                    convert::convert_yes_no_dim(prev.system_state.has_internet());
                let gateway_reachable =
                    convert::convert_yes_no_dim(prev.system_state.has_gateway());
                let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
                let http_status = if prev.system_state.has_http() {
                    metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
                } else {
                    metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
                };
                // We only log metric when at least one of IPv4 or IPv6 is in the SystemState.
                if let Some(route_config) = route_config_dim {
                    metric_events.push(
                        MetricEvent::builder(
                            metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
                        )
                        .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
                            route_config,
                            internet_available,
                            gateway_reachable,
                            dns_active,
                            http_status,
                        })
                        .as_integer(duration_cobalt.into_micros()),
                    );
                }

                // Inspect counterparts of the durations above.
                if prev.system_state.has_internet() {
                    self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
                }
                if prev.system_state.has_dns() {
                    self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
                }
                if prev.system_state.has_http() {
                    self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
                }
            }
        }

        // Reachability transitions need both a previous and a new state, so
        // periodic flushes (`new_state == None`) never enter this section.
        if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
            // "Reachable" here means internet and DNS are both available.
            let previously_reachable =
                prev.system_state.has_internet() && prev.system_state.has_dns();
            let now_reachable =
                new_state.system_state.has_internet() && new_state.system_state.has_dns();
            if previously_reachable && !now_reachable {
                let route_config_dim = convert::convert_route_config(&prev.system_state);
                if let Some(route_config_dim) = route_config_dim {
                    metric_events.push(
                        MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
                            .with_event_code(route_config_dim.as_event_code())
                            .as_occurrence(1),
                    );
                    // Remember when and under which route config the loss
                    // happened so the outage duration can be reported later.
                    self.reachability_lost_at = Some((now, route_config_dim));
                }
                // The Inspect counter is bumped even without a route config dimension.
                self.stats.lock().reachability_lost_count.log_value(&1);
            }

            if !previously_reachable && now_reachable {
                if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
                    metric_events.push(
                        MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
                            .with_event_code(route_config_dim.as_event_code())
                            .as_integer((now - reachability_lost_at).into_micros()),
                    );
                }
                self.reachability_lost_at = None;
            }
        }

        // Sample the link state values of the state being entered.
        if let Some(new_state) = &new_state {
            self.stats
                .lock()
                .ipv4_state
                .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
            self.stats
                .lock()
                .ipv6_state
                .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
        }

        if !metric_events.is_empty() {
            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
        }

        // A `None` new state leaves the stored state untouched.
        if let Some(new_state) = new_state {
            self.state_summary = Some(new_state);
        }
        self.state_last_refreshed_for_cobalt = now;
        self.state_last_refreshed_for_inspect = now;
    }
574
575    async fn log_network_config_metrics(
576        &mut self,
577        new_config: Option<NetworkConfig>,
578        now: fasync::MonotonicInstant,
579        ctx: &'static str,
580    ) {
581        let duration = now - self.network_config_last_refreshed;
582        let mut metric_events = vec![];
583
584        if let Some(prev) = &self.network_config {
585            let default_route_dim = convert::convert_default_route(
586                prev.has_default_ipv4_route,
587                prev.has_default_ipv6_route,
588            );
589            // We only log metric when at least one of IPv4 or IPv6 has default route.
590            if let Some(default_route_dim) = default_route_dim {
591                metric_events.push(
592                    MetricEvent::builder(
593                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
594                    )
595                    .with_event_code(default_route_dim.as_event_code())
596                    .as_integer(duration.into_micros()),
597                );
598                metric_events.push(
599                    MetricEvent::builder(
600                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
601                    )
602                    .with_event_code(default_route_dim.as_event_code())
603                    .as_occurrence(1),
604                );
605            }
606        }
607
608        if !metric_events.is_empty() {
609            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
610        }
611
612        if let Some(new_config) = new_config {
613            self.network_config = Some(new_config);
614        }
615        self.network_config_last_refreshed = now;
616    }
617
618    pub fn handle_ten_secondly_telemetry(&mut self) {
619        let now = fasync::MonotonicInstant::now();
620        let duration_sec_inspect =
621            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
622
623        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
624
625        if let Some(current) = &self.state_summary {
626            if current.system_state.has_internet() {
627                self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
628            }
629            if current.system_state.has_dns() {
630                self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
631            }
632            if current.system_state.has_http() {
633                self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
634            }
635
636            self.stats
637                .lock()
638                .ipv4_state
639                .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
640            self.stats
641                .lock()
642                .ipv6_state
643                .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
644        }
645        self.state_last_refreshed_for_inspect = now;
646    }
647
648    pub async fn handle_hourly_telemetry(&mut self) {
649        let now = fasync::MonotonicInstant::now();
650        self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
651        self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
652    }
653}
654
655#[cfg(test)]
656mod tests {
657    use crate::{ApplicationState, LinkState};
658
659    use super::*;
660    use fidl::endpoints::create_proxy_and_stream;
661    use fidl_fuchsia_metrics::MetricEventPayload;
662    use fuchsia_inspect::Inspector;
663    use fuchsia_inspect_contrib::id_enum::IdEnum;
664
665    use futures::task::Poll;
666    use std::pin::Pin;
667    use test_case::test_case;
668    use windowed_stats::experimental::clock::Timed;
669    use windowed_stats::experimental::testing::{MockTimeMatrixClient, TimeMatrixCall};
670
671    const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
672
    /// Verifies that a system state change flushes duration metrics for the
    /// previous state, and that the hourly flush later reports the duration
    /// and dimensions of the state that replaced it.
    #[test]
    fn test_log_state_change() {
        let (mut test_helper, mut test_fut) = setup_test();

        // Initial state: IPv4 at `LinkState::Internet`, no IPv6 state.
        let mut update = SystemStateUpdate {
            system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });

        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);

        // Second state: same link state, now with DNS resolved and HTTP fetch succeeded.
        update.system_state = IpVersions {
            ipv4: Some(State {
                link: LinkState::Internet,
                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
            }),
            ipv6: None,
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });

        test_helper.advance_test_fut(&mut test_fut);

        // The change flushes the 25 seconds spent in the initial state, in microseconds.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));

        // The snapshot dimensions describe the initial state, not the new one.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
        assert_eq!(
            logged_metrics[0].event_codes,
            &[
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
                    as u32,
            ]
        );

        // Advance the remaining 3575 seconds so that one hour has elapsed in total.
        test_helper.cobalt_events.clear();
        test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);

        // At the 1 hour mark, the new state is logged via periodic telemetry.
        // All metrics are the same as before, except for the elapsed duration and the
        // `dns_active` flag is now true.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));

        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
        assert_eq!(
            logged_metrics[0].event_codes,
            &[
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
                    as u32,
                // This time dns_active is Yes, and http_status is HttpOnly
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
                    as u32,
            ]
        );
    }
754
755    #[test]
756    fn test_log_reachability_lost() {
757        let (mut test_helper, mut test_fut) = setup_test();
758
759        let mut update = SystemStateUpdate {
760            system_state: IpVersions {
761                ipv4: None,
762                ipv6: Some(State {
763                    link: LinkState::Internet,
764                    application: ApplicationState { dns_resolved: true, ..Default::default() },
765                }),
766            },
767            ..SystemStateUpdate::default()
768        };
769        test_helper
770            .telemetry_sender
771            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
772        test_helper.advance_test_fut(&mut test_fut);
773
774        update.system_state = IpVersions {
775            ipv4: None,
776            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
777        };
778        test_helper
779            .telemetry_sender
780            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
781        test_helper.advance_test_fut(&mut test_fut);
782
783        // Reachability lost metric is lost because previously both `internet_available` and
784        // `dns_active` were true, and now the latter is false.
785        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
786        assert_eq!(logged_metrics.len(), 1);
787        assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
788        assert_eq!(
789            logged_metrics[0].event_codes,
790            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
791        );
792
793        test_helper.cobalt_events.clear();
794
795        update.system_state = IpVersions {
796            ipv4: None,
797            ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
798        };
799        test_helper
800            .telemetry_sender
801            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
802        test_helper.advance_test_fut(&mut test_fut);
803
804        // Reachability lost metric is not logged again even when `internet_available`
805        // becomes false, because it was already considered lost previously.
806        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
807        assert_eq!(logged_metrics.len(), 0);
808
809        test_helper.cobalt_events.clear();
810
811        test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
812        update.system_state = IpVersions {
813            ipv4: Some(State {
814                link: LinkState::Internet,
815                application: ApplicationState { dns_resolved: true, ..Default::default() },
816            }),
817            ipv6: None,
818        };
819        test_helper
820            .telemetry_sender
821            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
822        test_helper.advance_test_fut(&mut test_fut);
823
824        // When reachability is recovered, the duration that reachability was lost is logged.
825        let logged_metrics =
826            test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
827        assert_eq!(logged_metrics.len(), 1);
828        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
829        assert_eq!(
830            logged_metrics[0].event_codes,
831            // Reachability is recovered on ipv4, but we still log the ipv6 dimension because
832            // that was the config when reachability was lost.
833            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
834        );
835    }
836
837    #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
838    #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
839    #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
840    #[test_case(false, false, None; "no default routes")]
841    #[fuchsia::test(add_test_attr = false)]
842    fn test_log_default_route(
843        has_default_ipv4_route: bool,
844        has_default_ipv6_route: bool,
845        expected_dim: Option<
846            metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
847        >,
848    ) {
849        let (mut test_helper, mut test_fut) = setup_test();
850
851        test_helper
852            .telemetry_sender
853            .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
854        test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
855
856        let logged_metrics = test_helper
857            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
858        match expected_dim {
859            Some(dim) => {
860                assert_eq!(logged_metrics.len(), 1);
861                assert_eq!(
862                    logged_metrics[0].payload,
863                    MetricEventPayload::IntegerValue(3600_000_000)
864                );
865                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
866            }
867            None => {
868                assert_eq!(logged_metrics.len(), 0);
869            }
870        }
871
872        let logged_metrics = test_helper
873            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
874        match expected_dim {
875            Some(dim) => {
876                assert_eq!(logged_metrics.len(), 1);
877                assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
878                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
879            }
880            None => {
881                assert_eq!(logged_metrics.len(), 0);
882            }
883        }
884    }
885
886    #[test_case(true, true, vec![]; "negative")]
887    #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
888    #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
889    #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
890    #[fuchsia::test(add_test_attr = false)]
891    fn test_log_abnormal_state_situation(
892        gateway_discoverable: bool,
893        gateway_pingable: bool,
894        expected_metrics: Vec<u32>,
895    ) {
896        let (mut test_helper, mut test_fut) = setup_test();
897
898        test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
899            internet_available: true,
900            gateway_discoverable,
901            gateway_pingable,
902        });
903        test_helper.advance_test_fut(&mut test_fut);
904
905        let logged_metrics: Vec<u32> = [
906            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
907            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
908            metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
909        ]
910        .into_iter()
911        .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
912        .collect();
913
914        assert_eq!(logged_metrics, expected_metrics);
915    }
916
917    #[test]
918    fn test_state_snapshot_duration_inspect_stats() {
919        let (mut test_helper, mut test_fut) = setup_test();
920
921        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
922
923        let time_series = test_helper.get_time_series(&mut test_fut);
924        let internet_available_sec: Vec<_> =
925            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
926        let dns_active_sec: Vec<_> =
927            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
928        let http_active_sec: Vec<_> =
929            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
930        let total_duration_sec: Vec<_> =
931            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
932        assert_eq!(internet_available_sec, vec![0]);
933        assert_eq!(dns_active_sec, vec![0]);
934        assert_eq!(http_active_sec, vec![0]);
935        assert_eq!(total_duration_sec, vec![20]);
936
937        let mut update = SystemStateUpdate {
938            system_state: IpVersions {
939                ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
940                ipv6: None,
941            },
942        };
943        test_helper
944            .telemetry_sender
945            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
946        test_helper.advance_test_fut(&mut test_fut);
947        let time_series = test_helper.get_time_series(&mut test_fut);
948        let total_duration_sec: Vec<_> =
949            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
950        assert_eq!(total_duration_sec, vec![25]);
951
952        test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
953
954        // Now 40 seconds mark
955
956        let time_series = test_helper.get_time_series(&mut test_fut);
957        let internet_available_sec: Vec<_> =
958            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
959        let dns_active_sec: Vec<_> =
960            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
961        let http_active_sec: Vec<_> =
962            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
963        let total_duration_sec: Vec<_> =
964            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
965        assert_eq!(internet_available_sec, vec![15]);
966        assert_eq!(dns_active_sec, vec![0]);
967        assert_eq!(http_active_sec, vec![0]);
968        assert_eq!(total_duration_sec, vec![40]);
969
970        update.system_state = IpVersions {
971            ipv4: Some(State {
972                link: LinkState::Internet,
973                application: ApplicationState { dns_resolved: true, ..Default::default() },
974            }),
975            ipv6: None,
976        };
977        test_helper
978            .telemetry_sender
979            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
980
981        test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
982
983        // Now 90 seconds mark
984
985        let time_series = test_helper.get_time_series(&mut test_fut);
986        let internet_available_sec: Vec<_> =
987            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
988        let dns_active_sec: Vec<_> =
989            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
990        let http_active_sec: Vec<_> =
991            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
992        let total_duration_sec: Vec<_> =
993            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
994        assert_eq!(internet_available_sec, vec![25, 40]);
995        assert_eq!(dns_active_sec, vec![10, 40]);
996        assert_eq!(http_active_sec, vec![0]);
997        assert_eq!(total_duration_sec, vec![40]);
998
999        update.system_state = IpVersions {
1000            ipv4: Some(State {
1001                link: LinkState::Internet,
1002                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
1003            }),
1004            ipv6: None,
1005        };
1006        test_helper
1007            .telemetry_sender
1008            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1009
1010        test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
1011
1012        // Now 120 seconds mark
1013
1014        let time_series = test_helper.get_time_series(&mut test_fut);
1015        let internet_available_sec: Vec<_> =
1016            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
1017        let dns_active_sec: Vec<_> =
1018            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
1019        let http_active_sec: Vec<_> =
1020            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
1021        let total_duration_sec: Vec<_> =
1022            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
1023        assert_eq!(internet_available_sec, vec![25, 60, 40]);
1024        assert_eq!(dns_active_sec, vec![10, 60, 40]);
1025        assert_eq!(http_active_sec, vec![0, 20, 40]);
1026        assert_eq!(total_duration_sec, vec![40]);
1027    }
1028
1029    #[test]
1030    fn test_reachability_lost_count_inspect_stats() {
1031        let (mut test_helper, mut test_fut) = setup_test();
1032
1033        let mut update = SystemStateUpdate {
1034            system_state: IpVersions {
1035                ipv4: None,
1036                ipv6: Some(State {
1037                    link: LinkState::Internet,
1038                    application: ApplicationState { dns_resolved: true, ..Default::default() },
1039                }),
1040            },
1041            ..SystemStateUpdate::default()
1042        };
1043        test_helper
1044            .telemetry_sender
1045            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1046        test_helper.advance_test_fut(&mut test_fut);
1047
1048        let time_series = test_helper.get_time_series(&mut test_fut);
1049        let reachability_lost_count: Vec<_> =
1050            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
1051        assert_eq!(reachability_lost_count, vec![0]);
1052
1053        update.system_state = IpVersions {
1054            ipv4: None,
1055            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
1056        };
1057        test_helper
1058            .telemetry_sender
1059            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1060        test_helper.advance_test_fut(&mut test_fut);
1061
1062        let time_series = test_helper.get_time_series(&mut test_fut);
1063        let reachability_lost_count: Vec<_> =
1064            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
1065        assert_eq!(reachability_lost_count, vec![1]);
1066    }
1067
1068    #[test]
1069    fn test_avg_state_inspect_stats() {
1070        let (mut test_helper, mut test_fut) = setup_test();
1071
1072        let mut update = SystemStateUpdate {
1073            system_state: IpVersions {
1074                ipv4: None,
1075                ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
1076            },
1077            ..SystemStateUpdate::default()
1078        };
1079        test_helper
1080            .telemetry_sender
1081            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1082        test_helper.advance_test_fut(&mut test_fut);
1083
1084        let time_series = test_helper.get_time_series(&mut test_fut);
1085        let ipv4_state: Vec<_> =
1086            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1087        let ipv6_state: Vec<_> =
1088            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1089        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
1090        assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
1091
1092        update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
1093        test_helper
1094            .telemetry_sender
1095            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1096        test_helper.advance_test_fut(&mut test_fut);
1097
1098        let time_series = test_helper.get_time_series(&mut test_fut);
1099        let ipv4_state: Vec<_> =
1100            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1101        let ipv6_state: Vec<_> =
1102            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1103        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
1104        assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
1105    }
1106
1107    #[test]
1108    fn test_link_properties_update() {
1109        let (mut test_helper, mut test_fut) = setup_test();
1110
1111        // Iterate through all of the different combinations of booleans in the
1112        // LinkProperties struct. There are 4 different booleans, so 2^4
1113        // different combinations.
1114        for i in 0..=15 {
1115            let link_properties = LinkProperties::from(i);
1116            let mut expected_data_vec = vec![];
1117            // Index 0 indicates that all the booleans are false which is the
1118            // same as the default value of LinkProperties. In this case no
1119            // data is reported, so expected_data_vec should be empty.
1120            if i != 0 {
1121                expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(i)));
1122            }
1123
1124            //  There should be no calls to the `TYPE_ethernet` or `TYPE_wlanclient`
1125            // time series since nothing has been logged yet.
1126            let mut time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1127            assert_eq!(
1128                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1129                &[]
1130            );
1131            assert_eq!(
1132                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1133                &[]
1134            );
1135            assert_eq!(
1136                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1137                &[]
1138            );
1139            assert_eq!(
1140                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1141                &[]
1142            );
1143
1144            test_helper.telemetry_sender.send(TelemetryEvent::LinkPropertiesUpdate {
1145                interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1146                link_properties: IpVersions {
1147                    ipv4: link_properties.clone(),
1148                    ipv6: link_properties,
1149                },
1150            });
1151            test_helper.advance_test_fut(&mut test_fut);
1152
1153            time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1154            assert_eq!(
1155                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1156                expected_data_vec
1157            );
1158            assert_eq!(
1159                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1160                expected_data_vec
1161            );
1162            // There should be no calls to the `TYPE_wlanclient` time series since the
1163            // update above was for `Ethernet`.
1164            assert_eq!(
1165                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1166                &[]
1167            );
1168            assert_eq!(
1169                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1170                &[]
1171            );
1172        }
1173    }
1174
1175    #[test]
1176    fn test_link_state_update() {
1177        let (mut test_helper, mut test_fut) = setup_test();
1178
1179        // Iterate through all of the different variants in the LinkState enum.
1180        // LinkState::None is the default so there should be no data recorded
1181        // for that value.
1182        let initial_value = LinkState::None.to_id() as u64;
1183        let final_value = LinkState::Internet.to_id() as u64;
1184        for i in initial_value..=final_value {
1185            let link_state = i.into();
1186            let mut expected_data_vec = vec![];
1187            if i != initial_value {
1188                expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(1 << i)));
1189            }
1190
1191            //  There should be no calls to the `TYPE_ethernet` or `TYPE_wlanclient`
1192            // time series since nothing has been logged yet.
1193            let mut time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1194            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..], &[]);
1195            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..], &[]);
1196            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1197            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1198
1199            test_helper.telemetry_sender.send(TelemetryEvent::LinkStateUpdate {
1200                interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1201                link_state: IpVersions { ipv4: link_state, ipv6: link_state },
1202            });
1203            test_helper.advance_test_fut(&mut test_fut);
1204
1205            time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1206            assert_eq!(
1207                &time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..],
1208                expected_data_vec
1209            );
1210            assert_eq!(
1211                &time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..],
1212                expected_data_vec
1213            );
1214            // There should be no calls to the `TYPE_wlanclient` time series since the
1215            // update above was for `Ethernet`.
1216            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1217            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1218        }
1219    }
1220
    /// State shared by the tests in this module: the sender used to inject telemetry events,
    /// the fake Cobalt server side, the mock time matrix client and the test executor.
    struct TestHelper {
        telemetry_sender: TelemetrySender,
        _inspector: Inspector,
        cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
        /// Metric events extracted from Cobalt requests. `advance_test_fut()` responds to each
        /// pending request on `cobalt_stream` and appends the events it carried to this Vec.
        cobalt_events: Vec<MetricEvent>,
        mock_time_matrix_client: MockTimeMatrixClient,

        // Note: keep the executor field last in the struct so it gets dropped last.
        exec: fasync::TestExecutor,
    }
1233
1234    impl TestHelper {
1235        /// Advance executor until stalled, taking care of any blocking requests
1236        fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
1237            let mut made_progress = true;
1238            while made_progress {
1239                let _result = self.exec.run_until_stalled(test_fut);
1240                made_progress = false;
1241                while let Poll::Ready(Some(Ok(req))) =
1242                    self.exec.run_until_stalled(&mut self.cobalt_stream.next())
1243                {
1244                    self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
1245                    made_progress = true;
1246                }
1247            }
1248
1249            match self.exec.run_until_stalled(test_fut) {
1250                Poll::Pending => (),
1251                _ => panic!("expect test_fut to resolve to Poll::Pending"),
1252            }
1253        }
1254
1255        /// Advance executor by `duration`.
1256        /// This function repeatedly advances the executor by 1 second, triggering
1257        /// any expired timers and running the test_fut, until `duration` is reached.
1258        fn advance_by<T>(
1259            &mut self,
1260            duration: zx::MonotonicDuration,
1261            test_fut: &mut (impl Future<Output = T> + Unpin),
1262        ) {
1263            assert_eq!(
1264                duration.into_nanos() % STEP_INCREMENT.into_nanos(),
1265                0,
1266                "duration {:?} is not divisible by STEP_INCREMENT",
1267                duration,
1268            );
1269            const_assert_eq!(
1270                TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1271                0
1272            );
1273
1274            self.advance_test_fut(test_fut);
1275
1276            for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1277                self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1278                let _ = self.exec.wake_expired_timers();
1279                self.advance_test_fut(test_fut);
1280            }
1281        }
1282
1283        fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1284            self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1285        }
1286
1287        fn get_time_series<T>(
1288            &mut self,
1289            test_fut: &mut (impl Future<Output = T> + Unpin),
1290        ) -> Arc<Mutex<Stats>> {
1291            let (sender, mut receiver) = oneshot::channel();
1292            self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1293            self.advance_test_fut(test_fut);
1294            match receiver.try_recv() {
1295                Ok(Some(stats)) => stats,
1296                _ => panic!("Expect Stats to be returned"),
1297            }
1298        }
1299    }
1300
    /// Extension trait for answering a Cobalt logger request while capturing what it logged.
    trait CobaltExt {
        /// Respond to the MetricEventLoggerRequest with `result` and return the MetricEvent(s)
        /// that the request carried.
        fn respond_to_metric_req(
            self,
            result: Result<(), fidl_fuchsia_metrics::Error>,
        ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
    }
1308
1309    impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1310        fn respond_to_metric_req(
1311            self,
1312            result: Result<(), fidl_fuchsia_metrics::Error>,
1313        ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1314            match self {
1315                Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1316                    assert!(responder.send(result).is_ok());
1317                    vec![MetricEvent {
1318                        metric_id,
1319                        event_codes,
1320                        payload: MetricEventPayload::Count(count),
1321                    }]
1322                }
1323                Self::LogInteger { metric_id, value, event_codes, responder } => {
1324                    assert!(responder.send(result).is_ok());
1325                    vec![MetricEvent {
1326                        metric_id,
1327                        event_codes,
1328                        payload: MetricEventPayload::IntegerValue(value),
1329                    }]
1330                }
1331                Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1332                    assert!(responder.send(result).is_ok());
1333                    vec![MetricEvent {
1334                        metric_id,
1335                        event_codes,
1336                        payload: MetricEventPayload::Histogram(histogram),
1337                    }]
1338                }
1339                Self::LogString { metric_id, string_value, event_codes, responder } => {
1340                    assert!(responder.send(result).is_ok());
1341                    vec![MetricEvent {
1342                        metric_id,
1343                        event_codes,
1344                        payload: MetricEventPayload::StringValue(string_value),
1345                    }]
1346                }
1347                Self::LogMetricEvents { events, responder } => {
1348                    assert!(responder.send(result).is_ok());
1349                    events
1350                }
1351            }
1352        }
1353    }
1354
1355    fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = Result<(), Error>>>>) {
1356        let mut exec = fasync::TestExecutor::new_with_fake_time();
1357        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1358
1359        let (cobalt_proxy, cobalt_stream) =
1360            create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1361
1362        let inspector = Inspector::default();
1363        let inspect_node = inspector.root().create_child("telemetrytest");
1364        let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
1365        let mock_time_matrix_client = MockTimeMatrixClient::new();
1366
1367        let link_properties_state_logger = LinkPropertiesStateLogger::new(
1368            &inspect_metadata_node,
1369            &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1370            InterfaceTimeSeriesGrouping::Type(vec![
1371                InterfaceType::Ethernet,
1372                InterfaceType::WlanClient,
1373            ]),
1374            &mock_time_matrix_client,
1375        );
1376
1377        let interface_aware_logger_node = inspect_node.create_child("interfaces");
1378        let events_node = interface_aware_logger_node.create_child("events");
1379        let time_series_node = interface_aware_logger_node.create_child("time_series");
1380        let interface_aware_logger = InterfaceAwareLogger::new(
1381            &inspect_metadata_node,
1382            &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1383            InterfaceTimeSeriesGrouping::Type(vec![
1384                InterfaceType::Ethernet,
1385                InterfaceType::WlanClient,
1386            ]),
1387            events_node,
1388            time_series_node,
1389        );
1390
1391        inspect_node.record(interface_aware_logger_node);
1392
1393        let (telemetry_sender, test_fut) = serve_telemetry_inner(
1394            cobalt_proxy,
1395            inspect_node,
1396            link_properties_state_logger,
1397            interface_aware_logger,
1398        );
1399        let mut test_fut = Box::pin(test_fut);
1400
1401        assert_matches::assert_matches!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1402
1403        let test_helper = TestHelper {
1404            telemetry_sender,
1405            _inspector: inspector,
1406            cobalt_stream,
1407            cobalt_events: vec![],
1408            mock_time_matrix_client,
1409            exec,
1410        };
1411        (test_helper, test_fut)
1412    }
1413}