1mod convert;
6mod inspect;
7pub mod processors;
8
9use self::inspect::{Stats, inspect_record_stats};
10use crate::{IpVersions, LinkState, State};
11use processors::link_properties_state::{
12 InterfaceIdentifier, InterfaceTimeSeriesGrouping, InterfaceType, LinkProperties,
13 LinkPropertiesStateLogger,
14};
15
16use anyhow::{Context, Error, format_err};
17use cobalt_client::traits::AsEventCode;
18use fidl_fuchsia_metrics::MetricEvent;
19use fuchsia_cobalt_builders::MetricEventExt;
20use fuchsia_inspect::Node as InspectNode;
21use fuchsia_sync::Mutex;
22use futures::channel::{mpsc, oneshot};
23use futures::{Future, StreamExt, select};
24use log::{info, warn};
25use static_assertions::const_assert_eq;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use windowed_stats::aggregations::SumAndCount;
29use windowed_stats::experimental::inspect::TimeMatrixClient;
30use {fuchsia_async as fasync, network_policy_metrics_registry as metrics};
31
32#[cfg(test)]
33mod testing;
34
35pub async fn create_metrics_logger(
36 factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
37) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
38 let (cobalt_proxy, cobalt_server) =
39 fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
40
41 let project_spec = fidl_fuchsia_metrics::ProjectSpec {
42 customer_id: None, project_id: Some(metrics::PROJECT_ID),
44 ..Default::default()
45 };
46
47 let status = factory_proxy
48 .create_metric_event_logger(&project_spec, cobalt_server)
49 .await
50 .context("create metrics event logger")?;
51
52 match status {
53 Ok(_) => Ok(cobalt_proxy),
54 Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
55 }
56}
57
/// Cloneable handle used to push [`TelemetryEvent`]s to the telemetry loop.
///
/// Clones share both the underlying channel sender and the "blocked" flag.
#[derive(Clone, Debug)]
pub struct TelemetrySender {
    /// Channel sender, guarded by a mutex and shared between clones.
    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
    /// Set when a send fails and cleared when a later send succeeds, so that
    /// `send` logs only on transitions between the two states.
    sender_is_blocked: Arc<AtomicBool>,
}
63
64impl TelemetrySender {
65 pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
66 Self {
67 sender: Arc::new(Mutex::new(sender)),
68 sender_is_blocked: Arc::new(AtomicBool::new(false)),
69 }
70 }
71
72 pub fn send(&self, event: TelemetryEvent) {
74 match self.sender.lock().try_send(event) {
75 Ok(_) => {
76 if self
78 .sender_is_blocked
79 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
80 .is_ok()
81 {
82 info!("TelemetrySender recovered and resumed sending");
83 }
84 }
85 Err(_) => {
86 if self
88 .sender_is_blocked
89 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
90 .is_ok()
91 {
92 warn!(
93 "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
94 );
95 }
96 }
97 }
98 }
99}
100
/// Most recently observed per-IP-version system state, kept by `Telemetry` so
/// that the time spent in a state can be attributed to it at the next update.
#[derive(Debug, Clone, Default, PartialEq)]
struct SystemStateSummary {
    /// State for IPv4 and IPv6; `None` for an IP version with no state.
    system_state: IpVersions<Option<State>>,
}
105
106impl SystemStateSummary {
107 fn ipv4_link_state_val(&self) -> u32 {
108 match self.system_state.ipv4 {
109 Some(s) => s.link as u32,
110 None => 0u32,
111 }
112 }
113
114 fn ipv6_link_state_val(&self) -> u32 {
115 match self.system_state.ipv6 {
116 Some(s) => s.link as u32,
117 None => 0u32,
118 }
119 }
120}
121
/// Last reported default-route configuration, kept by `Telemetry` between
/// `TelemetryEvent::NetworkConfig` events.
#[derive(Debug, Clone, Default, PartialEq)]
struct NetworkConfig {
    /// Whether a default IPv4 route was reported.
    has_default_ipv4_route: bool,
    /// Whether a default IPv6 route was reported.
    has_default_ipv6_route: bool,
}
127
/// Payload of `TelemetryEvent::SystemStateUpdate`.
#[derive(Debug, Clone, Default)]
pub struct SystemStateUpdate {
    /// New state for IPv4 and IPv6; `None` for an IP version with no state.
    pub(crate) system_state: IpVersions<Option<State>>,
}
132
/// Events accepted by the telemetry loop through a [`TelemetrySender`].
#[derive(Debug)]
pub enum TelemetryEvent {
    /// New system state. Metrics are logged only when it differs from the
    /// state the telemetry loop currently holds.
    SystemStateUpdate {
        update: SystemStateUpdate,
    },
    /// Current default-route configuration.
    NetworkConfig {
        has_default_ipv4_route: bool,
        has_default_ipv6_route: bool,
    },
    /// Result of a gateway probe. An occurrence metric is logged when internet
    /// is available but the gateway is not both discoverable and pingable.
    GatewayProbe {
        internet_available: bool,
        gateway_discoverable: bool,
        gateway_pingable: bool,
    },
    /// New link properties, forwarded to the link-properties state logger for
    /// the given interface identifiers.
    LinkPropertiesUpdate {
        interface_identifiers: Vec<InterfaceIdentifier>,
        link_properties: IpVersions<LinkProperties>,
    },
    /// New link state, forwarded to the link-properties state logger for the
    /// given interface identifiers.
    LinkStateUpdate {
        interface_identifiers: Vec<InterfaceIdentifier>,
        link_state: IpVersions<LinkState>,
    },
    /// Request for the shared stats handle, which is sent back on `sender`.
    GetTimeSeries {
        sender: oneshot::Sender<Arc<Mutex<Stats>>>,
    },
}
164
/// Capacity passed to the `mpsc` channel that carries `TelemetryEvent`s.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;

