1mod convert;
6mod inspect;
7pub mod processors;
8
9use self::inspect::{inspect_record_stats, Stats};
10use crate::{IpVersions, LinkState, State};
11use processors::link_properties_state::{
12 InterfaceIdentifier, InterfaceTimeSeriesGrouping, InterfaceType, LinkProperties,
13 LinkPropertiesStateLogger,
14};
15
16use anyhow::{format_err, Context, Error};
17use cobalt_client::traits::AsEventCode;
18use fidl_fuchsia_metrics::MetricEvent;
19use fuchsia_cobalt_builders::MetricEventExt;
20use fuchsia_inspect::Node as InspectNode;
21use fuchsia_sync::Mutex;
22use futures::channel::{mpsc, oneshot};
23use futures::{future, select, Future, StreamExt, TryFutureExt};
24use log::{info, warn};
25use static_assertions::const_assert_eq;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use windowed_stats::aggregations::SumAndCount;
29use windowed_stats::experimental::serve::serve_time_matrix_inspection;
30use {fuchsia_async as fasync, network_policy_metrics_registry as metrics};
31
32#[cfg(test)]
33mod testing;
34
35pub async fn create_metrics_logger(
36 factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
37) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
38 let (cobalt_proxy, cobalt_server) =
39 fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
40
41 let project_spec = fidl_fuchsia_metrics::ProjectSpec {
42 customer_id: None, project_id: Some(metrics::PROJECT_ID),
44 ..Default::default()
45 };
46
47 let status = factory_proxy
48 .create_metric_event_logger(&project_spec, cobalt_server)
49 .await
50 .context("create metrics event logger")?;
51
52 match status {
53 Ok(_) => Ok(cobalt_proxy),
54 Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
55 }
56}
57
/// Cloneable handle used to submit [`TelemetryEvent`]s to the telemetry loop.
///
/// Both fields are behind `Arc`s, so every clone shares the same channel sender and the same
/// "blocked" flag.
#[derive(Clone, Debug)]
pub struct TelemetrySender {
    /// Sending half of the telemetry event channel, guarded by a mutex.
    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
    /// `true` while the most recent `send` attempt failed. `send` uses it to log only on
    /// transitions between the working and the failing state.
    sender_is_blocked: Arc<AtomicBool>,
}
63
64impl TelemetrySender {
65 pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
66 Self {
67 sender: Arc::new(Mutex::new(sender)),
68 sender_is_blocked: Arc::new(AtomicBool::new(false)),
69 }
70 }
71
72 pub fn send(&self, event: TelemetryEvent) {
74 match self.sender.lock().try_send(event) {
75 Ok(_) => {
76 if self
78 .sender_is_blocked
79 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
80 .is_ok()
81 {
82 info!("TelemetrySender recovered and resumed sending");
83 }
84 }
85 Err(_) => {
86 if self
88 .sender_is_blocked
89 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
90 .is_ok()
91 {
92 warn!("TelemetrySender dropped a msg: either buffer is full or no receiver is waiting");
93 }
94 }
95 }
96 }
97}
98
/// Snapshot of the system state that the telemetry code compares against and reports on.
#[derive(Debug, Clone, Default, PartialEq)]
struct SystemStateSummary {
    /// Per-IP-version state; an entry is `None` when no state is known for that IP version.
    system_state: IpVersions<Option<State>>,
}
103
104impl SystemStateSummary {
105 fn ipv4_link_state_val(&self) -> u32 {
106 match self.system_state.ipv4 {
107 Some(s) => s.link as u32,
108 None => 0u32,
109 }
110 }
111
112 fn ipv6_link_state_val(&self) -> u32 {
113 match self.system_state.ipv6 {
114 Some(s) => s.link as u32,
115 None => 0u32,
116 }
117 }
118}
119
/// Default-route configuration last reported through `TelemetryEvent::NetworkConfig`.
#[derive(Debug, Clone, Default, PartialEq)]
struct NetworkConfig {
    /// Whether a default IPv4 route was reported.
    has_default_ipv4_route: bool,
    /// Whether a default IPv6 route was reported.
    has_default_ipv6_route: bool,
}
125
/// Payload of `TelemetryEvent::SystemStateUpdate`.
#[derive(Debug, Clone, Default)]
pub struct SystemStateUpdate {
    /// The new per-IP-version system state; an entry is `None` when there is no state for
    /// that IP version.
    pub(crate) system_state: IpVersions<Option<State>>,
}
130
/// Events accepted by the telemetry loop through a [`TelemetrySender`].
#[derive(Debug)]
pub enum TelemetryEvent {
    /// Reports the current system state. Metrics are only logged when the state differs from
    /// the one previously recorded.
    SystemStateUpdate {
        update: SystemStateUpdate,
    },
    /// Reports which default routes are currently present.
    NetworkConfig {
        has_default_ipv4_route: bool,
        has_default_ipv6_route: bool,
    },
    /// Reports the outcome of a gateway probe. An occurrence metric is logged when
    /// `internet_available` is true while the gateway is not both discoverable and pingable.
    GatewayProbe {
        internet_available: bool,
        gateway_discoverable: bool,
        gateway_pingable: bool,
    },
    /// Forwarded to `LinkPropertiesStateLogger::update_link_properties`.
    LinkPropertiesUpdate {
        // Identifiers of the interface the update applies to.
        interface_identifiers: Vec<InterfaceIdentifier>,
        // Link properties per IP version.
        link_properties: IpVersions<LinkProperties>,
    },
    /// Forwarded to `LinkPropertiesStateLogger::update_link_state`.
    LinkStateUpdate {
        // Identifiers of the interface the update applies to.
        interface_identifiers: Vec<InterfaceIdentifier>,
        // Link state per IP version.
        link_state: IpVersions<LinkState>,
    },
    /// Requests a shared handle to the telemetry `Stats`; the handle is sent back on `sender`.
    GetTimeSeries {
        sender: oneshot::Sender<Arc<Mutex<Stats>>>,
    },
}
162
/// Buffer size passed to `mpsc::channel` when the telemetry event channel is created.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;

