reachability_core/telemetry/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod convert;
6mod inspect;
7pub mod processors;
8
9use self::inspect::{inspect_record_stats, Stats};
10use crate::{IpVersions, LinkState, State};
11use processors::link_properties_state::{
12    InterfaceIdentifier, InterfaceTimeSeriesGrouping, InterfaceType, LinkProperties,
13    LinkPropertiesStateLogger,
14};
15
16use anyhow::{format_err, Context, Error};
17use cobalt_client::traits::AsEventCode;
18use fidl_fuchsia_metrics::MetricEvent;
19use fuchsia_cobalt_builders::MetricEventExt;
20use fuchsia_inspect::Node as InspectNode;
21use fuchsia_sync::Mutex;
22use futures::channel::{mpsc, oneshot};
23use futures::{future, select, Future, StreamExt, TryFutureExt};
24use log::{info, warn};
25use static_assertions::const_assert_eq;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use windowed_stats::aggregations::SumAndCount;
29use windowed_stats::experimental::serve::serve_time_matrix_inspection;
30use {fuchsia_async as fasync, network_policy_metrics_registry as metrics};
31
32#[cfg(test)]
33mod testing;
34
35pub async fn create_metrics_logger(
36    factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
37) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
38    let (cobalt_proxy, cobalt_server) =
39        fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
40
41    let project_spec = fidl_fuchsia_metrics::ProjectSpec {
42        customer_id: None, // defaults to fuchsia.
43        project_id: Some(metrics::PROJECT_ID),
44        ..Default::default()
45    };
46
47    let status = factory_proxy
48        .create_metric_event_logger(&project_spec, cobalt_server)
49        .await
50        .context("create metrics event logger")?;
51
52    match status {
53        Ok(_) => Ok(cobalt_proxy),
54        Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
55    }
56}
57
58#[derive(Clone, Debug)]
59pub struct TelemetrySender {
60    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
61    sender_is_blocked: Arc<AtomicBool>,
62}
63
64impl TelemetrySender {
65    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
66        Self {
67            sender: Arc::new(Mutex::new(sender)),
68            sender_is_blocked: Arc::new(AtomicBool::new(false)),
69        }
70    }
71
72    // Send telemetry event. Log an error if it fails.
73    pub fn send(&self, event: TelemetryEvent) {
74        match self.sender.lock().try_send(event) {
75            Ok(_) => {
76                // If sender has been blocked before, set bool to false and log message.
77                if self
78                    .sender_is_blocked
79                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
80                    .is_ok()
81                {
82                    info!("TelemetrySender recovered and resumed sending");
83                }
84            }
85            Err(_) => {
86                // If sender has not been blocked before, set bool to true and log error message.
87                if self
88                    .sender_is_blocked
89                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
90                    .is_ok()
91                {
92                    warn!("TelemetrySender dropped a msg: either buffer is full or no receiver is waiting");
93                }
94            }
95        }
96    }
97}
98
99#[derive(Debug, Clone, Default, PartialEq)]
100struct SystemStateSummary {
101    system_state: IpVersions<Option<State>>,
102}
103
104impl SystemStateSummary {
105    fn ipv4_link_state_val(&self) -> u32 {
106        match self.system_state.ipv4 {
107            Some(s) => s.link as u32,
108            None => 0u32,
109        }
110    }
111
112    fn ipv6_link_state_val(&self) -> u32 {
113        match self.system_state.ipv6 {
114            Some(s) => s.link as u32,
115            None => 0u32,
116        }
117    }
118}
119
/// Which IP versions currently have a default route, as last reported to telemetry.
#[derive(Debug, Clone, Default, PartialEq)]
struct NetworkConfig {
    /// Whether a default IPv4 route is present.
    has_default_ipv4_route: bool,
    /// Whether a default IPv6 route is present.
    has_default_ipv6_route: bool,
}
125
126#[derive(Debug, Clone, Default)]
127pub struct SystemStateUpdate {
128    pub(crate) system_state: IpVersions<Option<State>>,
129}
130
131#[derive(Debug)]
132pub enum TelemetryEvent {
133    SystemStateUpdate {
134        update: SystemStateUpdate,
135    },
136    NetworkConfig {
137        has_default_ipv4_route: bool,
138        has_default_ipv6_route: bool,
139    },
140    GatewayProbe {
141        internet_available: bool,
142        gateway_discoverable: bool,
143        gateway_pingable: bool,
144    },
145    // The LinkProperties update corresponding to the interface described by
146    // the vector of interface identifiers.
147    LinkPropertiesUpdate {
148        interface_identifiers: Vec<InterfaceIdentifier>,
149        link_properties: IpVersions<LinkProperties>,
150    },
151    // The LinkState update corresponding to the interface described by the
152    // vector of interface identifiers.
153    LinkStateUpdate {
154        interface_identifiers: Vec<InterfaceIdentifier>,
155        link_state: IpVersions<LinkState>,
156    },
157    /// Get the TimeSeries held by telemetry loop. Intended for test only.
158    GetTimeSeries {
159        sender: oneshot::Sender<Arc<Mutex<Stats>>>,
160    },
161}
162
163/// Capacity of "first come, first serve" slots available to clients of
164/// the mpsc::Sender<TelemetryEvent>. This threshold is arbitrary.
165const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
166
167const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
168const METADATA_NODE_NAME: &str = "metadata";
169
170pub fn serve_telemetry(
171    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
172    inspect_node: InspectNode,
173) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
174    // Inspect nodes to hold time series and metadata for other nodes.
175    let inspect_time_series_node = inspect_node.create_child("time_series");
176    let link_properties_state_time_series_node =
177        inspect_time_series_node.create_child("link_properties_state");
178    let (time_matrix_client, time_matrix_fut) =
179        serve_time_matrix_inspection(link_properties_state_time_series_node);
180
181    inspect_node.record(inspect_time_series_node);
182    let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
183    let link_properties_state_logger = LinkPropertiesStateLogger::new(
184        &inspect_metadata_node,
185        &format!("root/telemetry/{METADATA_NODE_NAME}"),
186        InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
187        &time_matrix_client,
188    );
189    // Record the metadata node so it does not get dropped.
190    inspect_node.record(inspect_metadata_node);
191
192    let (sender, fut) =
193        serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
194    (sender, future::try_join(fut, time_matrix_fut).map_ok(|((), ())| ()))
195}
196
197fn serve_telemetry_inner(
198    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
199    inspect_node: InspectNode,
200    link_properties_state_logger: LinkPropertiesStateLogger,
201) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
202    let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
203    let sender = TelemetrySender::new(sender);
204    let cloned_sender = sender.clone();
205
206    let fut = async move {
207        let link_properties_state_logger = link_properties_state_logger;
208
209        let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
210        const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
211        const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
212        const INTERVAL_TICKS_PER_MINUTE: u64 =
213            (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
214        const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
215        let mut interval_tick = 0u64;
216        let mut telemetry =
217            Telemetry::new(cloned_sender, cobalt_proxy, link_properties_state_logger, inspect_node);
218        loop {
219            select! {
220                event = receiver.next() => {
221                    if let Some(event) = event {
222                        telemetry.handle_telemetry_event(event).await;
223                    }
224                }
225                _ = report_interval_stream.next() => {
226                    telemetry.handle_ten_secondly_telemetry();
227
228                    interval_tick += 1;
229                    if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
230                        telemetry.handle_hourly_telemetry().await;
231                    }
232                }
233            }
234        }
235    };
236    (sender, fut)
237}
238
// Calls a single-event logging method (occurrence, integer, histogram, string) on a
// Cobalt proxy and awaits it. A failure, whether from the FIDL transport or reported by the
// service, is logged as a warning and otherwise ignored.
macro_rules! log_cobalt {
    ($proxy:expr, $method:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
        let response = $proxy.$method($metric_id, $value, $event_codes).await;
        match response {
            Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
            Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
            Ok(Ok(())) => {}
        }
    }};
}
251
// Sends a batch of metric events through `log_metric_events` on a Cobalt proxy and awaits
// it. A failure, whether from the FIDL transport or reported by the service, is logged as a
// warning tagged with the given context and otherwise ignored.
macro_rules! log_cobalt_batch {
    ($proxy:expr, $events:expr, $context:expr $(,)?) => {{
        let response = $proxy.log_metric_events($events).await;
        match response {
            Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
            Ok(Err(e)) => {
                warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
            }
            Ok(Ok(())) => {}
        }
    }};
}
264
265fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
266    const MILLIS_PER_SEC: i64 = 1000;
267    let millis = duration.into_millis();
268    let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
269    millis / MILLIS_PER_SEC + rounded_portion
270}
271
272struct Telemetry {
273    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
274    state_summary: Option<SystemStateSummary>,
275    state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
276    state_last_refreshed_for_inspect: fasync::MonotonicInstant,
277    reachability_lost_at: Option<(
278        fasync::MonotonicInstant,
279        metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
280    )>,
281    network_config: Option<NetworkConfig>,
282    network_config_last_refreshed: fasync::MonotonicInstant,
283    link_properties_state_logger: LinkPropertiesStateLogger,
284
285    _inspect_node: InspectNode,
286    stats: Arc<Mutex<Stats>>,
287}
288
289impl Telemetry {
290    pub fn new(
291        _telemetry_sender: TelemetrySender,
292        cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
293        link_properties_state_logger: LinkPropertiesStateLogger,
294        inspect_node: InspectNode,
295    ) -> Self {
296        let stats = Arc::new(Mutex::new(Stats::new()));
297        inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
298        Self {
299            cobalt_proxy,
300            state_summary: None,
301            state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
302            state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
303            reachability_lost_at: None,
304            network_config: None,
305            network_config_last_refreshed: fasync::MonotonicInstant::now(),
306            link_properties_state_logger,
307            _inspect_node: inspect_node,
308            stats,
309        }
310    }
311
312    pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
313        let now = fasync::MonotonicInstant::now();
314        match event {
315            TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
316                let new_state = Some(match &self.state_summary {
317                    Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
318                    None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
319                });
320                if self.state_summary != new_state {
321                    // Only log if system state has changed to prevent spamming Cobalt,
322                    // since this telemetry event may happen in quick succession.
323                    self.log_system_state_metrics(
324                        new_state,
325                        now,
326                        "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
327                    )
328                    .await
329                }
330            }
331            TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
332                let new_config =
333                    Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
334                self.log_network_config_metrics(
335                    new_config,
336                    now,
337                    "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
338                )
339                .await;
340            }
341            TelemetryEvent::GatewayProbe {
342                internet_available,
343                gateway_discoverable,
344                gateway_pingable,
345            } => {
346                if internet_available {
347                    let metric_id = match (gateway_discoverable, gateway_pingable) {
348                        (true, true) => None,
349                        (true, false) => {
350                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
351                        }
352                        (false, true) => {
353                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
354                        }
355                        (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
356                    };
357                    if let Some(metric_id) = metric_id {
358                        log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
359                    }
360                }
361            }
362            TelemetryEvent::LinkPropertiesUpdate { interface_identifiers, link_properties } => {
363                self.link_properties_state_logger
364                    .update_link_properties(interface_identifiers, &link_properties);
365            }
366            TelemetryEvent::LinkStateUpdate { interface_identifiers, link_state } => {
367                self.link_properties_state_logger
368                    .update_link_state(interface_identifiers, &link_state);
369            }
370            TelemetryEvent::GetTimeSeries { sender } => {
371                let _result = sender.send(Arc::clone(&self.stats));
372            }
373        }
374    }
375
376    async fn log_system_state_metrics(
377        &mut self,
378        new_state: Option<SystemStateSummary>,
379        now: fasync::MonotonicInstant,
380        ctx: &'static str,
381    ) {
382        let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
383        let duration_sec_inspect =
384            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
385        let mut metric_events = vec![];
386
387        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
388
389        if let Some(prev) = &self.state_summary {
390            if prev.system_state.has_interface_up() {
391                metric_events.push(
392                    MetricEvent::builder(
393                        metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
394                    )
395                    .as_integer(duration_cobalt.into_micros()),
396                );
397
398                let route_config_dim = convert::convert_route_config(&prev.system_state);
399                let internet_available =
400                    convert::convert_yes_no_dim(prev.system_state.has_internet());
401                let gateway_reachable =
402                    convert::convert_yes_no_dim(prev.system_state.has_gateway());
403                let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
404                let http_status = if prev.system_state.has_http() {
405                    metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
406                } else {
407                    metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
408                };
409                // We only log metric when at least one of IPv4 or IPv6 is in the SystemState.
410                if let Some(route_config) = route_config_dim {
411                    metric_events.push(
412                        MetricEvent::builder(
413                            metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
414                        )
415                        .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
416                            route_config,
417                            internet_available,
418                            gateway_reachable,
419                            dns_active,
420                            http_status,
421                        })
422                        .as_integer(duration_cobalt.into_micros()),
423                    );
424                }
425
426                if prev.system_state.has_internet() {
427                    self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
428                }
429                if prev.system_state.has_dns() {
430                    self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
431                }
432                if prev.system_state.has_http() {
433                    self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
434                }
435            }
436        }
437
438        if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
439            let previously_reachable =
440                prev.system_state.has_internet() && prev.system_state.has_dns();
441            let now_reachable =
442                new_state.system_state.has_internet() && new_state.system_state.has_dns();
443            if previously_reachable && !now_reachable {
444                let route_config_dim = convert::convert_route_config(&prev.system_state);
445                if let Some(route_config_dim) = route_config_dim {
446                    metric_events.push(
447                        MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
448                            .with_event_code(route_config_dim.as_event_code())
449                            .as_occurrence(1),
450                    );
451                    self.reachability_lost_at = Some((now, route_config_dim));
452                }
453                self.stats.lock().reachability_lost_count.log_value(&1);
454            }
455
456            if !previously_reachable && now_reachable {
457                if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
458                    metric_events.push(
459                        MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
460                            .with_event_code(route_config_dim.as_event_code())
461                            .as_integer((now - reachability_lost_at).into_micros()),
462                    );
463                }
464                self.reachability_lost_at = None;
465            }
466        }
467
468        if let Some(new_state) = &new_state {
469            self.stats
470                .lock()
471                .ipv4_state
472                .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
473            self.stats
474                .lock()
475                .ipv6_state
476                .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
477        }
478
479        if !metric_events.is_empty() {
480            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
481        }
482
483        if let Some(new_state) = new_state {
484            self.state_summary = Some(new_state);
485        }
486        self.state_last_refreshed_for_cobalt = now;
487        self.state_last_refreshed_for_inspect = now;
488    }
489
490    async fn log_network_config_metrics(
491        &mut self,
492        new_config: Option<NetworkConfig>,
493        now: fasync::MonotonicInstant,
494        ctx: &'static str,
495    ) {
496        let duration = now - self.network_config_last_refreshed;
497        let mut metric_events = vec![];
498
499        if let Some(prev) = &self.network_config {
500            let default_route_dim = convert::convert_default_route(
501                prev.has_default_ipv4_route,
502                prev.has_default_ipv6_route,
503            );
504            // We only log metric when at least one of IPv4 or IPv6 has default route.
505            if let Some(default_route_dim) = default_route_dim {
506                metric_events.push(
507                    MetricEvent::builder(
508                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
509                    )
510                    .with_event_code(default_route_dim.as_event_code())
511                    .as_integer(duration.into_micros()),
512                );
513                metric_events.push(
514                    MetricEvent::builder(
515                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
516                    )
517                    .with_event_code(default_route_dim.as_event_code())
518                    .as_occurrence(1),
519                );
520            }
521        }
522
523        if !metric_events.is_empty() {
524            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
525        }
526
527        if let Some(new_config) = new_config {
528            self.network_config = Some(new_config);
529        }
530        self.network_config_last_refreshed = now;
531    }
532
533    pub fn handle_ten_secondly_telemetry(&mut self) {
534        let now = fasync::MonotonicInstant::now();
535        let duration_sec_inspect =
536            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
537
538        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
539
540        if let Some(current) = &self.state_summary {
541            if current.system_state.has_internet() {
542                self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
543            }
544            if current.system_state.has_dns() {
545                self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
546            }
547            if current.system_state.has_http() {
548                self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
549            }
550
551            self.stats
552                .lock()
553                .ipv4_state
554                .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
555            self.stats
556                .lock()
557                .ipv6_state
558                .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
559        }
560        self.state_last_refreshed_for_inspect = now;
561    }
562
563    pub async fn handle_hourly_telemetry(&mut self) {
564        let now = fasync::MonotonicInstant::now();
565        self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
566        self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use crate::{ApplicationState, LinkState};
573
574    use super::*;
575    use fidl::endpoints::create_proxy_and_stream;
576    use fidl_fuchsia_metrics::MetricEventPayload;
577    use fuchsia_inspect::Inspector;
578    use fuchsia_inspect_contrib::id_enum::IdEnum;
579
580    use futures::task::Poll;
581    use std::pin::Pin;
582    use test_case::test_case;
583    use windowed_stats::experimental::clock::Timed;
584    use windowed_stats::experimental::testing::{MockTimeMatrixClient, TimeMatrixCall};
585
586    const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
587
588    #[test]
589    fn test_log_state_change() {
590        let (mut test_helper, mut test_fut) = setup_test();
591
592        let mut update = SystemStateUpdate {
593            system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
594        };
595        test_helper
596            .telemetry_sender
597            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
598
599        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
600
601        update.system_state = IpVersions {
602            ipv4: Some(State {
603                link: LinkState::Internet,
604                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
605            }),
606            ipv6: None,
607        };
608        test_helper
609            .telemetry_sender
610            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
611
612        test_helper.advance_test_fut(&mut test_fut);
613
614        let logged_metrics = test_helper
615            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
616        assert_eq!(logged_metrics.len(), 1);
617        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
618
619        let logged_metrics = test_helper
620            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
621        assert_eq!(logged_metrics.len(), 1);
622        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
623        assert_eq!(
624            logged_metrics[0].event_codes,
625            &[
626                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
627                    as u32,
628                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
629                    as u32,
630                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
631                    as u32,
632                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
633                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
634                    as u32,
635            ]
636        );
637
638        test_helper.cobalt_events.clear();
639        test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);
640
641        // At the 1 hour mark, the new state is logged via periodic telemetry.
642        // All metrics are the same as before, except for the elapsed duration and the
643        // `dns_active` flag is now true.
644        let logged_metrics = test_helper
645            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
646        assert_eq!(logged_metrics.len(), 1);
647        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
648
649        let logged_metrics = test_helper
650            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
651        assert_eq!(logged_metrics.len(), 1);
652        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
653        assert_eq!(
654            logged_metrics[0].event_codes,
655            &[
656                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
657                    as u32,
658                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
659                    as u32,
660                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
661                    as u32,
662                // This time dns_active is Yes, and http_status is HttpOnly
663                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
664                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
665                    as u32,
666            ]
667        );
668    }
669
670    #[test]
671    fn test_log_reachability_lost() {
672        let (mut test_helper, mut test_fut) = setup_test();
673
674        let mut update = SystemStateUpdate {
675            system_state: IpVersions {
676                ipv4: None,
677                ipv6: Some(State {
678                    link: LinkState::Internet,
679                    application: ApplicationState { dns_resolved: true, ..Default::default() },
680                }),
681            },
682            ..SystemStateUpdate::default()
683        };
684        test_helper
685            .telemetry_sender
686            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
687        test_helper.advance_test_fut(&mut test_fut);
688
689        update.system_state = IpVersions {
690            ipv4: None,
691            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
692        };
693        test_helper
694            .telemetry_sender
695            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
696        test_helper.advance_test_fut(&mut test_fut);
697
698        // Reachability lost metric is lost because previously both `internet_available` and
699        // `dns_active` were true, and now the latter is false.
700        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
701        assert_eq!(logged_metrics.len(), 1);
702        assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
703        assert_eq!(
704            logged_metrics[0].event_codes,
705            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
706        );
707
708        test_helper.cobalt_events.clear();
709
710        update.system_state = IpVersions {
711            ipv4: None,
712            ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
713        };
714        test_helper
715            .telemetry_sender
716            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
717        test_helper.advance_test_fut(&mut test_fut);
718
719        // Reachability lost metric is not logged again even when `internet_available`
720        // becomes false, because it was already considered lost previously.
721        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
722        assert_eq!(logged_metrics.len(), 0);
723
724        test_helper.cobalt_events.clear();
725
726        test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
727        update.system_state = IpVersions {
728            ipv4: Some(State {
729                link: LinkState::Internet,
730                application: ApplicationState { dns_resolved: true, ..Default::default() },
731            }),
732            ipv6: None,
733        };
734        test_helper
735            .telemetry_sender
736            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
737        test_helper.advance_test_fut(&mut test_fut);
738
739        // When reachability is recovered, the duration that reachability was lost is logged.
740        let logged_metrics =
741            test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
742        assert_eq!(logged_metrics.len(), 1);
743        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
744        assert_eq!(
745            logged_metrics[0].event_codes,
746            // Reachability is recovered on ipv4, but we still log the ipv6 dimension because
747            // that was the config when reachability was lost.
748            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
749        );
750    }
751
752    #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
753    #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
754    #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
755    #[test_case(false, false, None; "no default routes")]
756    #[fuchsia::test(add_test_attr = false)]
757    fn test_log_default_route(
758        has_default_ipv4_route: bool,
759        has_default_ipv6_route: bool,
760        expected_dim: Option<
761            metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
762        >,
763    ) {
764        let (mut test_helper, mut test_fut) = setup_test();
765
766        test_helper
767            .telemetry_sender
768            .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
769        test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
770
771        let logged_metrics = test_helper
772            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
773        match expected_dim {
774            Some(dim) => {
775                assert_eq!(logged_metrics.len(), 1);
776                assert_eq!(
777                    logged_metrics[0].payload,
778                    MetricEventPayload::IntegerValue(3600_000_000)
779                );
780                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
781            }
782            None => {
783                assert_eq!(logged_metrics.len(), 0);
784            }
785        }
786
787        let logged_metrics = test_helper
788            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
789        match expected_dim {
790            Some(dim) => {
791                assert_eq!(logged_metrics.len(), 1);
792                assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
793                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
794            }
795            None => {
796                assert_eq!(logged_metrics.len(), 0);
797            }
798        }
799    }
800
801    #[test_case(true, true, vec![]; "negative")]
802    #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
803    #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
804    #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
805    #[fuchsia::test(add_test_attr = false)]
806    fn test_log_abnormal_state_situation(
807        gateway_discoverable: bool,
808        gateway_pingable: bool,
809        expected_metrics: Vec<u32>,
810    ) {
811        let (mut test_helper, mut test_fut) = setup_test();
812
813        test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
814            internet_available: true,
815            gateway_discoverable,
816            gateway_pingable,
817        });
818        test_helper.advance_test_fut(&mut test_fut);
819
820        let logged_metrics: Vec<u32> = [
821            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
822            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
823            metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
824        ]
825        .into_iter()
826        .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
827        .collect();
828
829        assert_eq!(logged_metrics, expected_metrics);
830    }
831
832    #[test]
833    fn test_state_snapshot_duration_inspect_stats() {
834        let (mut test_helper, mut test_fut) = setup_test();
835
836        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
837
838        let time_series = test_helper.get_time_series(&mut test_fut);
839        let internet_available_sec: Vec<_> =
840            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
841        let dns_active_sec: Vec<_> =
842            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
843        let http_active_sec: Vec<_> =
844            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
845        let total_duration_sec: Vec<_> =
846            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
847        assert_eq!(internet_available_sec, vec![0]);
848        assert_eq!(dns_active_sec, vec![0]);
849        assert_eq!(http_active_sec, vec![0]);
850        assert_eq!(total_duration_sec, vec![20]);
851
852        let mut update = SystemStateUpdate {
853            system_state: IpVersions {
854                ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
855                ipv6: None,
856            },
857        };
858        test_helper
859            .telemetry_sender
860            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
861        test_helper.advance_test_fut(&mut test_fut);
862        let time_series = test_helper.get_time_series(&mut test_fut);
863        let total_duration_sec: Vec<_> =
864            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
865        assert_eq!(total_duration_sec, vec![25]);
866
867        test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
868
869        // Now 40 seconds mark
870
871        let time_series = test_helper.get_time_series(&mut test_fut);
872        let internet_available_sec: Vec<_> =
873            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
874        let dns_active_sec: Vec<_> =
875            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
876        let http_active_sec: Vec<_> =
877            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
878        let total_duration_sec: Vec<_> =
879            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
880        assert_eq!(internet_available_sec, vec![15]);
881        assert_eq!(dns_active_sec, vec![0]);
882        assert_eq!(http_active_sec, vec![0]);
883        assert_eq!(total_duration_sec, vec![40]);
884
885        update.system_state = IpVersions {
886            ipv4: Some(State {
887                link: LinkState::Internet,
888                application: ApplicationState { dns_resolved: true, ..Default::default() },
889            }),
890            ipv6: None,
891        };
892        test_helper
893            .telemetry_sender
894            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
895
896        test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
897
898        // Now 90 seconds mark
899
900        let time_series = test_helper.get_time_series(&mut test_fut);
901        let internet_available_sec: Vec<_> =
902            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
903        let dns_active_sec: Vec<_> =
904            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
905        let http_active_sec: Vec<_> =
906            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
907        let total_duration_sec: Vec<_> =
908            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
909        assert_eq!(internet_available_sec, vec![25, 40]);
910        assert_eq!(dns_active_sec, vec![10, 40]);
911        assert_eq!(http_active_sec, vec![0]);
912        assert_eq!(total_duration_sec, vec![40]);
913
914        update.system_state = IpVersions {
915            ipv4: Some(State {
916                link: LinkState::Internet,
917                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
918            }),
919            ipv6: None,
920        };
921        test_helper
922            .telemetry_sender
923            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
924
925        test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
926
927        // Now 120 seconds mark
928
929        let time_series = test_helper.get_time_series(&mut test_fut);
930        let internet_available_sec: Vec<_> =
931            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
932        let dns_active_sec: Vec<_> =
933            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
934        let http_active_sec: Vec<_> =
935            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
936        let total_duration_sec: Vec<_> =
937            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
938        assert_eq!(internet_available_sec, vec![25, 60, 40]);
939        assert_eq!(dns_active_sec, vec![10, 60, 40]);
940        assert_eq!(http_active_sec, vec![0, 20, 40]);
941        assert_eq!(total_duration_sec, vec![40]);
942    }
943
944    #[test]
945    fn test_reachability_lost_count_inspect_stats() {
946        let (mut test_helper, mut test_fut) = setup_test();
947
948        let mut update = SystemStateUpdate {
949            system_state: IpVersions {
950                ipv4: None,
951                ipv6: Some(State {
952                    link: LinkState::Internet,
953                    application: ApplicationState { dns_resolved: true, ..Default::default() },
954                }),
955            },
956            ..SystemStateUpdate::default()
957        };
958        test_helper
959            .telemetry_sender
960            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
961        test_helper.advance_test_fut(&mut test_fut);
962
963        let time_series = test_helper.get_time_series(&mut test_fut);
964        let reachability_lost_count: Vec<_> =
965            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
966        assert_eq!(reachability_lost_count, vec![0]);
967
968        update.system_state = IpVersions {
969            ipv4: None,
970            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
971        };
972        test_helper
973            .telemetry_sender
974            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
975        test_helper.advance_test_fut(&mut test_fut);
976
977        let time_series = test_helper.get_time_series(&mut test_fut);
978        let reachability_lost_count: Vec<_> =
979            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
980        assert_eq!(reachability_lost_count, vec![1]);
981    }
982
983    #[test]
984    fn test_avg_state_inspect_stats() {
985        let (mut test_helper, mut test_fut) = setup_test();
986
987        let mut update = SystemStateUpdate {
988            system_state: IpVersions {
989                ipv4: None,
990                ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
991            },
992            ..SystemStateUpdate::default()
993        };
994        test_helper
995            .telemetry_sender
996            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
997        test_helper.advance_test_fut(&mut test_fut);
998
999        let time_series = test_helper.get_time_series(&mut test_fut);
1000        let ipv4_state: Vec<_> =
1001            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1002        let ipv6_state: Vec<_> =
1003            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1004        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
1005        assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
1006
1007        update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
1008        test_helper
1009            .telemetry_sender
1010            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1011        test_helper.advance_test_fut(&mut test_fut);
1012
1013        let time_series = test_helper.get_time_series(&mut test_fut);
1014        let ipv4_state: Vec<_> =
1015            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1016        let ipv6_state: Vec<_> =
1017            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1018        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
1019        assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
1020    }
1021
1022    #[test]
1023    fn test_link_properties_update() {
1024        let (mut test_helper, mut test_fut) = setup_test();
1025
1026        // Iterate through all of the different combinations of booleans in the
1027        // LinkProperties struct. There are 4 different booleans, so 2^4
1028        // different combinations.
1029        for i in 0..=15 {
1030            let link_properties = LinkProperties::from(i);
1031            let mut expected_data_vec = vec![];
1032            // Index 0 indicates that all the booleans are false which is the
1033            // same as the default value of LinkProperties. In this case no
1034            // data is reported, so expected_data_vec should be empty.
1035            if i != 0 {
1036                expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(i)));
1037            }
1038
1039            //  There should be no calls to the `TYPE_ethernet` or `TYPE_wlanclient`
1040            // time series since nothing has been logged yet.
1041            let mut time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1042            assert_eq!(
1043                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1044                &[]
1045            );
1046            assert_eq!(
1047                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1048                &[]
1049            );
1050            assert_eq!(
1051                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1052                &[]
1053            );
1054            assert_eq!(
1055                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1056                &[]
1057            );
1058
1059            test_helper.telemetry_sender.send(TelemetryEvent::LinkPropertiesUpdate {
1060                interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1061                link_properties: IpVersions {
1062                    ipv4: link_properties.clone(),
1063                    ipv6: link_properties,
1064                },
1065            });
1066            test_helper.advance_test_fut(&mut test_fut);
1067
1068            time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1069            assert_eq!(
1070                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1071                expected_data_vec
1072            );
1073            assert_eq!(
1074                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1075                expected_data_vec
1076            );
1077            // There should be no calls to the `TYPE_wlanclient` time series since the
1078            // update above was for `Ethernet`.
1079            assert_eq!(
1080                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1081                &[]
1082            );
1083            assert_eq!(
1084                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1085                &[]
1086            );
1087        }
1088    }
1089
1090    #[test]
1091    fn test_link_state_update() {
1092        let (mut test_helper, mut test_fut) = setup_test();
1093
1094        // Iterate through all of the different variants in the LinkState enum.
1095        // LinkState::None is the default so there should be no data recorded
1096        // for that value.
1097        let initial_value = LinkState::None.to_id() as u64;
1098        let final_value = LinkState::Internet.to_id() as u64;
1099        for i in initial_value..=final_value {
1100            let link_state = i.into();
1101            let mut expected_data_vec = vec![];
1102            if i != initial_value {
1103                expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(1 << i)));
1104            }
1105
1106            //  There should be no calls to the `TYPE_ethernet` or `TYPE_wlanclient`
1107            // time series since nothing has been logged yet.
1108            let mut time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1109            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..], &[]);
1110            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..], &[]);
1111            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1112            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1113
1114            test_helper.telemetry_sender.send(TelemetryEvent::LinkStateUpdate {
1115                interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1116                link_state: IpVersions { ipv4: link_state, ipv6: link_state },
1117            });
1118            test_helper.advance_test_fut(&mut test_fut);
1119
1120            time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1121            assert_eq!(
1122                &time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..],
1123                expected_data_vec
1124            );
1125            assert_eq!(
1126                &time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..],
1127                expected_data_vec
1128            );
1129            // There should be no calls to the `TYPE_wlanclient` time series since the
1130            // update above was for `Ethernet`.
1131            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1132            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1133        }
1134    }
1135
    /// Bundles the pieces a test needs to drive the telemetry future built by `setup_test`.
    struct TestHelper {
        /// Sender the tests use to submit `TelemetryEvent`s to the telemetry future.
        telemetry_sender: TelemetrySender,
        // Not read by the tests in this module.
        // NOTE(review): presumably held so the inspect hierarchy stays alive for the duration
        // of the test -- confirm.
        _inspector: Inspector,
        /// Server side of the Cobalt logger connection; its requests are answered in
        /// `self.advance_test_fut()`.
        cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
        /// As requests to Cobalt are responded to via `self.advance_test_fut()`,
        /// the metric events they carry are appended to this Vec.
        cobalt_events: Vec<MetricEvent>,
        /// Mock time matrix client that `setup_test` hands to the `LinkPropertiesStateLogger`;
        /// tests inspect the recorded calls through `fold_buffered_samples()`.
        mock_time_matrix_client: MockTimeMatrixClient,

        // Note: keep the executor field last in the struct so it gets dropped last.
        exec: fasync::TestExecutor,
    }