/// Period of the interval timer that drives the periodic telemetry handlers.
const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
/// Name of the inspect child node handed to the link-properties state logger.
const METADATA_NODE_NAME: &str = "metadata";
171
172pub fn serve_telemetry(
173 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
174 inspect_node: InspectNode,
175) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
176 let inspect_time_series_node = inspect_node.create_child("time_series");
178 let link_properties_state_time_series_node =
179 inspect_time_series_node.create_child("link_properties_state");
180 let time_matrix_client =
181 TimeMatrixClient::new(link_properties_state_time_series_node.clone_weak());
182
183 inspect_node.record(inspect_time_series_node);
184 inspect_node.record(link_properties_state_time_series_node);
185 let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
186 let link_properties_state_logger = LinkPropertiesStateLogger::new(
187 &inspect_metadata_node,
188 &format!("root/telemetry/{METADATA_NODE_NAME}"),
189 InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
190 &time_matrix_client,
191 );
192 inspect_node.record(inspect_metadata_node);
194
195 let (sender, fut) =
196 serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
197 (sender, fut)
198}
199
200fn serve_telemetry_inner(
201 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
202 inspect_node: InspectNode,
203 link_properties_state_logger: LinkPropertiesStateLogger,
204) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
205 let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
206 let sender = TelemetrySender::new(sender);
207 let cloned_sender = sender.clone();
208
209 let fut = async move {
210 let link_properties_state_logger = link_properties_state_logger;
211
212 let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
213 const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
214 const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
215 const INTERVAL_TICKS_PER_MINUTE: u64 =
216 (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
217 const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
218 let mut interval_tick = 0u64;
219 let mut telemetry =
220 Telemetry::new(cloned_sender, cobalt_proxy, link_properties_state_logger, inspect_node);
221 loop {
222 select! {
223 event = receiver.next() => {
224 if let Some(event) = event {
225 telemetry.handle_telemetry_event(event).await;
226 }
227 }
228 _ = report_interval_stream.next() => {
229 telemetry.handle_ten_secondly_telemetry();
230
231 interval_tick += 1;
232 if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
233 telemetry.handle_hourly_telemetry().await;
234 }
235 }
236 }
237 }
238 };
239 (sender, fut)
240}
241
// Calls `$cobalt_proxy.$method_name($metric_id, $value, $event_codes)`, awaits
// the reply, and logs a warning if either the call itself or the reply reports
// an error. Errors are not propagated to the caller.
//
// The expansion contains `.await`, so it can only be used in an async context.
// `$metric_id` is also used in the warning messages.
macro_rules! log_cobalt {
    ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
        let status = $cobalt_proxy.$method_name($metric_id, $value, $event_codes).await;
        match status {
            Ok(Ok(())) => (),
            Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
            Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
        }
    }};
}
254
// Sends `$events` with `$cobalt_proxy.log_metric_events`, awaits the reply, and
// logs a warning tagged with `$context` if either the call itself or the reply
// reports an error. Errors are not propagated to the caller.
//
// The expansion contains `.await`, so it can only be used in an async context.
macro_rules! log_cobalt_batch {
    ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
        let status = $cobalt_proxy.log_metric_events($events).await;
        match status {
            Ok(Ok(())) => (),
            Ok(Err(e)) => {
                warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
            }
            Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
        }
    }};
}
267
268fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
269 const MILLIS_PER_SEC: i64 = 1000;
270 let millis = duration.into_millis();
271 let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
272 millis / MILLIS_PER_SEC + rounded_portion
273}
274
/// State owned by the telemetry loop: the metrics proxy, the last known
/// system state and network config, and the shared time-series stats.
struct Telemetry {
    /// Proxy used to log metric events.
    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
    /// Last system state received, or `None` before the first update.
    state_summary: Option<SystemStateSummary>,
    /// When state durations were last reported as metric events.
    state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
    /// When state durations were last added to the inspect stats. Refreshed
    /// by the ten-secondly handler as well as by state-metric logging.
    state_last_refreshed_for_inspect: fasync::MonotonicInstant,
    /// Time at which reachability was lost and the route config in effect
    /// then; cleared once reachability is regained.
    reachability_lost_at: Option<(
        fasync::MonotonicInstant,
        metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
    )>,
    /// Last network config received, or `None` before the first one.
    network_config: Option<NetworkConfig>,
    /// When network-config metrics were last logged.
    network_config_last_refreshed: fasync::MonotonicInstant,
    /// Receives link-properties and link-state updates.
    link_properties_state_logger: LinkPropertiesStateLogger,