/// Period of the interval timer that drives periodic telemetry in the serve loop. It must
/// divide one minute evenly; the serve loop asserts this at compile time.
const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
/// Name of the Inspect child node created for telemetry metadata.
const METADATA_NODE_NAME: &str = "metadata";
169
170pub fn serve_telemetry(
171 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
172 inspect_node: InspectNode,
173) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
174 let inspect_time_series_node = inspect_node.create_child("time_series");
176 let link_properties_state_time_series_node =
177 inspect_time_series_node.create_child("link_properties_state");
178 let (time_matrix_client, time_matrix_fut) =
179 serve_time_matrix_inspection(link_properties_state_time_series_node);
180
181 inspect_node.record(inspect_time_series_node);
182 let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
183 let link_properties_state_logger = LinkPropertiesStateLogger::new(
184 &inspect_metadata_node,
185 &format!("root/telemetry/{METADATA_NODE_NAME}"),
186 InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
187 &time_matrix_client,
188 );
189 inspect_node.record(inspect_metadata_node);
191
192 let (sender, fut) =
193 serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
194 (sender, future::try_join(fut, time_matrix_fut).map_ok(|((), ())| ()))
195}
196
197fn serve_telemetry_inner(
198 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
199 inspect_node: InspectNode,
200 link_properties_state_logger: LinkPropertiesStateLogger,
201) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
202 let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
203 let sender = TelemetrySender::new(sender);
204 let cloned_sender = sender.clone();
205
206 let fut = async move {
207 let link_properties_state_logger = link_properties_state_logger;
208
209 let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
210 const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
211 const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
212 const INTERVAL_TICKS_PER_MINUTE: u64 =
213 (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
214 const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
215 let mut interval_tick = 0u64;
216 let mut telemetry =
217 Telemetry::new(cloned_sender, cobalt_proxy, link_properties_state_logger, inspect_node);
218 loop {
219 select! {
220 event = receiver.next() => {
221 if let Some(event) = event {
222 telemetry.handle_telemetry_event(event).await;
223 }
224 }
225 _ = report_interval_stream.next() => {
226 telemetry.handle_ten_secondly_telemetry();
227
228 interval_tick += 1;
229 if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
230 telemetry.handle_hourly_telemetry().await;
231 }
232 }
233 }
234 }
235 };
236 (sender, fut)
237}
238
// Logs a single Cobalt metric and warns on failure instead of propagating the error.
//
// Expands to `$cobalt_proxy.$method_name($metric_id, $value, $event_codes).await`, so it can
// only be used inside an async context. Both the transport-level error (outer `Err`) and the
// error returned by the logger (inner `Err`) are reported with `warn!`.
//
// Note: `$metric_id` appears both in the call and in the warning arms, so it should be a
// cheap, side-effect-free expression.
macro_rules! log_cobalt {
    ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
        let status = $cobalt_proxy.$method_name($metric_id, $value, $event_codes).await;
        match status {
            Ok(Ok(())) => (),
            Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
            Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
        }
    }};
}
251
// Logs a batch of Cobalt metric events and warns on failure instead of propagating the error.
//
// Expands to `$cobalt_proxy.log_metric_events($events).await`, so it can only be used inside
// an async context. `$context` is only used to label the warning that is emitted when either
// the transport (outer `Err`) or the logger (inner `Err`) reports a failure.
macro_rules! log_cobalt_batch {
    ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
        let status = $cobalt_proxy.log_metric_events($events).await;
        match status {
            Ok(Ok(())) => (),
            Ok(Err(e)) => {
                warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
            }
            Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
        }
    }};
}
264
265fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
266 const MILLIS_PER_SEC: i64 = 1000;
267 let millis = duration.into_millis();
268 let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
269 millis / MILLIS_PER_SEC + rounded_portion
270}
271
/// State owned by the telemetry loop: the Cobalt connection, the last known system state and
/// network configuration, and the bookkeeping needed to turn them into duration metrics.
struct Telemetry {
    /// Proxy used to log metrics to Cobalt.
    cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
    /// Most recently recorded system state; `None` until the first state update arrives.
    state_summary: Option<SystemStateSummary>,
    /// Time at which system state durations were last logged to Cobalt.
    state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
    /// Time at which system state durations were last logged to the Inspect stats.
    state_last_refreshed_for_inspect: fasync::MonotonicInstant,
    /// Time at which reachability was last lost, along with the route configuration in effect
    /// at that moment. Cleared when reachability is regained.
    reachability_lost_at: Option<(
        fasync::MonotonicInstant,
        metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
    )>,
    /// Most recently recorded network configuration; `None` until the first one arrives.
    network_config: Option<NetworkConfig>,
    /// Time at which network configuration metrics were last logged.
    network_config_last_refreshed: fasync::MonotonicInstant,
    /// Receives link property and link state updates.
    link_properties_state_logger: LinkPropertiesStateLogger,