1148
1149    impl TestHelper {
1150        /// Advance executor until stalled, taking care of any blocking requests
1151        fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
1152            let mut made_progress = true;
1153            while made_progress {
1154                let _result = self.exec.run_until_stalled(test_fut);
1155                made_progress = false;
1156                while let Poll::Ready(Some(Ok(req))) =
1157                    self.exec.run_until_stalled(&mut self.cobalt_stream.next())
1158                {
1159                    self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
1160                    made_progress = true;
1161                }
1162            }
1163
1164            match self.exec.run_until_stalled(test_fut) {
1165                Poll::Pending => (),
1166                _ => panic!("expect test_fut to resolve to Poll::Pending"),
1167            }
1168        }
1169
1170        /// Advance executor by `duration`.
1171        /// This function repeatedly advances the executor by 1 second, triggering
1172        /// any expired timers and running the test_fut, until `duration` is reached.
1173        fn advance_by<T>(
1174            &mut self,
1175            duration: zx::MonotonicDuration,
1176            test_fut: &mut (impl Future<Output = T> + Unpin),
1177        ) {
1178            assert_eq!(
1179                duration.into_nanos() % STEP_INCREMENT.into_nanos(),
1180                0,
1181                "duration {:?} is not divisible by STEP_INCREMENT",
1182                duration,
1183            );
1184            const_assert_eq!(
1185                TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1186                0
1187            );
1188
1189            self.advance_test_fut(test_fut);
1190
1191            for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1192                self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1193                let _ = self.exec.wake_expired_timers();
1194                self.advance_test_fut(test_fut);
1195            }
1196        }
1197
1198        fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1199            self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1200        }
1201
1202        fn get_time_series<T>(
1203            &mut self,
1204            test_fut: &mut (impl Future<Output = T> + Unpin),
1205        ) -> Arc<Mutex<Stats>> {
1206            let (sender, mut receiver) = oneshot::channel();
1207            self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1208            self.advance_test_fut(test_fut);
1209            match receiver.try_recv() {
1210                Ok(Some(stats)) => stats,
1211                _ => panic!("Expect Stats to be returned"),
1212            }
1213        }
1214    }
1215
    /// Test-only extension for answering a Cobalt logger request and recovering the metric
    /// events it carried.
    trait CobaltExt {
        // Respond to MetricEventLoggerRequest with `result` and extract its MetricEvent(s).
        // Takes the request by value, so it cannot be used afterwards.
        fn respond_to_metric_req(
            self,
            result: Result<(), fidl_fuchsia_metrics::Error>,
        ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
    }
1223
1224    impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1225        fn respond_to_metric_req(
1226            self,
1227            result: Result<(), fidl_fuchsia_metrics::Error>,
1228        ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1229            match self {
1230                Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1231                    assert!(responder.send(result).is_ok());
1232                    vec![MetricEvent {
1233                        metric_id,
1234                        event_codes,
1235                        payload: MetricEventPayload::Count(count),
1236                    }]
1237                }
1238                Self::LogInteger { metric_id, value, event_codes, responder } => {
1239                    assert!(responder.send(result).is_ok());
1240                    vec![MetricEvent {
1241                        metric_id,
1242                        event_codes,
1243                        payload: MetricEventPayload::IntegerValue(value),
1244                    }]
1245                }
1246                Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1247                    assert!(responder.send(result).is_ok());
1248                    vec![MetricEvent {
1249                        metric_id,
1250                        event_codes,
1251                        payload: MetricEventPayload::Histogram(histogram),
1252                    }]
1253                }
1254                Self::LogString { metric_id, string_value, event_codes, responder } => {
1255                    assert!(responder.send(result).is_ok());
1256                    vec![MetricEvent {
1257                        metric_id,
1258                        event_codes,
1259                        payload: MetricEventPayload::StringValue(string_value),
1260                    }]
1261                }
1262                Self::LogMetricEvents { events, responder } => {
1263                    assert!(responder.send(result).is_ok());
1264                    events
1265                }
1266            }
1267        }
1268    }
1269
1270    fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = Result<(), Error>>>>) {
1271        let mut exec = fasync::TestExecutor::new_with_fake_time();
1272        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1273
1274        let (cobalt_proxy, cobalt_stream) =
1275            create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1276
1277        let inspector = Inspector::default();
1278        let inspect_node = inspector.root().create_child("telemetrytest");
1279        let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
1280        let mock_time_matrix_client = MockTimeMatrixClient::new();
1281
1282        let link_properties_state_logger = LinkPropertiesStateLogger::new(
1283            &inspect_metadata_node,
1284            &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1285            InterfaceTimeSeriesGrouping::Type(vec![
1286                InterfaceType::Ethernet,
1287                InterfaceType::WlanClient,
1288            ]),
1289            &mock_time_matrix_client,
1290        );
1291
1292        let (telemetry_sender, test_fut) =
1293            serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
1294        let mut test_fut = Box::pin(test_fut);
1295
1296        assert_matches::assert_matches!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1297
1298        let test_helper = TestHelper {
1299            telemetry_sender,
1300            _inspector: inspector,
1301            cobalt_stream,
1302            cobalt_events: vec![],
1303            mock_time_matrix_client,
1304            exec,
1305        };
1306        (test_helper, test_fut)
1307    }
1308}