    // Stored but not read in this struct's visible methods; presumably held to
    // keep the inspect node alive. TODO confirm.
    _inspect_node: InspectNode,
    /// Time-series stats, shared with inspect and with `GetTimeSeries` callers.
    stats: Arc<Mutex<Stats>>,
}
291
292impl Telemetry {
293 pub fn new(
294 _telemetry_sender: TelemetrySender,
295 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
296 link_properties_state_logger: LinkPropertiesStateLogger,
297 inspect_node: InspectNode,
298 ) -> Self {
299 let stats = Arc::new(Mutex::new(Stats::new()));
300 inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
301 Self {
302 cobalt_proxy,
303 state_summary: None,
304 state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
305 state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
306 reachability_lost_at: None,
307 network_config: None,
308 network_config_last_refreshed: fasync::MonotonicInstant::now(),
309 link_properties_state_logger,
310 _inspect_node: inspect_node,
311 stats,
312 }
313 }
314
315 pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
316 let now = fasync::MonotonicInstant::now();
317 match event {
318 TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
319 let new_state = Some(match &self.state_summary {
320 Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
321 None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
322 });
323 if self.state_summary != new_state {
324 self.log_system_state_metrics(
327 new_state,
328 now,
329 "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
330 )
331 .await
332 }
333 }
334 TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
335 let new_config =
336 Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
337 self.log_network_config_metrics(
338 new_config,
339 now,
340 "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
341 )
342 .await;
343 }
344 TelemetryEvent::GatewayProbe {
345 internet_available,
346 gateway_discoverable,
347 gateway_pingable,
348 } => {
349 if internet_available {
350 let metric_id = match (gateway_discoverable, gateway_pingable) {
351 (true, true) => None,
352 (true, false) => {
353 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
354 }
355 (false, true) => {
356 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
357 }
358 (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
359 };
360 if let Some(metric_id) = metric_id {
361 log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
362 }
363 }
364 }
365 TelemetryEvent::LinkPropertiesUpdate { interface_identifiers, link_properties } => {
366 self.link_properties_state_logger
367 .update_link_properties(interface_identifiers, &link_properties);
368 }
369 TelemetryEvent::LinkStateUpdate { interface_identifiers, link_state } => {
370 self.link_properties_state_logger
371 .update_link_state(interface_identifiers, &link_state);
372 }
373 TelemetryEvent::GetTimeSeries { sender } => {
374 let _result = sender.send(Arc::clone(&self.stats));
375 }
376 }
377 }
378
    /// Attributes the time since the last refresh to the previously stored
    /// system state, detects reachability loss and recovery, and then adopts
    /// `new_state` if one is given.
    ///
    /// * `new_state`: the state to store afterwards; `None` keeps the current
    ///   state and only flushes durations (as the hourly handler does).
    /// * `now`: the instant durations are measured up to; both state
    ///   "last refreshed" timestamps are reset to it.
    /// * `ctx`: label included in the warning if batch logging fails.
    async fn log_system_state_metrics(
        &mut self,
        new_state: Option<SystemStateSummary>,
        now: fasync::MonotonicInstant,
        ctx: &'static str,
    ) {
        // Metric events and inspect stats are refreshed on different
        // schedules, so each has its own elapsed duration.
        let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
        let duration_sec_inspect =
            round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
        let mut metric_events = vec![];

        self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);

        // Durations are credited to the state that was in effect until now,
        // and only if that state had an interface up.
        if let Some(prev) = &self.state_summary {
            if prev.system_state.has_interface_up() {
                metric_events.push(
                    MetricEvent::builder(
                        metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
                    )
                    .as_integer(duration_cobalt.into_micros()),
                );

                let route_config_dim = convert::convert_route_config(&prev.system_state);
                let internet_available =
                    convert::convert_yes_no_dim(prev.system_state.has_internet());
                let gateway_reachable =
                    convert::convert_yes_no_dim(prev.system_state.has_gateway());
                let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
                let http_status = if prev.system_state.has_http() {
                    metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
                } else {
                    metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
                };
                // The snapshot metric is skipped when no route config
                // dimension could be derived from the previous state.
                if let Some(route_config) = route_config_dim {
                    metric_events.push(
                        MetricEvent::builder(
                            metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
                        )
                        .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
                            route_config,
                            internet_available,
                            gateway_reachable,
                            dns_active,
                            http_status,
                        })
                        .as_integer(duration_cobalt.into_micros()),
                    );
                }

                if prev.system_state.has_internet() {
                    self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
                }
                if prev.system_state.has_dns() {
                    self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
                }
                if prev.system_state.has_http() {
                    self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
                }
            }
        }

        // "Reachable" here means internet and DNS are both available.
        if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
            let previously_reachable =
                prev.system_state.has_internet() && prev.system_state.has_dns();
            let now_reachable =
                new_state.system_state.has_internet() && new_state.system_state.has_dns();
            if previously_reachable && !now_reachable {
                // Remember when reachability was lost and under which route
                // config, so the outage duration can be reported on recovery.
                let route_config_dim = convert::convert_route_config(&prev.system_state);
                if let Some(route_config_dim) = route_config_dim {
                    metric_events.push(
                        MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
                            .with_event_code(route_config_dim.as_event_code())
                            .as_occurrence(1),
                    );
                    self.reachability_lost_at = Some((now, route_config_dim));
                }
                self.stats.lock().reachability_lost_count.log_value(&1);
            }

            if !previously_reachable && now_reachable {
                // The outage duration carries the route config recorded at the
                // time of the loss, not the current one.
                if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
                    metric_events.push(
                        MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
                            .with_event_code(route_config_dim.as_event_code())
                            .as_integer((now - reachability_lost_at).into_micros()),
                    );
                }
                self.reachability_lost_at = None;
            }
        }

        // Sample the link state of the incoming state for both IP versions.
        if let Some(new_state) = &new_state {
            self.stats
                .lock()
                .ipv4_state
                .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
            self.stats
                .lock()
                .ipv6_state
                .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
        }

        if !metric_events.is_empty() {
            log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
        }

        // A `None` new state leaves the stored state untouched.
        if let Some(new_state) = new_state {
            self.state_summary = Some(new_state);
        }
        self.state_last_refreshed_for_cobalt = now;
        self.state_last_refreshed_for_inspect = now;
    }