    // Held but never read; presumably kept so the node outlives the stats recorded on it
    // (TODO confirm).
    _inspect_node: InspectNode,
    /// Stats shared with Inspect and handed out in response to `TelemetryEvent::GetTimeSeries`.
    stats: Arc<Mutex<Stats>>,
}
288
289impl Telemetry {
290 pub fn new(
291 _telemetry_sender: TelemetrySender,
292 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
293 link_properties_state_logger: LinkPropertiesStateLogger,
294 inspect_node: InspectNode,
295 ) -> Self {
296 let stats = Arc::new(Mutex::new(Stats::new()));
297 inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
298 Self {
299 cobalt_proxy,
300 state_summary: None,
301 state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
302 state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
303 reachability_lost_at: None,
304 network_config: None,
305 network_config_last_refreshed: fasync::MonotonicInstant::now(),
306 link_properties_state_logger,
307 _inspect_node: inspect_node,
308 stats,
309 }
310 }
311
312 pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
313 let now = fasync::MonotonicInstant::now();
314 match event {
315 TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
316 let new_state = Some(match &self.state_summary {
317 Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
318 None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
319 });
320 if self.state_summary != new_state {
321 self.log_system_state_metrics(
324 new_state,
325 now,
326 "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
327 )
328 .await
329 }
330 }
331 TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
332 let new_config =
333 Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
334 self.log_network_config_metrics(
335 new_config,
336 now,
337 "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
338 )
339 .await;
340 }
341 TelemetryEvent::GatewayProbe {
342 internet_available,
343 gateway_discoverable,
344 gateway_pingable,
345 } => {
346 if internet_available {
347 let metric_id = match (gateway_discoverable, gateway_pingable) {
348 (true, true) => None,
349 (true, false) => {
350 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
351 }
352 (false, true) => {
353 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
354 }
355 (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
356 };
357 if let Some(metric_id) = metric_id {
358 log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
359 }
360 }
361 }
362 TelemetryEvent::LinkPropertiesUpdate { interface_identifiers, link_properties } => {
363 self.link_properties_state_logger
364 .update_link_properties(interface_identifiers, &link_properties);
365 }
366 TelemetryEvent::LinkStateUpdate { interface_identifiers, link_state } => {
367 self.link_properties_state_logger
368 .update_link_state(interface_identifiers, &link_state);
369 }
370 TelemetryEvent::GetTimeSeries { sender } => {
371 let _result = sender.send(Arc::clone(&self.stats));
372 }
373 }
374 }
375
376 async fn log_system_state_metrics(
377 &mut self,
378 new_state: Option<SystemStateSummary>,
379 now: fasync::MonotonicInstant,
380 ctx: &'static str,
381 ) {
382 let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
383 let duration_sec_inspect =
384 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
385 let mut metric_events = vec![];
386
387 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
388
389 if let Some(prev) = &self.state_summary {
390 if prev.system_state.has_interface_up() {
391 metric_events.push(
392 MetricEvent::builder(
393 metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
394 )
395 .as_integer(duration_cobalt.into_micros()),
396 );
397
398 let route_config_dim = convert::convert_route_config(&prev.system_state);
399 let internet_available =
400 convert::convert_yes_no_dim(prev.system_state.has_internet());
401 let gateway_reachable =
402 convert::convert_yes_no_dim(prev.system_state.has_gateway());
403 let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
404 let http_status = if prev.system_state.has_http() {
405 metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
406 } else {
407 metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
408 };
409 if let Some(route_config) = route_config_dim {
411 metric_events.push(
412 MetricEvent::builder(
413 metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
414 )
415 .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
416 route_config,
417 internet_available,
418 gateway_reachable,
419 dns_active,
420 http_status,
421 })
422 .as_integer(duration_cobalt.into_micros()),
423 );
424 }
425
426 if prev.system_state.has_internet() {
427 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
428 }
429 if prev.system_state.has_dns() {
430 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
431 }
432 if prev.system_state.has_http() {
433 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
434 }
435 }
436 }
437
438 if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
439 let previously_reachable =
440 prev.system_state.has_internet() && prev.system_state.has_dns();
441 let now_reachable =
442 new_state.system_state.has_internet() && new_state.system_state.has_dns();
443 if previously_reachable && !now_reachable {
444 let route_config_dim = convert::convert_route_config(&prev.system_state);
445 if let Some(route_config_dim) = route_config_dim {
446 metric_events.push(
447 MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
448 .with_event_code(route_config_dim.as_event_code())
449 .as_occurrence(1),
450 );
451 self.reachability_lost_at = Some((now, route_config_dim));
452 }
453 self.stats.lock().reachability_lost_count.log_value(&1);
454 }
455
456 if !previously_reachable && now_reachable {
457 if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
458 metric_events.push(
459 MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
460 .with_event_code(route_config_dim.as_event_code())
461 .as_integer((now - reachability_lost_at).into_micros()),
462 );
463 }
464 self.reachability_lost_at = None;
465 }
466 }
467
468 if let Some(new_state) = &new_state {
469 self.stats
470 .lock()
471 .ipv4_state
472 .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
473 self.stats
474 .lock()
475 .ipv6_state
476 .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
477 }
478
479 if !metric_events.is_empty() {
480 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
481 }
482
483 if let Some(new_state) = new_state {
484 self.state_summary = Some(new_state);
485 }
486 self.state_last_refreshed_for_cobalt = now;
487 self.state_last_refreshed_for_inspect = now;
488 }
489
490 async fn log_network_config_metrics(
491 &mut self,
492 new_config: Option<NetworkConfig>,
493 now: fasync::MonotonicInstant,
494 ctx: &'static str,
495 ) {
496 let duration = now - self.network_config_last_refreshed;
497 let mut metric_events = vec![];
498
499 if let Some(prev) = &self.network_config {
500 let default_route_dim = convert::convert_default_route(
501 prev.has_default_ipv4_route,
502 prev.has_default_ipv6_route,
503 );
504 if let Some(default_route_dim) = default_route_dim {
506 metric_events.push(
507 MetricEvent::builder(
508 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
509 )
510 .with_event_code(default_route_dim.as_event_code())
511 .as_integer(duration.into_micros()),
512 );
513 metric_events.push(
514 MetricEvent::builder(
515 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
516 )
517 .with_event_code(default_route_dim.as_event_code())
518 .as_occurrence(1),
519 );
520 }
521 }
522
523 if !metric_events.is_empty() {
524 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
525 }
526
527 if let Some(new_config) = new_config {
528 self.network_config = Some(new_config);
529 }
530 self.network_config_last_refreshed = now;
531 }
532
533 pub fn handle_ten_secondly_telemetry(&mut self) {
534 let now = fasync::MonotonicInstant::now();
535 let duration_sec_inspect =
536 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
537
538 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
539
540 if let Some(current) = &self.state_summary {
541 if current.system_state.has_internet() {
542 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
543 }
544 if current.system_state.has_dns() {
545 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
546 }
547 if current.system_state.has_http() {
548 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
549 }
550
551 self.stats
552 .lock()
553 .ipv4_state
554 .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
555 self.stats
556 .lock()
557 .ipv6_state
558 .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
559 }
560 self.state_last_refreshed_for_inspect = now;
561 }
562
563 pub async fn handle_hourly_telemetry(&mut self) {
564 let now = fasync::MonotonicInstant::now();
565 self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
566 self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
567 }
568}
569
570#[cfg(test)]
571mod tests {
572 use crate::{ApplicationState, LinkState};
573
574 use super::*;
575 use fidl::endpoints::create_proxy_and_stream;
576 use fidl_fuchsia_metrics::MetricEventPayload;
577 use fuchsia_inspect::Inspector;
578 use fuchsia_inspect_contrib::id_enum::IdEnum;
579
580 use futures::task::Poll;
581 use std::pin::Pin;
582 use test_case::test_case;
583 use windowed_stats::experimental::clock::Timed;
584 use windowed_stats::experimental::testing::{MockTimeMatrixClient, TimeMatrixCall};
585
586 const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
587
588 #[test]
589 fn test_log_state_change() {
590 let (mut test_helper, mut test_fut) = setup_test();
591
592 let mut update = SystemStateUpdate {
593 system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
594 };
595 test_helper
596 .telemetry_sender
597 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
598
599 test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
600
601 update.system_state = IpVersions {
602 ipv4: Some(State {
603 link: LinkState::Internet,
604 application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
605 }),
606 ipv6: None,
607 };
608 test_helper
609 .telemetry_sender
610 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
611
612 test_helper.advance_test_fut(&mut test_fut);
613
614 let logged_metrics = test_helper
615 .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
616 assert_eq!(logged_metrics.len(), 1);
617 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
618
619 let logged_metrics = test_helper
620 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
621 assert_eq!(logged_metrics.len(), 1);
622 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
623 assert_eq!(
624 logged_metrics[0].event_codes,
625 &[
626 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
627 as u32,
628 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
629 as u32,
630 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
631 as u32,
632 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
633 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
634 as u32,
635 ]
636 );
637
638 test_helper.cobalt_events.clear();
639 test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);
640
641 let logged_metrics = test_helper
645 .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
646 assert_eq!(logged_metrics.len(), 1);
647 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
648
649 let logged_metrics = test_helper
650 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
651 assert_eq!(logged_metrics.len(), 1);
652 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
653 assert_eq!(
654 logged_metrics[0].event_codes,
655 &[
656 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
657 as u32,
658 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
659 as u32,
660 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
661 as u32,
662 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
664 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
665 as u32,
666 ]
667 );
668 }
669
670 #[test]
671 fn test_log_reachability_lost() {
672 let (mut test_helper, mut test_fut) = setup_test();
673
674 let mut update = SystemStateUpdate {
675 system_state: IpVersions {
676 ipv4: None,
677 ipv6: Some(State {
678 link: LinkState::Internet,
679 application: ApplicationState { dns_resolved: true, ..Default::default() },
680 }),
681 },
682 ..SystemStateUpdate::default()
683 };
684 test_helper
685 .telemetry_sender
686 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
687 test_helper.advance_test_fut(&mut test_fut);
688
689 update.system_state = IpVersions {
690 ipv4: None,
691 ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
692 };
693 test_helper
694 .telemetry_sender
695 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
696 test_helper.advance_test_fut(&mut test_fut);
697
698 let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
701 assert_eq!(logged_metrics.len(), 1);
702 assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
703 assert_eq!(
704 logged_metrics[0].event_codes,
705 &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
706 );
707
708 test_helper.cobalt_events.clear();
709
710 update.system_state = IpVersions {
711 ipv4: None,
712 ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
713 };
714 test_helper
715 .telemetry_sender
716 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
717 test_helper.advance_test_fut(&mut test_fut);
718
719 let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
722 assert_eq!(logged_metrics.len(), 0);
723
724 test_helper.cobalt_events.clear();
725
726 test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
727 update.system_state = IpVersions {
728 ipv4: Some(State {
729 link: LinkState::Internet,
730 application: ApplicationState { dns_resolved: true, ..Default::default() },
731 }),
732 ipv6: None,
733 };
734 test_helper
735 .telemetry_sender
736 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
737 test_helper.advance_test_fut(&mut test_fut);
738
739 let logged_metrics =
741 test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
742 assert_eq!(logged_metrics.len(), 1);
743 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
744 assert_eq!(
745 logged_metrics[0].event_codes,
746 &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
749 );
750 }
751
752 #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
753 #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
754 #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
755 #[test_case(false, false, None; "no default routes")]
756 #[fuchsia::test(add_test_attr = false)]
757 fn test_log_default_route(
758 has_default_ipv4_route: bool,
759 has_default_ipv6_route: bool,
760 expected_dim: Option<
761 metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
762 >,
763 ) {
764 let (mut test_helper, mut test_fut) = setup_test();
765
766 test_helper
767 .telemetry_sender
768 .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
769 test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
770
771 let logged_metrics = test_helper
772 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
773 match expected_dim {
774 Some(dim) => {
775 assert_eq!(logged_metrics.len(), 1);
776 assert_eq!(
777 logged_metrics[0].payload,
778 MetricEventPayload::IntegerValue(3600_000_000)
779 );
780 assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
781 }
782 None => {
783 assert_eq!(logged_metrics.len(), 0);
784 }
785 }
786
787 let logged_metrics = test_helper
788 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
789 match expected_dim {
790 Some(dim) => {
791 assert_eq!(logged_metrics.len(), 1);
792 assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
793 assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
794 }
795 None => {
796 assert_eq!(logged_metrics.len(), 0);
797 }
798 }
799 }
800
801 #[test_case(true, true, vec![]; "negative")]
802 #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
803 #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
804 #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
805 #[fuchsia::test(add_test_attr = false)]
806 fn test_log_abnormal_state_situation(
807 gateway_discoverable: bool,
808 gateway_pingable: bool,
809 expected_metrics: Vec<u32>,
810 ) {
811 let (mut test_helper, mut test_fut) = setup_test();
812
813 test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
814 internet_available: true,
815 gateway_discoverable,
816 gateway_pingable,
817 });
818 test_helper.advance_test_fut(&mut test_fut);
819
820 let logged_metrics: Vec<u32> = [
821 metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
822 metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
823 metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
824 ]
825 .into_iter()
826 .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
827 .collect();
828
829 assert_eq!(logged_metrics, expected_metrics);
830 }
831
832 #[test]
833 fn test_state_snapshot_duration_inspect_stats() {
834 let (mut test_helper, mut test_fut) = setup_test();
835
836 test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
837
838 let time_series = test_helper.get_time_series(&mut test_fut);
839 let internet_available_sec: Vec<_> =
840 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
841 let dns_active_sec: Vec<_> =
842 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
843 let http_active_sec: Vec<_> =
844 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
845 let total_duration_sec: Vec<_> =
846 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
847 assert_eq!(internet_available_sec, vec![0]);
848 assert_eq!(dns_active_sec, vec![0]);
849 assert_eq!(http_active_sec, vec![0]);
850 assert_eq!(total_duration_sec, vec![20]);
851
852 let mut update = SystemStateUpdate {
853 system_state: IpVersions {
854 ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
855 ipv6: None,
856 },
857 };
858 test_helper
859 .telemetry_sender
860 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
861 test_helper.advance_test_fut(&mut test_fut);
862 let time_series = test_helper.get_time_series(&mut test_fut);
863 let total_duration_sec: Vec<_> =
864 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
865 assert_eq!(total_duration_sec, vec![25]);
866
867 test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
868
869 let time_series = test_helper.get_time_series(&mut test_fut);
872 let internet_available_sec: Vec<_> =
873 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
874 let dns_active_sec: Vec<_> =
875 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
876 let http_active_sec: Vec<_> =
877 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
878 let total_duration_sec: Vec<_> =
879 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
880 assert_eq!(internet_available_sec, vec![15]);
881 assert_eq!(dns_active_sec, vec![0]);
882 assert_eq!(http_active_sec, vec![0]);
883 assert_eq!(total_duration_sec, vec![40]);
884
885 update.system_state = IpVersions {
886 ipv4: Some(State {
887 link: LinkState::Internet,
888 application: ApplicationState { dns_resolved: true, ..Default::default() },
889 }),
890 ipv6: None,
891 };
892 test_helper
893 .telemetry_sender
894 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
895
896 test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
897
898 let time_series = test_helper.get_time_series(&mut test_fut);
901 let internet_available_sec: Vec<_> =
902 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
903 let dns_active_sec: Vec<_> =
904 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
905 let http_active_sec: Vec<_> =
906 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
907 let total_duration_sec: Vec<_> =
908 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
909 assert_eq!(internet_available_sec, vec![25, 40]);
910 assert_eq!(dns_active_sec, vec![10, 40]);
911 assert_eq!(http_active_sec, vec![0]);
912 assert_eq!(total_duration_sec, vec![40]);
913
914 update.system_state = IpVersions {
915 ipv4: Some(State {
916 link: LinkState::Internet,
917 application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
918 }),
919 ipv6: None,
920 };
921 test_helper
922 .telemetry_sender
923 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
924
925 test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
926
927 let time_series = test_helper.get_time_series(&mut test_fut);
930 let internet_available_sec: Vec<_> =
931 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
932 let dns_active_sec: Vec<_> =
933 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
934 let http_active_sec: Vec<_> =
935 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
936 let total_duration_sec: Vec<_> =
937 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
938 assert_eq!(internet_available_sec, vec![25, 60, 40]);
939 assert_eq!(dns_active_sec, vec![10, 60, 40]);
940 assert_eq!(http_active_sec, vec![0, 20, 40]);
941 assert_eq!(total_duration_sec, vec![40]);
942 }
943
944 #[test]
945 fn test_reachability_lost_count_inspect_stats() {
946 let (mut test_helper, mut test_fut) = setup_test();
947
948 let mut update = SystemStateUpdate {
949 system_state: IpVersions {
950 ipv4: None,
951 ipv6: Some(State {
952 link: LinkState::Internet,
953 application: ApplicationState { dns_resolved: true, ..Default::default() },
954 }),
955 },
956 ..SystemStateUpdate::default()
957 };
958 test_helper
959 .telemetry_sender
960 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
961 test_helper.advance_test_fut(&mut test_fut);
962
963 let time_series = test_helper.get_time_series(&mut test_fut);
964 let reachability_lost_count: Vec<_> =
965 time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
966 assert_eq!(reachability_lost_count, vec![0]);
967
968 update.system_state = IpVersions {
969 ipv4: None,
970 ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
971 };
972 test_helper
973 .telemetry_sender
974 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
975 test_helper.advance_test_fut(&mut test_fut);
976
977 let time_series = test_helper.get_time_series(&mut test_fut);
978 let reachability_lost_count: Vec<_> =
979 time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
980 assert_eq!(reachability_lost_count, vec![1]);
981 }
982
983 #[test]
984 fn test_avg_state_inspect_stats() {
985 let (mut test_helper, mut test_fut) = setup_test();
986
987 let mut update = SystemStateUpdate {
988 system_state: IpVersions {
989 ipv4: None,
990 ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
991 },
992 ..SystemStateUpdate::default()
993 };
994 test_helper
995 .telemetry_sender
996 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
997 test_helper.advance_test_fut(&mut test_fut);
998
999 let time_series = test_helper.get_time_series(&mut test_fut);
1000 let ipv4_state: Vec<_> =
1001 time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1002 let ipv6_state: Vec<_> =
1003 time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1004 assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
1005 assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
1006
1007 update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
1008 test_helper
1009 .telemetry_sender
1010 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1011 test_helper.advance_test_fut(&mut test_fut);
1012
1013 let time_series = test_helper.get_time_series(&mut test_fut);
1014 let ipv4_state: Vec<_> =
1015 time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1016 let ipv6_state: Vec<_> =
1017 time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1018 assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
1019 assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
1020 }
1021
1022 #[test]
1023 fn test_link_properties_update() {
1024 let (mut test_helper, mut test_fut) = setup_test();
1025
1026 for i in 0..=15 {
1030 let link_properties = LinkProperties::from(i);
1031 let mut expected_data_vec = vec![];
1032 if i != 0 {
1036 expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(i)));
1037 }
1038
1039 let mut time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1042 assert_eq!(
1043 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1044 &[]
1045 );
1046 assert_eq!(
1047 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1048 &[]
1049 );
1050 assert_eq!(
1051 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1052 &[]
1053 );
1054 assert_eq!(
1055 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1056 &[]
1057 );
1058
1059 test_helper.telemetry_sender.send(TelemetryEvent::LinkPropertiesUpdate {
1060 interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1061 link_properties: IpVersions {
1062 ipv4: link_properties.clone(),
1063 ipv6: link_properties,
1064 },
1065 });
1066 test_helper.advance_test_fut(&mut test_fut);
1067
1068 time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1069 assert_eq!(
1070 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1071 expected_data_vec
1072 );
1073 assert_eq!(
1074 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1075 expected_data_vec
1076 );
1077 assert_eq!(
1080 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1081 &[]
1082 );
1083 assert_eq!(
1084 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1085 &[]
1086 );
1087 }
1088 }
1089
1090 #[test]
1091 fn test_link_state_update() {
1092 let (mut test_helper, mut test_fut) = setup_test();
1093
1094 let initial_value = LinkState::None.to_id() as u64;
1098 let final_value = LinkState::Internet.to_id() as u64;
1099 for i in initial_value..=final_value {
1100 let link_state = i.into();
1101 let mut expected_data_vec = vec![];
1102 if i != initial_value {
1103 expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(1 << i)));
1104 }
1105
1106 let mut time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1109 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..], &[]);
1110 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..], &[]);
1111 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1112 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1113
1114 test_helper.telemetry_sender.send(TelemetryEvent::LinkStateUpdate {
1115 interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1116 link_state: IpVersions { ipv4: link_state, ipv6: link_state },
1117 });
1118 test_helper.advance_test_fut(&mut test_fut);
1119
1120 time_matrix_calls = test_helper.mock_time_matrix_client.fold_buffered_samples();
1121 assert_eq!(
1122 &time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..],
1123 expected_data_vec
1124 );
1125 assert_eq!(
1126 &time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..],
1127 expected_data_vec
1128 );
1129 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1132 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1133 }
1134 }
1135
    /// Shared fixture for the tests in this module; constructed by `setup_test`.
    struct TestHelper {
        /// Sender the tests use to push `TelemetryEvent`s to the telemetry future.
        telemetry_sender: TelemetrySender,
        /// Inspector whose root hosts the "telemetrytest" node. It is stored here by
        /// `setup_test` and never read in the code shown in this module section.
        _inspector: Inspector,
        /// Request stream of the metrics logger channel; drained by `advance_test_fut`.
        cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
        /// Metric events collected so far from requests read off `cobalt_stream`.
        cobalt_events: Vec<MetricEvent>,
        /// Mock time matrix client given to the `LinkPropertiesStateLogger`; tests read the
        /// buffered fold calls back from it.
        mock_time_matrix_client: MockTimeMatrixClient,

        /// Fake-time executor used to poll the telemetry future and the request stream.
        exec: fasync::TestExecutor,
    }
