reachability_core/telemetry/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod convert;
6mod inspect;
7pub mod processors;
8
9use self::inspect::{Stats, inspect_record_stats};
10use crate::{IpVersions, LinkState, State};
11use processors::link_properties_state::{
12    InterfaceIdentifier, InterfaceTimeSeriesGrouping, InterfaceType, LinkProperties,
13    LinkPropertiesStateLogger,
14};
15
16use anyhow::{Context, Error, format_err};
17use cobalt_client::traits::AsEventCode;
18use fidl_fuchsia_metrics::MetricEvent;
19use fuchsia_cobalt_builders::MetricEventExt;
20use fuchsia_inspect::Node as InspectNode;
21use fuchsia_sync::Mutex;
22use futures::channel::{mpsc, oneshot};
23use futures::{Future, StreamExt, select};
24use log::{info, warn};
25use static_assertions::const_assert_eq;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use windowed_stats::aggregations::SumAndCount;
29use windowed_stats::experimental::inspect::TimeMatrixClient;
30use {fuchsia_async as fasync, network_policy_metrics_registry as metrics};
31
32#[cfg(test)]
33mod testing;
34
35pub async fn create_metrics_logger(
36    factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
37) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
38    let (cobalt_proxy, cobalt_server) =
39        fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
40
41    let project_spec = fidl_fuchsia_metrics::ProjectSpec {
42        customer_id: None, // defaults to fuchsia.
43        project_id: Some(metrics::PROJECT_ID),
44        ..Default::default()
45    };
46
47    let status = factory_proxy
48        .create_metric_event_logger(&project_spec, cobalt_server)
49        .await
50        .context("create metrics event logger")?;
51
52    match status {
53        Ok(_) => Ok(cobalt_proxy),
54        Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
55    }
56}
57
58#[derive(Clone, Debug)]
59pub struct TelemetrySender {
60    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
61    sender_is_blocked: Arc<AtomicBool>,
62}
63
64impl TelemetrySender {
65    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
66        Self {
67            sender: Arc::new(Mutex::new(sender)),
68            sender_is_blocked: Arc::new(AtomicBool::new(false)),
69        }
70    }
71
72    // Send telemetry event. Log an error if it fails.
73    pub fn send(&self, event: TelemetryEvent) {
74        match self.sender.lock().try_send(event) {
75            Ok(_) => {
76                // If sender has been blocked before, set bool to false and log message.
77                if self
78                    .sender_is_blocked
79                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
80                    .is_ok()
81                {
82                    info!("TelemetrySender recovered and resumed sending");
83                }
84            }
85            Err(_) => {
86                // If sender has not been blocked before, set bool to true and log error message.
87                if self
88                    .sender_is_blocked
89                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
90                    .is_ok()
91                {
92                    warn!(
93                        "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
94                    );
95                }
96            }
97        }
98    }
99}
100
101#[derive(Debug, Clone, Default, PartialEq)]
102struct SystemStateSummary {
103    system_state: IpVersions<Option<State>>,
104}
105
106impl SystemStateSummary {
107    fn ipv4_link_state_val(&self) -> u32 {
108        match self.system_state.ipv4 {
109            Some(s) => s.link as u32,
110            None => 0u32,
111        }
112    }
113
114    fn ipv6_link_state_val(&self) -> u32 {
115        match self.system_state.ipv6 {
116            Some(s) => s.link as u32,
117            None => 0u32,
118        }
119    }
120}
121
/// Default-route presence last reported through `TelemetryEvent::NetworkConfig`.
#[derive(Debug, Clone, Default, PartialEq)]
struct NetworkConfig {
    // Whether a default IPv4 route was reported.
    has_default_ipv4_route: bool,
    // Whether a default IPv6 route was reported.
    has_default_ipv6_route: bool,
}
127
/// Payload of `TelemetryEvent::SystemStateUpdate`: the new per-IP-version system state.
#[derive(Debug, Clone, Default)]
pub struct SystemStateUpdate {
    // `None` for an IP version means no state is present for it.
    pub(crate) system_state: IpVersions<Option<State>>,
}
132
/// Events that can be delivered to the telemetry loop through a `TelemetrySender`.
#[derive(Debug)]
pub enum TelemetryEvent {
    /// A new system state. System state metrics are logged only when it differs
    /// from the previously recorded state.
    SystemStateUpdate {
        update: SystemStateUpdate,
    },
    /// The current default-route configuration for IPv4 and IPv6.
    NetworkConfig {
        has_default_ipv4_route: bool,
        has_default_ipv6_route: bool,
    },
    /// Result of a gateway probe. An occurrence metric is logged when internet is
    /// available while the gateway is not discoverable and/or not pingable.
    GatewayProbe {
        internet_available: bool,
        gateway_discoverable: bool,
        gateway_pingable: bool,
    },
    /// The LinkProperties update corresponding to the interface described by
    /// the vector of interface identifiers.
    LinkPropertiesUpdate {
        interface_identifiers: Vec<InterfaceIdentifier>,
        link_properties: IpVersions<LinkProperties>,
    },
    /// The LinkState update corresponding to the interface described by the
    /// vector of interface identifiers.
    LinkStateUpdate {
        interface_identifiers: Vec<InterfaceIdentifier>,
        link_state: IpVersions<LinkState>,
    },
    /// Get the TimeSeries held by telemetry loop. Intended for test only.
    /// The loop replies on `sender` with a shared handle to its stats.
    GetTimeSeries {
        sender: oneshot::Sender<Arc<Mutex<Stats>>>,
    },
}
164
/// Capacity of "first come, first serve" slots available to clients of
/// the mpsc::Sender<TelemetryEvent>. This threshold is arbitrary.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;