492
493 async fn log_network_config_metrics(
494 &mut self,
495 new_config: Option<NetworkConfig>,
496 now: fasync::MonotonicInstant,
497 ctx: &'static str,
498 ) {
499 let duration = now - self.network_config_last_refreshed;
500 let mut metric_events = vec![];
501
502 if let Some(prev) = &self.network_config {
503 let default_route_dim = convert::convert_default_route(
504 prev.has_default_ipv4_route,
505 prev.has_default_ipv6_route,
506 );
507 if let Some(default_route_dim) = default_route_dim {
509 metric_events.push(
510 MetricEvent::builder(
511 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
512 )
513 .with_event_code(default_route_dim.as_event_code())
514 .as_integer(duration.into_micros()),
515 );
516 metric_events.push(
517 MetricEvent::builder(
518 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
519 )
520 .with_event_code(default_route_dim.as_event_code())
521 .as_occurrence(1),
522 );
523 }
524 }
525
526 if !metric_events.is_empty() {
527 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
528 }
529
530 if let Some(new_config) = new_config {
531 self.network_config = Some(new_config);
532 }
533 self.network_config_last_refreshed = now;
534 }
535
536 pub fn handle_ten_secondly_telemetry(&mut self) {
537 let now = fasync::MonotonicInstant::now();
538 let duration_sec_inspect =
539 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
540
541 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
542
543 if let Some(current) = &self.state_summary {
544 if current.system_state.has_internet() {
545 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
546 }
547 if current.system_state.has_dns() {
548 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
549 }
550 if current.system_state.has_http() {
551 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
552 }
553
554 self.stats
555 .lock()
556 .ipv4_state
557 .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
558 self.stats
559 .lock()
560 .ipv6_state
561 .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
562 }
563 self.state_last_refreshed_for_inspect = now;
564 }
565
566 pub async fn handle_hourly_telemetry(&mut self) {
567 let now = fasync::MonotonicInstant::now();
568 self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
569 self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use crate::{ApplicationState, LinkState};
576
577 use super::*;
578 use fidl::endpoints::create_proxy_and_stream;
579 use fidl_fuchsia_metrics::MetricEventPayload;
580 use fuchsia_inspect::Inspector;
581 use fuchsia_inspect_contrib::id_enum::IdEnum;
582
583 use futures::task::Poll;
584 use std::pin::Pin;
585 use test_case::test_case;
586 use windowed_stats::experimental::clock::Timed;
587 use windowed_stats::experimental::testing::{MockTimeMatrixClient, TimeMatrixCall};
588
    // One-second step. Not referenced in the visible tests; presumably the
    // granularity the test helper uses when advancing fake time. TODO confirm.
    const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
590
    // Checks the duration metrics logged when the system state changes and
    // again when the hourly flush runs: each duration is attributed to the
    // state that was in effect before the change or flush.
    #[test]
    fn test_log_state_change() {
        let (mut test_helper, mut test_fut) = setup_test();

        // First state: IPv4 at `Internet`, no IPv6.
        let mut update = SystemStateUpdate {
            system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });

        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);

        // Second state: same link state, now with DNS and HTTP working.
        update.system_state = IpVersions {
            ipv4: Some(State {
                link: LinkState::Internet,
                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
            }),
            ipv6: None,
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });

        test_helper.advance_test_fut(&mut test_fut);

        // The 25 seconds spent in the first state are reported in microseconds.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));

        // The snapshot carries the dimensions of the first state (no DNS/HTTP).
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
        assert_eq!(
            logged_metrics[0].event_codes,
            &[
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
                    as u32,
            ]
        );

        // Advance a further 3575 seconds (one hour in total) with no state change.
        test_helper.cobalt_events.clear();
        test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);

        // The time spent in the second state is reported without a state change.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));

        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
        assert_eq!(
            logged_metrics[0].event_codes,
            &[
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
                    as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
                metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
                    as u32,
            ]
        );
    }