1148
1149 impl TestHelper {
1150 fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
1152 let mut made_progress = true;
1153 while made_progress {
1154 let _result = self.exec.run_until_stalled(test_fut);
1155 made_progress = false;
1156 while let Poll::Ready(Some(Ok(req))) =
1157 self.exec.run_until_stalled(&mut self.cobalt_stream.next())
1158 {
1159 self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
1160 made_progress = true;
1161 }
1162 }
1163
1164 match self.exec.run_until_stalled(test_fut) {
1165 Poll::Pending => (),
1166 _ => panic!("expect test_fut to resolve to Poll::Pending"),
1167 }
1168 }
1169
1170 fn advance_by<T>(
1174 &mut self,
1175 duration: zx::MonotonicDuration,
1176 test_fut: &mut (impl Future<Output = T> + Unpin),
1177 ) {
1178 assert_eq!(
1179 duration.into_nanos() % STEP_INCREMENT.into_nanos(),
1180 0,
1181 "duration {:?} is not divisible by STEP_INCREMENT",
1182 duration,
1183 );
1184 const_assert_eq!(
1185 TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1186 0
1187 );
1188
1189 self.advance_test_fut(test_fut);
1190
1191 for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1192 self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1193 let _ = self.exec.wake_expired_timers();
1194 self.advance_test_fut(test_fut);
1195 }
1196 }
1197
1198 fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1199 self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1200 }
1201
1202 fn get_time_series<T>(
1203 &mut self,
1204 test_fut: &mut (impl Future<Output = T> + Unpin),
1205 ) -> Arc<Mutex<Stats>> {
1206 let (sender, mut receiver) = oneshot::channel();
1207 self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1208 self.advance_test_fut(test_fut);
1209 match receiver.try_recv() {
1210 Ok(Some(stats)) => stats,
1211 _ => panic!("Expect Stats to be returned"),
1212 }
1213 }
1214 }
1215
    /// Test-only extension for answering a metrics logger request and capturing what it
    /// logged.
    trait CobaltExt {
        /// Consumes the request, sends `result` back through its responder, and returns the
        /// metric events the request carried.
        fn respond_to_metric_req(
            self,
            result: Result<(), fidl_fuchsia_metrics::Error>,
        ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
    }
1223
1224 impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1225 fn respond_to_metric_req(
1226 self,
1227 result: Result<(), fidl_fuchsia_metrics::Error>,
1228 ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1229 match self {
1230 Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1231 assert!(responder.send(result).is_ok());
1232 vec![MetricEvent {
1233 metric_id,
1234 event_codes,
1235 payload: MetricEventPayload::Count(count),
1236 }]
1237 }
1238 Self::LogInteger { metric_id, value, event_codes, responder } => {
1239 assert!(responder.send(result).is_ok());
1240 vec![MetricEvent {
1241 metric_id,
1242 event_codes,
1243 payload: MetricEventPayload::IntegerValue(value),
1244 }]
1245 }
1246 Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1247 assert!(responder.send(result).is_ok());
1248 vec![MetricEvent {
1249 metric_id,
1250 event_codes,
1251 payload: MetricEventPayload::Histogram(histogram),
1252 }]
1253 }
1254 Self::LogString { metric_id, string_value, event_codes, responder } => {
1255 assert!(responder.send(result).is_ok());
1256 vec![MetricEvent {
1257 metric_id,
1258 event_codes,
1259 payload: MetricEventPayload::StringValue(string_value),
1260 }]
1261 }
1262 Self::LogMetricEvents { events, responder } => {
1263 assert!(responder.send(result).is_ok());
1264 events
1265 }
1266 }
1267 }
1268 }
1269
1270 fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = Result<(), Error>>>>) {
1271 let mut exec = fasync::TestExecutor::new_with_fake_time();
1272 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1273
1274 let (cobalt_proxy, cobalt_stream) =
1275 create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1276
1277 let inspector = Inspector::default();
1278 let inspect_node = inspector.root().create_child("telemetrytest");
1279 let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
1280 let mock_time_matrix_client = MockTimeMatrixClient::new();
1281
1282 let link_properties_state_logger = LinkPropertiesStateLogger::new(
1283 &inspect_metadata_node,
1284 &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1285 InterfaceTimeSeriesGrouping::Type(vec![
1286 InterfaceType::Ethernet,
1287 InterfaceType::WlanClient,
1288 ]),
1289 &mock_time_matrix_client,
1290 );
1291
1292 let (telemetry_sender, test_fut) =
1293 serve_telemetry_inner(cobalt_proxy, inspect_node, link_properties_state_logger);
1294 let mut test_fut = Box::pin(test_fut);
1295
1296 assert_matches::assert_matches!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1297
1298 let test_helper = TestHelper {
1299 telemetry_sender,
1300 _inspector: inspector,
1301 cobalt_stream,
1302 cobalt_events: vec![],
1303 mock_time_matrix_client,
1304 exec,
1305 };
1306 (test_helper, test_fut)
1307 }
1308}