/// Period of the interval stream that drives periodic telemetry in the serving loop.
/// The loop derives its hourly tick count from this value, so it must divide one
/// minute evenly (enforced by a const assertion in `serve_telemetry_inner`).
const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
/// Name of the inspect child node that `serve_telemetry` creates for metadata.
const METADATA_NODE_NAME: &str = "metadata";
171
172pub fn serve_telemetry(
173    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
174    inspect_node: InspectNode,
175) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
176    // Inspect nodes to hold time series and metadata for other nodes.
177    let inspect_time_series_node = inspect_node.create_child("time_series");
178    let link_properties_state_time_series_node =
179        inspect_time_series_node.create_child("link_properties_state");
180    let time_matrix_client =
181        TimeMatrixClient::new(link_properties_state_time_series_node.clone_weak());
182
183    inspect_node.record(inspect_time_series_node);
184    inspect_node.record(link_properties_state_time_series_node);
185    let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
186    let link_properties_state_logger = LinkPropertiesStateLogger::new(
187        &inspect_metadata_node,
188        &format!("root/telemetry/{METADATA_NODE_NAME}"),
189        InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
190        &time_matrix_client,
191    );
192    // Record the metadata node so it does not get dropped.
193    inspect_node.record(inspect_metadata_node);
194
195    let (sender, fut) =
196        serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
197    (sender, fut)
198}
199
200fn serve_telemetry_inner(
201    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
202    inspect_node: InspectNode,
203    link_properties_state_logger: LinkPropertiesStateLogger,
204) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
205    let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
206    let sender = TelemetrySender::new(sender);
207    let cloned_sender = sender.clone();
208
209    let fut = async move {
210        let link_properties_state_logger = link_properties_state_logger;
211
212        let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
213        const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
214        const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
215        const INTERVAL_TICKS_PER_MINUTE: u64 =
216            (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
217        const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
218        let mut interval_tick = 0u64;
219        let mut telemetry =
220            Telemetry::new(cloned_sender, cobalt_proxy, link_properties_state_logger, inspect_node);
221        loop {
222            select! {
223                event = receiver.next() => {
224                    if let Some(event) = event {
225                        telemetry.handle_telemetry_event(event).await;
226                    }
227                }
228                _ = report_interval_stream.next() => {
229                    telemetry.handle_ten_secondly_telemetry();
230
231                    interval_tick += 1;
232                    if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
233                        telemetry.handle_hourly_telemetry().await;
234                    }
235                }
236            }
237        }
238    };
239    (sender, fut)
240}
241
// Logs one simple metric (occurrence, integer, histogram, string) by awaiting
// `$cobalt_proxy.$method_name(..)`. A warning is emitted when either the call itself
// or the status it returns is an error; success is silent.
macro_rules! log_cobalt {
    ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
        let outcome = $cobalt_proxy.$method_name($metric_id, $value, $event_codes).await;
        match outcome {
            Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
            Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
            Ok(Ok(())) => {}
        }
    }};
}
254
// Logs a batch of metric events by awaiting `$cobalt_proxy.log_metric_events(..)`.
// A warning tagged with `$context` is emitted when either the call itself or the
// status it returns is an error; success is silent.
macro_rules! log_cobalt_batch {
    ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
        let outcome = $cobalt_proxy.log_metric_events($events).await;
        match outcome {
            Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
            Ok(Err(e)) => {
                warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
            }
            Ok(Ok(())) => {}
        }
    }};
}
267
268fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
269    const MILLIS_PER_SEC: i64 = 1000;
270    let millis = duration.into_millis();
271    let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
272    millis / MILLIS_PER_SEC + rounded_portion
273}
274
/// State owned by the telemetry loop: the Cobalt logger, the most recently recorded
/// system state and network config, and the timestamps used to compute how long
/// each of them was in effect.
struct Telemetry {
    // Proxy through which Cobalt metric events are logged.
    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
    // Last adopted system state; `None` until the first update is recorded.
    state_summary: Option<SystemStateSummary>,
    // Time of the last `log_system_state_metrics` call (or construction).
    state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
    // Time inspect duration stats were last updated; reset by both
    // `log_system_state_metrics` and `handle_ten_secondly_telemetry`.
    state_last_refreshed_for_inspect: fasync::MonotonicInstant,
    // When reachability (internet and DNS) was lost, and the route config in effect
    // at that moment. Cleared once reachability is regained.
    reachability_lost_at: Option<(
        fasync::MonotonicInstant,
        metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
    )>,
    // Last adopted network config; `None` until the first one is recorded.
    network_config: Option<NetworkConfig>,
    // Time of the last `log_network_config_metrics` call (or construction).
    network_config_last_refreshed: fasync::MonotonicInstant,
    // Receives link properties and link state updates.
    link_properties_state_logger: LinkPropertiesStateLogger,