672
    // Checks that losing reachability (internet + DNS) logs one occurrence,
    // that a further downgrade does not log another, and that regaining
    // reachability logs the outage duration with the route config recorded
    // at the time of the loss.
    #[test]
    fn test_log_reachability_lost() {
        let (mut test_helper, mut test_fut) = setup_test();

        // Start reachable over IPv6: internet with DNS resolved.
        let mut update = SystemStateUpdate {
            system_state: IpVersions {
                ipv4: None,
                ipv6: Some(State {
                    link: LinkState::Internet,
                    application: ApplicationState { dns_resolved: true, ..Default::default() },
                }),
            },
            ..SystemStateUpdate::default()
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
        test_helper.advance_test_fut(&mut test_fut);

        // DNS stops resolving: reachability is lost.
        update.system_state = IpVersions {
            ipv4: None,
            ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
        test_helper.advance_test_fut(&mut test_fut);

        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
        assert_eq!(
            logged_metrics[0].event_codes,
            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
        );

        test_helper.cobalt_events.clear();

        // Going from unreachable to link down is not a new loss.
        update.system_state = IpVersions {
            ipv4: None,
            ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
        test_helper.advance_test_fut(&mut test_fut);

        let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
        assert_eq!(logged_metrics.len(), 0);

        test_helper.cobalt_events.clear();

        // Two hours later reachability returns, this time over IPv4.
        test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
        update.system_state = IpVersions {
            ipv4: Some(State {
                link: LinkState::Internet,
                application: ApplicationState { dns_resolved: true, ..Default::default() },
            }),
            ipv6: None,
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
        test_helper.advance_test_fut(&mut test_fut);

        // The duration is tagged with the route config from when reachability
        // was lost (IPv6 only), not the one in effect on recovery.
        let logged_metrics =
            test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
        assert_eq!(logged_metrics.len(), 1);
        assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
        assert_eq!(
            logged_metrics[0].event_codes,
            &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
        );
    }
754
    // Checks that, one hour after a network config is reported, a duration
    // event and an occurrence event are logged with the matching default-route
    // dimension, and that nothing is logged when there is no default route.
    #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
    #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
    #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
    #[test_case(false, false, None; "no default routes")]
    #[fuchsia::test(add_test_attr = false)]
    fn test_log_default_route(
        has_default_ipv4_route: bool,
        has_default_ipv6_route: bool,
        expected_dim: Option<
            metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
        >,
    ) {
        let (mut test_helper, mut test_fut) = setup_test();

        test_helper
            .telemetry_sender
            .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
        test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);

        // Duration metric: one hour, in microseconds.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
        match expected_dim {
            Some(dim) => {
                assert_eq!(logged_metrics.len(), 1);
                assert_eq!(
                    logged_metrics[0].payload,
                    MetricEventPayload::IntegerValue(3600_000_000)
                );
                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
            }
            None => {
                assert_eq!(logged_metrics.len(), 0);
            }
        }

        // Occurrence metric: a single count with the same dimension.
        let logged_metrics = test_helper
            .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
        match expected_dim {
            Some(dim) => {
                assert_eq!(logged_metrics.len(), 1);
                assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
                assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
            }
            None => {
                assert_eq!(logged_metrics.len(), 0);
            }
        }
    }
803
    // Checks which "internet available but gateway abnormal" metric, if any,
    // is logged for each combination of gateway discoverability and
    // pingability while internet is available.
    #[test_case(true, true, vec![]; "negative")]
    #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
    #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
    #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
    #[fuchsia::test(add_test_attr = false)]
    fn test_log_abnormal_state_situation(
        gateway_discoverable: bool,
        gateway_pingable: bool,
        expected_metrics: Vec<u32>,
    ) {
        let (mut test_helper, mut test_fut) = setup_test();

        test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
            internet_available: true,
            gateway_discoverable,
            gateway_pingable,
        });
        test_helper.advance_test_fut(&mut test_fut);

        // Collect the ids of the candidate metrics that were actually logged.
        let logged_metrics: Vec<u32> = [
            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
            metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
            metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
        ]
        .into_iter()
        .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
        .collect();

        assert_eq!(logged_metrics, expected_metrics);
    }
834
    // Checks the per-minute inspect time series for internet / DNS / HTTP
    // availability and total duration as the state improves step by step.
    #[test]
    fn test_state_snapshot_duration_inspect_stats() {
        let (mut test_helper, mut test_fut) = setup_test();

        // No state has been reported yet; advance 25 seconds.
        test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);

        let time_series = test_helper.get_time_series(&mut test_fut);
        let internet_available_sec: Vec<_> =
            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
        let dns_active_sec: Vec<_> =
            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
        let http_active_sec: Vec<_> =
            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
        let total_duration_sec: Vec<_> =
            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
        assert_eq!(internet_available_sec, vec![0]);
        assert_eq!(dns_active_sec, vec![0]);
        assert_eq!(http_active_sec, vec![0]);
        assert_eq!(total_duration_sec, vec![20]);

        // Report internet over IPv4; the state change brings the total up to
        // the current time.
        let mut update = SystemStateUpdate {
            system_state: IpVersions {
                ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
                ipv6: None,
            },
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
        test_helper.advance_test_fut(&mut test_fut);
        let time_series = test_helper.get_time_series(&mut test_fut);
        let total_duration_sec: Vec<_> =
            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
        assert_eq!(total_duration_sec, vec![25]);

        // Advance a further 15 seconds (40 seconds in total).
        test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);

        let time_series = test_helper.get_time_series(&mut test_fut);
        let internet_available_sec: Vec<_> =
            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
        let dns_active_sec: Vec<_> =
            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
        let http_active_sec: Vec<_> =
            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
        let total_duration_sec: Vec<_> =
            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
        assert_eq!(internet_available_sec, vec![15]);
        assert_eq!(dns_active_sec, vec![0]);
        assert_eq!(http_active_sec, vec![0]);
        assert_eq!(total_duration_sec, vec![40]);

        // DNS starts resolving.
        update.system_state = IpVersions {
            ipv4: Some(State {
                link: LinkState::Internet,
                application: ApplicationState { dns_resolved: true, ..Default::default() },
            }),
            ipv6: None,
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });

        // Advance a further 50 seconds (90 seconds in total).
        test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);

        let time_series = test_helper.get_time_series(&mut test_fut);
        let internet_available_sec: Vec<_> =
            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
        let dns_active_sec: Vec<_> =
            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
        let http_active_sec: Vec<_> =
            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
        let total_duration_sec: Vec<_> =
            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
        assert_eq!(internet_available_sec, vec![25, 40]);
        assert_eq!(dns_active_sec, vec![10, 40]);
        assert_eq!(http_active_sec, vec![0]);
        assert_eq!(total_duration_sec, vec![40]);

        // HTTP fetch starts succeeding.
        update.system_state = IpVersions {
            ipv4: Some(State {
                link: LinkState::Internet,
                application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
            }),
            ipv6: None,
        };
        test_helper
            .telemetry_sender
            .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });

        // Advance a further 60 seconds (150 seconds in total).
        test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);

        let time_series = test_helper.get_time_series(&mut test_fut);
        let internet_available_sec: Vec<_> =
            time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
        let dns_active_sec: Vec<_> =
            time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
        let http_active_sec: Vec<_> =
            time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
        let total_duration_sec: Vec<_> =
            time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
        assert_eq!(internet_available_sec, vec![25, 60, 40]);
        assert_eq!(dns_active_sec, vec![10, 60, 40]);
        assert_eq!(http_active_sec, vec![0, 20, 40]);
        assert_eq!(total_duration_sec, vec![40]);
    }
946
/// Checks the `reachability_lost_count` time series across two system state updates.
///
/// The first update reports IPv6 with `LinkState::Internet` and `dns_resolved: true`; the
/// series is expected to read `[0]` afterwards. The second update keeps the link state but
/// resets the application state to its default, after which the series must read `[1]`.
#[test]
fn test_reachability_lost_count_inspect_stats() {
    let (mut test_helper, mut test_fut) = setup_test();

    // IPv6-only state with Internet link state and DNS resolved.
    let dns_resolved_state = State {
        link: LinkState::Internet,
        application: ApplicationState { dns_resolved: true, ..Default::default() },
    };
    let mut update = SystemStateUpdate {
        system_state: IpVersions { ipv4: None, ipv6: Some(dns_resolved_state) },
        ..SystemStateUpdate::default()
    };
    test_helper.telemetry_sender.send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
    test_helper.advance_test_fut(&mut test_fut);

    let stats = test_helper.get_time_series(&mut test_fut);
    let lost_before: Vec<_> =
        stats.lock().reachability_lost_count.minutely_iter().map(|count| *count).collect();
    assert_eq!(lost_before, vec![0]);

    // `ipv4` is already `None`, so only the IPv6 entry needs replacing: same link state,
    // default application state.
    update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
    test_helper.telemetry_sender.send(TelemetryEvent::SystemStateUpdate { update });
    test_helper.advance_test_fut(&mut test_fut);

    let stats = test_helper.get_time_series(&mut test_fut);
    let lost_after: Vec<_> =
        stats.lock().reachability_lost_count.minutely_iter().map(|count| *count).collect();
    assert_eq!(lost_after, vec![1]);
}
985
/// Checks the `ipv4_state` and `ipv6_state` time series across two system state updates.
///
/// IPv4 is absent in both updates and is expected to add 0 to the sum while still being
/// counted. IPv6 goes from `LinkState::Gateway` (sum 25 after one sample) to
/// `LinkState::Internet` (sum 55 after two samples).
#[test]
fn test_avg_state_inspect_stats() {
    let (mut test_helper, mut test_fut) = setup_test();

    let gateway_state = State { link: LinkState::Gateway, ..Default::default() };
    let mut update = SystemStateUpdate {
        system_state: IpVersions { ipv4: None, ipv6: Some(gateway_state) },
        ..SystemStateUpdate::default()
    };
    test_helper.telemetry_sender.send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
    test_helper.advance_test_fut(&mut test_fut);

    let stats = test_helper.get_time_series(&mut test_fut);
    let v4_samples: Vec<_> =
        stats.lock().ipv4_state.minutely_iter().map(|sample| *sample).collect();
    let v6_samples: Vec<_> =
        stats.lock().ipv6_state.minutely_iter().map(|sample| *sample).collect();
    assert_eq!(v4_samples, vec![SumAndCount { sum: 0, count: 1 }]);
    assert_eq!(v6_samples, vec![SumAndCount { sum: 25, count: 1 }]);

    // Second sample: IPv6 moves up to Internet, IPv4 stays absent.
    update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
    test_helper.telemetry_sender.send(TelemetryEvent::SystemStateUpdate { update });
    test_helper.advance_test_fut(&mut test_fut);

    let stats = test_helper.get_time_series(&mut test_fut);
    let v4_samples: Vec<_> =
        stats.lock().ipv4_state.minutely_iter().map(|sample| *sample).collect();
    let v6_samples: Vec<_> =
        stats.lock().ipv6_state.minutely_iter().map(|sample| *sample).collect();
    assert_eq!(v4_samples, vec![SumAndCount { sum: 0, count: 2 }]);
    assert_eq!(v6_samples, vec![SumAndCount { sum: 55, count: 2 }]);
}
1024
/// Checks which time matrix calls a `LinkPropertiesUpdate` for the Ethernet type produces.
///
/// For every value `i` in `0..=15`, `LinkProperties::from(i)` is sent for both IP versions.
/// The Ethernet series are expected to receive a single `Fold` of `i`, except for `i == 0`
/// where no call is expected. The WLAN client series must never receive a call.
#[test]
fn test_link_properties_update() {
    const ETHERNET_SERIES: [&str; 2] =
        ["link_properties_v4_TYPE_ethernet", "link_properties_v6_TYPE_ethernet"];
    const WLAN_SERIES: [&str; 2] =
        ["link_properties_v4_TYPE_wlanclient", "link_properties_v6_TYPE_wlanclient"];

    let (mut test_helper, mut test_fut) = setup_test();

    for i in 0..=15 {
        let link_properties = LinkProperties::from(i);
        let expected_calls =
            if i != 0 { vec![TimeMatrixCall::Fold(Timed::now(i))] } else { vec![] };

        // Nothing may be pending on any series before this iteration's update is sent.
        {
            let mut pending = test_helper.mock_time_matrix_client.drain_calls();
            for name in ETHERNET_SERIES {
                assert_eq!(&pending.drain::<u64>(name)[..], &[]);
            }
            for name in WLAN_SERIES {
                assert_eq!(&pending.drain::<u64>(name)[..], &[]);
            }
        }

        test_helper.telemetry_sender.send(TelemetryEvent::LinkPropertiesUpdate {
            interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
            link_properties: IpVersions { ipv4: link_properties.clone(), ipv6: link_properties },
        });
        test_helper.advance_test_fut(&mut test_fut);

        // Only the Ethernet series see the update; the WLAN client series stay untouched.
        let mut recorded = test_helper.mock_time_matrix_client.drain_calls();
        for name in ETHERNET_SERIES {
            assert_eq!(&recorded.drain::<u64>(name)[..], expected_calls);
        }
        for name in WLAN_SERIES {
            assert_eq!(&recorded.drain::<u64>(name)[..], &[]);
        }
    }
}
1092
/// Checks which time matrix calls a `LinkStateUpdate` for the Ethernet type produces.
///
/// Every id from `LinkState::None` through `LinkState::Internet` is converted to a link
/// state and sent for both IP versions. The Ethernet series are expected to receive a single
/// `Fold` of `1 << id`, except for the first id where no call is expected. The WLAN client
/// series must never receive a call.
#[test]
fn test_link_state_update() {
    const ETHERNET_SERIES: [&str; 2] = ["link_state_v4_TYPE_ethernet", "link_state_v6_TYPE_ethernet"];
    const WLAN_SERIES: [&str; 2] =
        ["link_state_v4_TYPE_wlanclient", "link_state_v6_TYPE_wlanclient"];

    let (mut test_helper, mut test_fut) = setup_test();

    let first_id = LinkState::None.to_id() as u64;
    let last_id = LinkState::Internet.to_id() as u64;
    for id in first_id..=last_id {
        let link_state = id.into();
        let expected_calls =
            if id != first_id { vec![TimeMatrixCall::Fold(Timed::now(1 << id))] } else { vec![] };

        // Nothing may be pending on any series before this iteration's update is sent.
        {
            let mut pending = test_helper.mock_time_matrix_client.drain_calls();
            for name in ETHERNET_SERIES {
                assert_eq!(&pending.drain::<u64>(name)[..], &[]);
            }
            for name in WLAN_SERIES {
                assert_eq!(&pending.drain::<u64>(name)[..], &[]);
            }
        }

        test_helper.telemetry_sender.send(TelemetryEvent::LinkStateUpdate {
            interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
            link_state: IpVersions { ipv4: link_state, ipv6: link_state },
        });
        test_helper.advance_test_fut(&mut test_fut);

        // Only the Ethernet series see the update; the WLAN client series stay untouched.
        let mut recorded = test_helper.mock_time_matrix_client.drain_calls();
        for name in ETHERNET_SERIES {
            assert_eq!(&recorded.drain::<u64>(name)[..], expected_calls);
        }
        for name in WLAN_SERIES {
            assert_eq!(&recorded.drain::<u64>(name)[..], &[]);
        }
    }
}
1138
/// Bundle of everything a telemetry test needs besides the future under test.
///
/// Built by `setup_test`, which returns it together with the pinned telemetry future.
struct TestHelper {
    /// Sender used by tests to feed `TelemetryEvent`s to the future under test.
    telemetry_sender: TelemetrySender,
    /// The `Inspector` whose root the telemetry inspect node was created under. Never read
    /// by the helper itself.
    // NOTE(review): presumably stored only to keep the inspect hierarchy alive for the
    // duration of the test - confirm.
    _inspector: Inspector,
    /// Server end of the `MetricEventLogger` channel whose proxy is handed to the telemetry
    /// future; `advance_test_fut` reads and answers requests from it.
    cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
    /// Every metric event collected so far from requests answered by `advance_test_fut`.
    cobalt_events: Vec<MetricEvent>,
    /// Mock time matrix client given to the `LinkPropertiesStateLogger`; tests drain its
    /// recorded calls to check what was logged.
    mock_time_matrix_client: MockTimeMatrixClient,