    // Not read after construction; presumably held so the node stays alive.
    _inspect_node: InspectNode,
    // Stats shared with inspect and handed out on `TelemetryEvent::GetTimeSeries`.
    stats: Arc<Mutex<Stats>>,
}
291
292impl Telemetry {
293    pub fn new(
294        _telemetry_sender: TelemetrySender,
295        cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
296        link_properties_state_logger: LinkPropertiesStateLogger,
297        inspect_node: InspectNode,
298    ) -> Self {
299        let stats = Arc::new(Mutex::new(Stats::new()));
300        inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
301        Self {
302            cobalt_proxy,
303            state_summary: None,
304            state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
305            state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
306            reachability_lost_at: None,
307            network_config: None,
308            network_config_last_refreshed: fasync::MonotonicInstant::now(),
309            link_properties_state_logger,
310            _inspect_node: inspect_node,
311            stats,
312        }
313    }
314
315    pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
316        let now = fasync::MonotonicInstant::now();
317        match event {
318            TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
319                let new_state = Some(match &self.state_summary {
320                    Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
321                    None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
322                });
323                if self.state_summary != new_state {
324                    // Only log if system state has changed to prevent spamming Cobalt,
325                    // since this telemetry event may happen in quick succession.
326                    self.log_system_state_metrics(
327                        new_state,
328                        now,
329                        "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
330                    )
331                    .await
332                }
333            }
334            TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
335                let new_config =
336                    Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
337                self.log_network_config_metrics(
338                    new_config,
339                    now,
340                    "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
341                )
342                .await;
343            }
344            TelemetryEvent::GatewayProbe {
345                internet_available,
346                gateway_discoverable,
347                gateway_pingable,
348            } => {
349                if internet_available {
350                    let metric_id = match (gateway_discoverable, gateway_pingable) {
351                        (true, true) => None,
352                        (true, false) => {
353                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
354                        }
355                        (false, true) => {
356                            Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
357                        }
358                        (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
359                    };
360                    if let Some(metric_id) = metric_id {
361                        log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
362                    }
363                }
364            }
365            TelemetryEvent::LinkPropertiesUpdate { interface_identifiers, link_properties } => {
366                self.link_properties_state_logger
367                    .update_link_properties(interface_identifiers, &link_properties);
368            }
369            TelemetryEvent::LinkStateUpdate { interface_identifiers, link_state } => {
370                self.link_properties_state_logger
371                    .update_link_state(interface_identifiers, &link_state);
372            }
373            TelemetryEvent::GetTimeSeries { sender } => {
374                let _result = sender.send(Arc::clone(&self.stats));
375            }
376        }
377    }
378
379    async fn log_system_state_metrics(
380        &mut self,
381        new_state: Option<SystemStateSummary>,
382        now: fasync::MonotonicInstant,
383        ctx: &'static str,
384    ) {
385        let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
386        let duration_sec_inspect =
387            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
388        let mut metric_events = vec![];
389
390        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
391
392        if let Some(prev) = &self.state_summary {
393            if prev.system_state.has_interface_up() {
394                metric_events.push(
395                    MetricEvent::builder(
396                        metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
397                    )
398                    .as_integer(duration_cobalt.into_micros()),
399                );
400
401                let route_config_dim = convert::convert_route_config(&prev.system_state);
402                let internet_available =
403                    convert::convert_yes_no_dim(prev.system_state.has_internet());
404                let gateway_reachable =
405                    convert::convert_yes_no_dim(prev.system_state.has_gateway());
406                let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
407                let http_status = if prev.system_state.has_http() {
408                    metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
409                } else {
410                    metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
411                };
412                // We only log metric when at least one of IPv4 or IPv6 is in the SystemState.
413                if let Some(route_config) = route_config_dim {
414                    metric_events.push(
415                        MetricEvent::builder(
416                            metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
417                        )
418                        .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
419                            route_config,
420                            internet_available,
421                            gateway_reachable,
422                            dns_active,
423                            http_status,
424                        })
425                        .as_integer(duration_cobalt.into_micros()),
426                    );
427                }
428
429                if prev.system_state.has_internet() {
430                    self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
431                }
432                if prev.system_state.has_dns() {
433                    self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
434                }
435                if prev.system_state.has_http() {
436                    self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
437                }
438            }
439        }
440
441        if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
442            let previously_reachable =
443                prev.system_state.has_internet() && prev.system_state.has_dns();
444            let now_reachable =
445                new_state.system_state.has_internet() && new_state.system_state.has_dns();
446            if previously_reachable && !now_reachable {
447                let route_config_dim = convert::convert_route_config(&prev.system_state);
448                if let Some(route_config_dim) = route_config_dim {
449                    metric_events.push(
450                        MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
451                            .with_event_code(route_config_dim.as_event_code())
452                            .as_occurrence(1),
453                    );
454                    self.reachability_lost_at = Some((now, route_config_dim));
455                }
456                self.stats.lock().reachability_lost_count.log_value(&1);
457            }
458
459            if !previously_reachable && now_reachable {
460                if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
461                    metric_events.push(
462                        MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
463                            .with_event_code(route_config_dim.as_event_code())
464                            .as_integer((now - reachability_lost_at).into_micros()),
465                    );
466                }
467                self.reachability_lost_at = None;
468            }
469        }
470
471        if let Some(new_state) = &new_state {
472            self.stats
473                .lock()
474                .ipv4_state
475                .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
476            self.stats
477                .lock()
478                .ipv6_state
479                .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
480        }
481
482        if !metric_events.is_empty() {
483            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
484        }
485
486        if let Some(new_state) = new_state {
487            self.state_summary = Some(new_state);
488        }
489        self.state_last_refreshed_for_cobalt = now;
490        self.state_last_refreshed_for_inspect = now;
491    }
492
493    async fn log_network_config_metrics(
494        &mut self,
495        new_config: Option<NetworkConfig>,
496        now: fasync::MonotonicInstant,
497        ctx: &'static str,
498    ) {
499        let duration = now - self.network_config_last_refreshed;
500        let mut metric_events = vec![];
501
502        if let Some(prev) = &self.network_config {
503            let default_route_dim = convert::convert_default_route(
504                prev.has_default_ipv4_route,
505                prev.has_default_ipv6_route,
506            );
507            // We only log metric when at least one of IPv4 or IPv6 has default route.
508            if let Some(default_route_dim) = default_route_dim {
509                metric_events.push(
510                    MetricEvent::builder(
511                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
512                    )
513                    .with_event_code(default_route_dim.as_event_code())
514                    .as_integer(duration.into_micros()),
515                );
516                metric_events.push(
517                    MetricEvent::builder(
518                        metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
519                    )
520                    .with_event_code(default_route_dim.as_event_code())
521                    .as_occurrence(1),
522                );
523            }
524        }
525
526        if !metric_events.is_empty() {
527            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
528        }
529
530        if let Some(new_config) = new_config {
531            self.network_config = Some(new_config);
532        }
533        self.network_config_last_refreshed = now;
534    }
535
536    pub fn handle_ten_secondly_telemetry(&mut self) {
537        let now = fasync::MonotonicInstant::now();
538        let duration_sec_inspect =
539            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
540
541        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
542
543        if let Some(current) = &self.state_summary {
544            if current.system_state.has_internet() {
545                self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
546            }
547            if current.system_state.has_dns() {
548                self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
549            }
550            if current.system_state.has_http() {
551                self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
552            }
553
554            self.stats
555                .lock()
556                .ipv4_state
557                .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
558            self.stats
559                .lock()
560                .ipv6_state
561                .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
562        }
563        self.state_last_refreshed_for_inspect = now;
564    }
565
566    pub async fn handle_hourly_telemetry(&mut self) {
567        let now = fasync::MonotonicInstant::now();
568        self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
569        self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use crate::{ApplicationState, LinkState};
576
577    use super::*;
578    use fidl::endpoints::create_proxy_and_stream;
579    use fidl_fuchsia_metrics::MetricEventPayload;
580    use fuchsia_inspect::Inspector;
581    use fuchsia_inspect_contrib::id_enum::IdEnum;
582
583    use futures::task::Poll;
584    use std::pin::Pin;
585    use test_case::test_case;
586    use windowed_stats::experimental::clock::Timed;
587    use windowed_stats::experimental::testing::{MockTimeMatrixClient, TimeMatrixCall};
588
589    const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
590
591    #[test]
592    fn test_log_state_change() {
593        let (mut test_helper, mut test_fut) = setup_test();
594
595        let mut update = SystemStateUpdate {
596            system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
597        };
598        test_helper
599            .telemetry_sender
600            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
601
602        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
603
604        update.system_state = IpVersions {
605            ipv4: Some(State {
606                link: LinkState::Internet,
607                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
608            }),
609            ipv6: None,
610        };
611        test_helper
612            .telemetry_sender
613            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
614
615        test_helper.advance_test_fut(&mut test_fut);
616
617        let logged_metrics = test_helper
618            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
619        assert_eq!(logged_metrics.len(), 1);
620        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
621
622        let logged_metrics = test_helper
623            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
624        assert_eq!(logged_metrics.len(), 1);
625        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
626        assert_eq!(
627            logged_metrics[0].event_codes,
628            &[
629                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
630                    as u32,
631                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
632                    as u32,
633                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
634                    as u32,
635                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
636                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
637                    as u32,
638            ]
639        );
640
641        test_helper.cobalt_events.clear();
642        test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);
643
644        // At the 1 hour mark, the new state is logged via periodic telemetry.
645        // All metrics are the same as before, except for the elapsed duration and the
646        // `dns_active` flag is now true.
647        let logged_metrics = test_helper
648            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
649        assert_eq!(logged_metrics.len(), 1);
650        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
651
652        let logged_metrics = test_helper
653            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
654        assert_eq!(logged_metrics.len(), 1);
655        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
656        assert_eq!(
657            logged_metrics[0].event_codes,
658            &[
659                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
660                    as u32,
661                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
662                    as u32,
663                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
664                    as u32,
665                // This time dns_active is Yes, and http_status is HttpOnly
666                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
667                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
668                    as u32,
669            ]
670        );
671    }
672
673    #[test]
674    fn test_log_reachability_lost() {
675        let (mut test_helper, mut test_fut) = setup_test();
676
677        let mut update = SystemStateUpdate {
678            system_state: IpVersions {
679                ipv4: None,
680                ipv6: Some(State {
681                    link: LinkState::Internet,
682                    application: ApplicationState { dns_resolved: true, ..Default::default() },
683                }),
684            },
685            ..SystemStateUpdate::default()
686        };
687        test_helper
688            .telemetry_sender
689            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
690        test_helper.advance_test_fut(&mut test_fut);
691
692        update.system_state = IpVersions {
693            ipv4: None,
694            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
695        };
696        test_helper
697            .telemetry_sender
698            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
699        test_helper.advance_test_fut(&mut test_fut);
700
        // Reachability lost metric is logged because previously both `internet_available` and
        // `dns_active` were true, and now the latter is false.
703        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
704        assert_eq!(logged_metrics.len(), 1);
705        assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
706        assert_eq!(
707            logged_metrics[0].event_codes,
708            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
709        );
710
711        test_helper.cobalt_events.clear();
712
713        update.system_state = IpVersions {
714            ipv4: None,
715            ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
716        };
717        test_helper
718            .telemetry_sender
719            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
720        test_helper.advance_test_fut(&mut test_fut);
721
722        // Reachability lost metric is not logged again even when `internet_available`
723        // becomes false, because it was already considered lost previously.
724        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
725        assert_eq!(logged_metrics.len(), 0);
726
727        test_helper.cobalt_events.clear();
728
729        test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
730        update.system_state = IpVersions {
731            ipv4: Some(State {
732                link: LinkState::Internet,
733                application: ApplicationState { dns_resolved: true, ..Default::default() },
734            }),
735            ipv6: None,
736        };
737        test_helper
738            .telemetry_sender
739            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
740        test_helper.advance_test_fut(&mut test_fut);
741
742        // When reachability is recovered, the duration that reachability was lost is logged.
743        let logged_metrics =
744            test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
745        assert_eq!(logged_metrics.len(), 1);
746        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
747        assert_eq!(
748            logged_metrics[0].event_codes,
749            // Reachability is recovered on ipv4, but we still log the ipv6 dimension because
750            // that was the config when reachability was lost.
751            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
752        );
753    }
754
755    #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
756    #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
757    #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
758    #[test_case(false, false, None; "no default routes")]
759    #[fuchsia::test(add_test_attr = false)]
760    fn test_log_default_route(
761        has_default_ipv4_route: bool,
762        has_default_ipv6_route: bool,
763        expected_dim: Option<
764            metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
765        >,
766    ) {
767        let (mut test_helper, mut test_fut) = setup_test();
768
769        test_helper
770            .telemetry_sender
771            .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
772        test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
773
774        let logged_metrics = test_helper
775            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
776        match expected_dim {
777            Some(dim) => {
778                assert_eq!(logged_metrics.len(), 1);
779                assert_eq!(
780                    logged_metrics[0].payload,
781                    MetricEventPayload::IntegerValue(3600_000_000)
782                );
783                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
784            }
785            None => {
786                assert_eq!(logged_metrics.len(), 0);
787            }
788        }
789
790        let logged_metrics = test_helper
791            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
792        match expected_dim {
793            Some(dim) => {
794                assert_eq!(logged_metrics.len(), 1);
795                assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
796                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
797            }
798            None => {
799                assert_eq!(logged_metrics.len(), 0);
800            }
801        }
802    }
803
804    #[test_case(true, true, vec![]; "negative")]
805    #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
806    #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
807    #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
808    #[fuchsia::test(add_test_attr = false)]
809    fn test_log_abnormal_state_situation(
810        gateway_discoverable: bool,
811        gateway_pingable: bool,
812        expected_metrics: Vec<u32>,
813    ) {
814        let (mut test_helper, mut test_fut) = setup_test();
815
816        test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
817            internet_available: true,
818            gateway_discoverable,
819            gateway_pingable,
820        });
821        test_helper.advance_test_fut(&mut test_fut);
822
823        let logged_metrics: Vec<u32> = [
824            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
825            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
826            metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
827        ]
828        .into_iter()
829        .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
830        .collect();
831
832        assert_eq!(logged_metrics, expected_metrics);
833    }
834
835    #[test]
836    fn test_state_snapshot_duration_inspect_stats() {
837        let (mut test_helper, mut test_fut) = setup_test();
838
839        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
840
841        let time_series = test_helper.get_time_series(&mut test_fut);
842        let internet_available_sec: Vec<_> =
843            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
844        let dns_active_sec: Vec<_> =
845            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
846        let http_active_sec: Vec<_> =
847            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
848        let total_duration_sec: Vec<_> =
849            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
850        assert_eq!(internet_available_sec, vec![0]);
851        assert_eq!(dns_active_sec, vec![0]);
852        assert_eq!(http_active_sec, vec![0]);
853        assert_eq!(total_duration_sec, vec![20]);
854
855        let mut update = SystemStateUpdate {
856            system_state: IpVersions {
857                ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
858                ipv6: None,
859            },
860        };
861        test_helper
862            .telemetry_sender
863            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
864        test_helper.advance_test_fut(&mut test_fut);
865        let time_series = test_helper.get_time_series(&mut test_fut);
866        let total_duration_sec: Vec<_> =
867            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
868        assert_eq!(total_duration_sec, vec![25]);
869
870        test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
871
872        // Now 40 seconds mark
873
874        let time_series = test_helper.get_time_series(&mut test_fut);
875        let internet_available_sec: Vec<_> =
876            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
877        let dns_active_sec: Vec<_> =
878            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
879        let http_active_sec: Vec<_> =
880            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
881        let total_duration_sec: Vec<_> =
882            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
883        assert_eq!(internet_available_sec, vec![15]);
884        assert_eq!(dns_active_sec, vec![0]);
885        assert_eq!(http_active_sec, vec![0]);
886        assert_eq!(total_duration_sec, vec![40]);
887
888        update.system_state = IpVersions {
889            ipv4: Some(State {
890                link: LinkState::Internet,
891                application: ApplicationState { dns_resolved: true, ..Default::default() },
892            }),
893            ipv6: None,
894        };
895        test_helper
896            .telemetry_sender
897            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
898
899        test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
900
901        // Now 90 seconds mark
902
903        let time_series = test_helper.get_time_series(&mut test_fut);
904        let internet_available_sec: Vec<_> =
905            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
906        let dns_active_sec: Vec<_> =
907            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
908        let http_active_sec: Vec<_> =
909            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
910        let total_duration_sec: Vec<_> =
911            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
912        assert_eq!(internet_available_sec, vec![25, 40]);
913        assert_eq!(dns_active_sec, vec![10, 40]);
914        assert_eq!(http_active_sec, vec![0]);
915        assert_eq!(total_duration_sec, vec![40]);
916
917        update.system_state = IpVersions {
918            ipv4: Some(State {
919                link: LinkState::Internet,
920                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
921            }),
922            ipv6: None,
923        };
924        test_helper
925            .telemetry_sender
926            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
927
928        test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
929
930        // Now 120 seconds mark
931
932        let time_series = test_helper.get_time_series(&mut test_fut);
933        let internet_available_sec: Vec<_> =
934            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
935        let dns_active_sec: Vec<_> =
936            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
937        let http_active_sec: Vec<_> =
938            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
939        let total_duration_sec: Vec<_> =
940            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
941        assert_eq!(internet_available_sec, vec![25, 60, 40]);
942        assert_eq!(dns_active_sec, vec![10, 60, 40]);
943        assert_eq!(http_active_sec, vec![0, 20, 40]);
944        assert_eq!(total_duration_sec, vec![40]);
945    }
946
947    #[test]
948    fn test_reachability_lost_count_inspect_stats() {
949        let (mut test_helper, mut test_fut) = setup_test();
950
951        let mut update = SystemStateUpdate {
952            system_state: IpVersions {
953                ipv4: None,
954                ipv6: Some(State {
955                    link: LinkState::Internet,
956                    application: ApplicationState { dns_resolved: true, ..Default::default() },
957                }),
958            },
959            ..SystemStateUpdate::default()
960        };
961        test_helper
962            .telemetry_sender
963            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
964        test_helper.advance_test_fut(&mut test_fut);
965
966        let time_series = test_helper.get_time_series(&mut test_fut);
967        let reachability_lost_count: Vec<_> =
968            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
969        assert_eq!(reachability_lost_count, vec![0]);
970
971        update.system_state = IpVersions {
972            ipv4: None,
973            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
974        };
975        test_helper
976            .telemetry_sender
977            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
978        test_helper.advance_test_fut(&mut test_fut);
979
980        let time_series = test_helper.get_time_series(&mut test_fut);
981        let reachability_lost_count: Vec<_> =
982            time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
983        assert_eq!(reachability_lost_count, vec![1]);
984    }
985
986    #[test]
987    fn test_avg_state_inspect_stats() {
988        let (mut test_helper, mut test_fut) = setup_test();
989
990        let mut update = SystemStateUpdate {
991            system_state: IpVersions {
992                ipv4: None,
993                ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
994            },
995            ..SystemStateUpdate::default()
996        };
997        test_helper
998            .telemetry_sender
999            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1000        test_helper.advance_test_fut(&mut test_fut);
1001
1002        let time_series = test_helper.get_time_series(&mut test_fut);
1003        let ipv4_state: Vec<_> =
1004            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1005        let ipv6_state: Vec<_> =
1006            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1007        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
1008        assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
1009
1010        update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
1011        test_helper
1012            .telemetry_sender
1013            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1014        test_helper.advance_test_fut(&mut test_fut);
1015
1016        let time_series = test_helper.get_time_series(&mut test_fut);
1017        let ipv4_state: Vec<_> =
1018            time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1019        let ipv6_state: Vec<_> =
1020            time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1021        assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
1022        assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
1023    }
1024
1025    #[test]
1026    fn test_link_properties_update() {
1027        let (mut test_helper, mut test_fut) = setup_test();
1028
1029        // Iterate through all of the different combinations of booleans in the
1030        // LinkProperties struct. There are 4 different booleans, so 2^4
1031        // different combinations.
1032        for i in 0..=15 {
1033            let link_properties = LinkProperties::from(i);
1034            let mut expected_data_vec = vec![];
1035            // Index 0 indicates that all the booleans are false which is the
1036            // same as the default value of LinkProperties. In this case no
1037            // data is reported, so expected_data_vec should be empty.
1038            if i != 0 {
1039                expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(i)));
1040            }
1041
1042            //  There should be no calls to the `TYPE_ethernet` or `TYPE_wlanclient`
1043            // time series since nothing has been logged yet.
1044            let mut time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1045            assert_eq!(
1046                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1047                &[]
1048            );
1049            assert_eq!(
1050                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1051                &[]
1052            );
1053            assert_eq!(
1054                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1055                &[]
1056            );
1057            assert_eq!(
1058                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1059                &[]
1060            );
1061
1062            test_helper.telemetry_sender.send(TelemetryEvent::LinkPropertiesUpdate {
1063                interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1064                link_properties: IpVersions {
1065                    ipv4: link_properties.clone(),
1066                    ipv6: link_properties,
1067                },
1068            });
1069            test_helper.advance_test_fut(&mut test_fut);
1070
1071            time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1072            assert_eq!(
1073                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1074                expected_data_vec
1075            );
1076            assert_eq!(
1077                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1078                expected_data_vec
1079            );
1080            // There should be no calls to the `TYPE_wlanclient` time series since the
1081            // update above was for `Ethernet`.
1082            assert_eq!(
1083                &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1084                &[]
1085            );
1086            assert_eq!(
1087                &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1088                &[]
1089            );
1090        }
1091    }
1092
1093    #[test]
1094    fn test_link_state_update() {
1095        let (mut test_helper, mut test_fut) = setup_test();
1096
1097        // Iterate through all of the different variants in the LinkState enum.
1098        // LinkState::None is the default so there should be no data recorded
1099        // for that value.
1100        let initial_value = LinkState::None.to_id() as u64;
1101        let final_value = LinkState::Internet.to_id() as u64;
1102        for i in initial_value..=final_value {
1103            let link_state = i.into();
1104            let mut expected_data_vec = vec![];
1105            if i != initial_value {
1106                expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(1 << i)));
1107            }
1108
1109            //  There should be no calls to the `TYPE_ethernet` or `TYPE_wlanclient`
1110            // time series since nothing has been logged yet.
1111            let mut time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1112            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..], &[]);
1113            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..], &[]);
1114            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1115            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1116
1117            test_helper.telemetry_sender.send(TelemetryEvent::LinkStateUpdate {
1118                interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1119                link_state: IpVersions { ipv4: link_state, ipv6: link_state },
1120            });
1121            test_helper.advance_test_fut(&mut test_fut);
1122
1123            time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1124            assert_eq!(
1125                &time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..],
1126                expected_data_vec
1127            );
1128            assert_eq!(
1129                &time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..],
1130                expected_data_vec
1131            );
1132            // There should be no calls to the `TYPE_wlanclient` time series since the
1133            // update above was for `Ethernet`.
1134            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1135            assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1136        }
1137    }
1138
1139    struct TestHelper {
1140        telemetry_sender: TelemetrySender,
1141        _inspector: Inspector,
1142        cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
1143        /// As requests to Cobalt are responded to via `self.drain_cobalt_events()`,
1144        /// their payloads are drained to this HashMap.
1145        cobalt_events: Vec<MetricEvent>,
1146        mock_time_matrix_client: MockTimeMatrixClient,
1147
1148        // Note: keep the executor field last in the struct so it gets dropped last.
1149        exec: fasync::TestExecutor,
1150    }
1151
1152    impl TestHelper {
1153        /// Advance executor until stalled, taking care of any blocking requests
1154        fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
1155            let mut made_progress = true;
1156            while made_progress {
1157                let _result = self.exec.run_until_stalled(test_fut);
1158                made_progress = false;
1159                while let Poll::Ready(Some(Ok(req))) =
1160                    self.exec.run_until_stalled(&mut self.cobalt_stream.next())
1161                {
1162                    self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
1163                    made_progress = true;
1164                }
1165            }
1166
1167            match self.exec.run_until_stalled(test_fut) {
1168                Poll::Pending => (),
1169                _ => panic!("expect test_fut to resolve to Poll::Pending"),
1170            }
1171        }
1172
1173        /// Advance executor by `duration`.
1174        /// This function repeatedly advances the executor by 1 second, triggering
1175        /// any expired timers and running the test_fut, until `duration` is reached.
1176        fn advance_by<T>(
1177            &mut self,
1178            duration: zx::MonotonicDuration,
1179            test_fut: &mut (impl Future<Output = T> + Unpin),
1180        ) {
1181            assert_eq!(
1182                duration.into_nanos() % STEP_INCREMENT.into_nanos(),
1183                0,
1184                "duration {:?} is not divisible by STEP_INCREMENT",
1185                duration,
1186            );
1187            const_assert_eq!(
1188                TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1189                0
1190            );
1191
1192            self.advance_test_fut(test_fut);
1193
1194            for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1195                self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1196                let _ = self.exec.wake_expired_timers();
1197                self.advance_test_fut(test_fut);
1198            }
1199        }
1200
1201        fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1202            self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1203        }
1204
1205        fn get_time_series<T>(
1206            &mut self,
1207            test_fut: &mut (impl Future<Output = T> + Unpin),
1208        ) -> Arc<Mutex<Stats>> {
1209            let (sender, mut receiver) = oneshot::channel();
1210            self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1211            self.advance_test_fut(test_fut);
1212            match receiver.try_recv() {
1213                Ok(Some(stats)) => stats,
1214                _ => panic!("Expect Stats to be returned"),
1215            }
1216        }
1217    }
1218
1219    trait CobaltExt {
1220        // Respond to MetricEventLoggerRequest and extract its MetricEvent.
1221        fn respond_to_metric_req(
1222            self,
1223            result: Result<(), fidl_fuchsia_metrics::Error>,
1224        ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
1225    }
1226
1227    impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1228        fn respond_to_metric_req(
1229            self,
1230            result: Result<(), fidl_fuchsia_metrics::Error>,
1231        ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1232            match self {
1233                Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1234                    assert!(responder.send(result).is_ok());
1235                    vec![MetricEvent {
1236                        metric_id,
1237                        event_codes,
1238                        payload: MetricEventPayload::Count(count),
1239                    }]
1240                }
1241                Self::LogInteger { metric_id, value, event_codes, responder } => {
1242                    assert!(responder.send(result).is_ok());
1243                    vec![MetricEvent {
1244                        metric_id,
1245                        event_codes,
1246                        payload: MetricEventPayload::IntegerValue(value),
1247                    }]
1248                }
1249                Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1250                    assert!(responder.send(result).is_ok());
1251                    vec![MetricEvent {
1252                        metric_id,
1253                        event_codes,
1254                        payload: MetricEventPayload::Histogram(histogram),
1255                    }]
1256                }
1257                Self::LogString { metric_id, string_value, event_codes, responder } => {
1258                    assert!(responder.send(result).is_ok());
1259                    vec![MetricEvent {
1260                        metric_id,
1261                        event_codes,
1262                        payload: MetricEventPayload::StringValue(string_value),
1263                    }]
1264                }
1265                Self::LogMetricEvents { events, responder } => {
1266                    assert!(responder.send(result).is_ok());
1267                    events
1268                }
1269            }
1270        }
1271    }
1272
1273    fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = Result<(), Error>>>>) {
1274        let mut exec = fasync::TestExecutor::new_with_fake_time();
1275        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1276
1277        let (cobalt_proxy, cobalt_stream) =
1278            create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1279
1280        let inspector = Inspector::default();
1281        let inspect_node = inspector.root().create_child("telemetrytest");
1282        let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
1283        let mock_time_matrix_client = MockTimeMatrixClient::new();
1284
1285        let link_properties_state_logger = LinkPropertiesStateLogger::new(
1286            &inspect_metadata_node,
1287            &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1288            InterfaceTimeSeriesGrouping::Type(vec![
1289                InterfaceType::Ethernet,
1290                InterfaceType::WlanClient,
1291            ]),
1292            &mock_time_matrix_client,
1293        );
1294
1295        let (telemetry_sender, test_fut) =
1296            serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
1297        let mut test_fut = Box::pin(test_fut);
1298
1299        assert_matches::assert_matches!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1300
1301        let test_helper = TestHelper {
1302            telemetry_sender,
1303            _inspector: inspector,
1304            cobalt_stream,
1305            cobalt_events: vec![],
1306            mock_time_matrix_client,
1307            exec,
1308        };
1309        (test_helper, test_fut)
1310    }
1311}