    /// Test executor created with fake time; used to poll futures and advance the clock.
    exec: fasync::TestExecutor,
}
1151
1152 impl TestHelper {
1153 fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
1155 let mut made_progress = true;
1156 while made_progress {
1157 let _result = self.exec.run_until_stalled(test_fut);
1158 made_progress = false;
1159 while let Poll::Ready(Some(Ok(req))) =
1160 self.exec.run_until_stalled(&mut self.cobalt_stream.next())
1161 {
1162 self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
1163 made_progress = true;
1164 }
1165 }
1166
1167 match self.exec.run_until_stalled(test_fut) {
1168 Poll::Pending => (),
1169 _ => panic!("expect test_fut to resolve to Poll::Pending"),
1170 }
1171 }
1172
1173 fn advance_by<T>(
1177 &mut self,
1178 duration: zx::MonotonicDuration,
1179 test_fut: &mut (impl Future<Output = T> + Unpin),
1180 ) {
1181 assert_eq!(
1182 duration.into_nanos() % STEP_INCREMENT.into_nanos(),
1183 0,
1184 "duration {:?} is not divisible by STEP_INCREMENT",
1185 duration,
1186 );
1187 const_assert_eq!(
1188 TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1189 0
1190 );
1191
1192 self.advance_test_fut(test_fut);
1193
1194 for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1195 self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1196 let _ = self.exec.wake_expired_timers();
1197 self.advance_test_fut(test_fut);
1198 }
1199 }
1200
1201 fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1202 self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1203 }
1204
1205 fn get_time_series<T>(
1206 &mut self,
1207 test_fut: &mut (impl Future<Output = T> + Unpin),
1208 ) -> Arc<Mutex<Stats>> {
1209 let (sender, mut receiver) = oneshot::channel();
1210 self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1211 self.advance_test_fut(test_fut);
1212 match receiver.try_recv() {
1213 Ok(Some(stats)) => stats,
1214 _ => panic!("Expect Stats to be returned"),
1215 }
1216 }
1217 }
1218
/// Test-only extension for answering a metric logger request and extracting what it logged.
trait CobaltExt {
    /// Consumes the request, replies to it with `result`, and returns the metric events the
    /// request carried.
    fn respond_to_metric_req(
        self,
        result: Result<(), fidl_fuchsia_metrics::Error>,
    ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
}
1226
1227 impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1228 fn respond_to_metric_req(
1229 self,
1230 result: Result<(), fidl_fuchsia_metrics::Error>,
1231 ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1232 match self {
1233 Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1234 assert!(responder.send(result).is_ok());
1235 vec![MetricEvent {
1236 metric_id,
1237 event_codes,
1238 payload: MetricEventPayload::Count(count),
1239 }]
1240 }
1241 Self::LogInteger { metric_id, value, event_codes, responder } => {
1242 assert!(responder.send(result).is_ok());
1243 vec![MetricEvent {
1244 metric_id,
1245 event_codes,
1246 payload: MetricEventPayload::IntegerValue(value),
1247 }]
1248 }
1249 Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1250 assert!(responder.send(result).is_ok());
1251 vec![MetricEvent {
1252 metric_id,
1253 event_codes,
1254 payload: MetricEventPayload::Histogram(histogram),
1255 }]
1256 }
1257 Self::LogString { metric_id, string_value, event_codes, responder } => {
1258 assert!(responder.send(result).is_ok());
1259 vec![MetricEvent {
1260 metric_id,
1261 event_codes,
1262 payload: MetricEventPayload::StringValue(string_value),
1263 }]
1264 }
1265 Self::LogMetricEvents { events, responder } => {
1266 assert!(responder.send(result).is_ok());
1267 events
1268 }
1269 }
1270 }
1271 }
1272
1273 fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = Result<(), Error>>>>) {
1274 let mut exec = fasync::TestExecutor::new_with_fake_time();
1275 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1276
1277 let (cobalt_proxy, cobalt_stream) =
1278 create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1279
1280 let inspector = Inspector::default();
1281 let inspect_node = inspector.root().create_child("telemetrytest");
1282 let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
1283 let mock_time_matrix_client = MockTimeMatrixClient::new();
1284
1285 let link_properties_state_logger = LinkPropertiesStateLogger::new(
1286 &inspect_metadata_node,
1287 &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1288 InterfaceTimeSeriesGrouping::Type(vec![
1289 InterfaceType::Ethernet,
1290 InterfaceType::WlanClient,
1291 ]),
1292 &mock_time_matrix_client,
1293 );
1294
1295 let (telemetry_sender, test_fut) =
1296 serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
1297 let mut test_fut = Box::pin(test_fut);
1298
1299 assert_matches::assert_matches!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1300
1301 let test_helper = TestHelper {
1302 telemetry_sender,
1303 _inspector: inspector,
1304 cobalt_stream,
1305 cobalt_events: vec![],
1306 mock_time_matrix_client,
1307 exec,
1308 };
1309 (test_helper, test_fut)
1310 }
1311}