1mod convert;
6mod inspect;
7pub mod processors;
8
9use self::inspect::{Stats, inspect_record_stats};
10use crate::fetch::FetchError;
11use crate::ping::PingError;
12use crate::telemetry::processors::link_properties_state::{
13 LinkProperties, LinkPropertiesStateLogger,
14};
15use crate::telemetry::processors::{
16 InterfaceIdentifier, InterfaceTimeSeriesGrouping, InterfaceType,
17};
18use crate::{FetchParameters, IpVersions, LinkState, PingParameters, State};
19use processors::interface_aware_logger::InterfaceAwareLogger;
20
21use anyhow::{Context, Error, format_err};
22use cobalt_client::traits::AsEventCode;
23use fidl_fuchsia_metrics::MetricEvent;
24use fuchsia_async as fasync;
25use fuchsia_cobalt_builders::MetricEventExt;
26use fuchsia_inspect::Node as InspectNode;
27use fuchsia_sync::Mutex;
28use futures::channel::{mpsc, oneshot};
29use futures::{Future, StreamExt, select};
30use log::{info, warn};
31use network_policy_metrics_registry as metrics;
32use static_assertions::const_assert_eq;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use windowed_stats::aggregations::SumAndCount;
36use windowed_stats::experimental::inspect::TimeMatrixClient;
37
38#[cfg(test)]
39mod testing;
40
41pub async fn create_metrics_logger(
42 factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
43) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
44 let (cobalt_proxy, cobalt_server) =
45 fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
46
47 let project_spec = fidl_fuchsia_metrics::ProjectSpec {
48 customer_id: None, project_id: Some(metrics::PROJECT_ID),
50 ..Default::default()
51 };
52
53 let status = factory_proxy
54 .create_metric_event_logger(&project_spec, cobalt_server)
55 .await
56 .context("create metrics event logger")?;
57
58 match status {
59 Ok(_) => Ok(cobalt_proxy),
60 Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
61 }
62}
63
64#[derive(Clone, Debug)]
65pub struct TelemetrySender {
66 sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
67 sender_is_blocked: Arc<AtomicBool>,
68}
69
70impl TelemetrySender {
71 pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
72 Self {
73 sender: Arc::new(Mutex::new(sender)),
74 sender_is_blocked: Arc::new(AtomicBool::new(false)),
75 }
76 }
77
78 pub fn send(&self, event: TelemetryEvent) {
80 match self.sender.lock().try_send(event) {
81 Ok(_) => {
82 if self
84 .sender_is_blocked
85 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
86 .is_ok()
87 {
88 info!("TelemetrySender recovered and resumed sending");
89 }
90 }
91 Err(_) => {
92 if self
94 .sender_is_blocked
95 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
96 .is_ok()
97 {
98 warn!(
99 "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
100 );
101 }
102 }
103 }
104 }
105}
106
107#[derive(Debug, Clone, Default, PartialEq)]
108struct SystemStateSummary {
109 system_state: IpVersions<Option<State>>,
110}
111
112impl SystemStateSummary {
113 fn ipv4_link_state_val(&self) -> u32 {
114 match self.system_state.ipv4 {
115 Some(s) => s.link as u32,
116 None => 0u32,
117 }
118 }
119
120 fn ipv6_link_state_val(&self) -> u32 {
121 match self.system_state.ipv6 {
122 Some(s) => s.link as u32,
123 None => 0u32,
124 }
125 }
126}
127
128#[derive(Debug, Clone, Default, PartialEq)]
129struct NetworkConfig {
130 has_default_ipv4_route: bool,
131 has_default_ipv6_route: bool,
132}
133
134#[derive(Debug, Clone, Default)]
135pub struct SystemStateUpdate {
136 pub(crate) system_state: IpVersions<Option<State>>,
137}
138
139#[derive(Debug)]
140pub enum TelemetryEvent {
141 SystemStateUpdate {
142 update: SystemStateUpdate,
143 },
144 NetworkConfig {
145 has_default_ipv4_route: bool,
146 has_default_ipv6_route: bool,
147 },
148 GatewayProbe {
149 internet_available: bool,
150 gateway_discoverable: bool,
151 gateway_pingable: bool,
152 },
153 LinkPropertiesUpdate {
156 interface_identifiers: Vec<InterfaceIdentifier>,
157 link_properties: IpVersions<LinkProperties>,
158 },
159 LinkStateUpdate {
162 interface_identifiers: Vec<InterfaceIdentifier>,
163 link_state: IpVersions<LinkState>,
164 },
165 GatewayPingResult {
166 interface_identifiers: Vec<InterfaceIdentifier>,
167 ping_parameters: PingParameters,
168 gateway_ping_result: Result<(), PingError>,
169 },
170 InternetPingResult {
171 interface_identifiers: Vec<InterfaceIdentifier>,
172 ping_parameters: PingParameters,
173 internet_ping_result: Result<(), PingError>,
174 },
175 FetchResult {
176 interface_identifiers: Vec<InterfaceIdentifier>,
177 fetch_parameters: FetchParameters,
178 fetch_result: Result<u16, FetchError>,
179 },
180 GetTimeSeries {
182 sender: oneshot::Sender<Arc<Mutex<Stats>>>,
183 },
184}
185
186const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
189
190const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
191const METADATA_NODE_NAME: &str = "metadata";
192
193pub fn serve_telemetry(
194 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
195 inspect_node: InspectNode,
196) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
197 let inspect_events_node = inspect_node.create_child("events");
199
200 let inspect_time_series_node = inspect_node.create_child("time_series");
202 let link_properties_state_time_series_node =
203 inspect_time_series_node.create_child("link_properties_state");
204 let time_matrix_client =
205 TimeMatrixClient::new(link_properties_state_time_series_node.clone_weak());
206
207 inspect_node.record(link_properties_state_time_series_node);
208 let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
209 let link_properties_state_logger = LinkPropertiesStateLogger::new(
210 &inspect_metadata_node,
211 &format!("root/telemetry/{METADATA_NODE_NAME}"),
212 InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
213 &time_matrix_client,
214 );
215
216 let interface_aware_events_node = inspect_events_node.create_child("interfaces");
217 let interface_aware_time_series_node = inspect_time_series_node.create_child("interfaces");
218 let interface_aware_logger = InterfaceAwareLogger::new(
219 &inspect_metadata_node,
220 &format!("root/telemetry/{METADATA_NODE_NAME}"),
221 InterfaceTimeSeriesGrouping::Type(vec![InterfaceType::Ethernet, InterfaceType::WlanClient]),
222 interface_aware_events_node,
223 interface_aware_time_series_node,
224 );
225
226 inspect_node.record(inspect_events_node);
228 inspect_node.record(inspect_time_series_node);
229 inspect_node.record(inspect_metadata_node);
230
231 let (sender, fut) = serve_telemetry_inner(
232 cobalt_proxy,
233 inspect_node,
234 link_properties_state_logger,
235 interface_aware_logger,
236 );
237 (sender, fut)
238}
239
240fn serve_telemetry_inner(
241 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
242 inspect_node: InspectNode,
243 link_properties_state_logger: LinkPropertiesStateLogger,
244 interface_aware_logger: InterfaceAwareLogger,
245) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
246 let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
247 let sender = TelemetrySender::new(sender);
248 let cloned_sender = sender.clone();
249
250 let fut = async move {
251 let link_properties_state_logger = link_properties_state_logger;
252
253 let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
254 const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
255 const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
256 const INTERVAL_TICKS_PER_MINUTE: u64 =
257 (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
258 const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
259 let mut interval_tick = 0u64;
260 let mut telemetry = Telemetry::new(
261 cloned_sender,
262 cobalt_proxy,
263 link_properties_state_logger,
264 interface_aware_logger,
265 inspect_node,
266 );
267 loop {
268 select! {
269 event = receiver.next() => {
270 if let Some(event) = event {
271 telemetry.handle_telemetry_event(event).await;
272 }
273 }
274 _ = report_interval_stream.next() => {
275 telemetry.handle_ten_secondly_telemetry();
276
277 interval_tick += 1;
278 if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
279 telemetry.handle_hourly_telemetry().await;
280 }
281 }
282 }
283 }
284 };
285 (sender, fut)
286}
287
288macro_rules! log_cobalt {
291 ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
292 let status = $cobalt_proxy.$method_name($metric_id, $value, $event_codes).await;
293 match status {
294 Ok(Ok(())) => (),
295 Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
296 Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
297 }
298 }};
299}
300
301macro_rules! log_cobalt_batch {
302 ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
303 let status = $cobalt_proxy.log_metric_events($events).await;
304 match status {
305 Ok(Ok(())) => (),
306 Ok(Err(e)) => {
307 warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
308 }
309 Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
310 }
311 }};
312}
313
314fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
315 const MILLIS_PER_SEC: i64 = 1000;
316 let millis = duration.into_millis();
317 let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
318 millis / MILLIS_PER_SEC + rounded_portion
319}
320
321struct Telemetry {
322 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
323 state_summary: Option<SystemStateSummary>,
324 state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
325 state_last_refreshed_for_inspect: fasync::MonotonicInstant,
326 reachability_lost_at: Option<(
327 fasync::MonotonicInstant,
328 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
329 )>,
330 network_config: Option<NetworkConfig>,
331 network_config_last_refreshed: fasync::MonotonicInstant,
332 link_properties_state_logger: LinkPropertiesStateLogger,
333 interface_aware_logger: InterfaceAwareLogger,
334
335 _inspect_node: InspectNode,
336 stats: Arc<Mutex<Stats>>,
337}
338
339impl Telemetry {
340 pub fn new(
341 _telemetry_sender: TelemetrySender,
342 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
343 link_properties_state_logger: LinkPropertiesStateLogger,
344 interface_aware_logger: InterfaceAwareLogger,
345 inspect_node: InspectNode,
346 ) -> Self {
347 let stats = Arc::new(Mutex::new(Stats::new()));
348 inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
349 Self {
350 cobalt_proxy,
351 state_summary: None,
352 state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
353 state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
354 reachability_lost_at: None,
355 network_config: None,
356 network_config_last_refreshed: fasync::MonotonicInstant::now(),
357 link_properties_state_logger,
358 interface_aware_logger,
359 _inspect_node: inspect_node,
360 stats,
361 }
362 }
363
364 pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
365 let now = fasync::MonotonicInstant::now();
366 match event {
367 TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
368 let new_state = Some(match &self.state_summary {
369 Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
370 None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
371 });
372 if self.state_summary != new_state {
373 self.log_system_state_metrics(
376 new_state,
377 now,
378 "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
379 )
380 .await
381 }
382 }
383 TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
384 let new_config =
385 Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
386 self.log_network_config_metrics(
387 new_config,
388 now,
389 "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
390 )
391 .await;
392 }
393 TelemetryEvent::GatewayProbe {
394 internet_available,
395 gateway_discoverable,
396 gateway_pingable,
397 } => {
398 if internet_available {
399 let metric_id = match (gateway_discoverable, gateway_pingable) {
400 (true, true) => None,
401 (true, false) => {
402 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
403 }
404 (false, true) => {
405 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
406 }
407 (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
408 };
409 if let Some(metric_id) = metric_id {
410 log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
411 }
412 }
413 }
414 TelemetryEvent::LinkPropertiesUpdate { interface_identifiers, link_properties } => {
415 self.link_properties_state_logger
416 .update_link_properties(interface_identifiers, &link_properties);
417 }
418 TelemetryEvent::LinkStateUpdate { interface_identifiers, link_state } => {
419 self.link_properties_state_logger
420 .update_link_state(interface_identifiers, &link_state);
421 }
422 TelemetryEvent::GatewayPingResult {
423 interface_identifiers,
424 ping_parameters,
425 gateway_ping_result,
426 } => {
427 self.interface_aware_logger.log_gateway_ping_result(
428 interface_identifiers,
429 &ping_parameters,
430 &gateway_ping_result,
431 );
432 }
433 TelemetryEvent::InternetPingResult {
434 interface_identifiers,
435 ping_parameters,
436 internet_ping_result,
437 } => {
438 self.interface_aware_logger.log_internet_ping_result(
439 interface_identifiers,
440 &ping_parameters,
441 &internet_ping_result,
442 );
443 }
444 TelemetryEvent::FetchResult {
445 interface_identifiers,
446 fetch_parameters,
447 fetch_result,
448 } => {
449 self.interface_aware_logger.log_fetch_result(
450 interface_identifiers,
451 &fetch_parameters,
452 &fetch_result,
453 );
454 }
455 TelemetryEvent::GetTimeSeries { sender } => {
456 let _result = sender.send(Arc::clone(&self.stats));
457 }
458 }
459 }
460
461 async fn log_system_state_metrics(
462 &mut self,
463 new_state: Option<SystemStateSummary>,
464 now: fasync::MonotonicInstant,
465 ctx: &'static str,
466 ) {
467 let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
468 let duration_sec_inspect =
469 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
470 let mut metric_events = vec![];
471
472 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
473
474 if let Some(prev) = &self.state_summary {
475 if prev.system_state.has_interface_up() {
476 metric_events.push(
477 MetricEvent::builder(
478 metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
479 )
480 .as_integer(duration_cobalt.into_micros()),
481 );
482
483 let route_config_dim = convert::convert_route_config(&prev.system_state);
484 let internet_available =
485 convert::convert_yes_no_dim(prev.system_state.has_internet());
486 let gateway_reachable =
487 convert::convert_yes_no_dim(prev.system_state.has_gateway());
488 let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
489 let http_status = if prev.system_state.has_http() {
490 metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
491 } else {
492 metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
493 };
494 if let Some(route_config) = route_config_dim {
496 metric_events.push(
497 MetricEvent::builder(
498 metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
499 )
500 .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
501 route_config,
502 internet_available,
503 gateway_reachable,
504 dns_active,
505 http_status,
506 })
507 .as_integer(duration_cobalt.into_micros()),
508 );
509 }
510
511 if prev.system_state.has_internet() {
512 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
513 }
514 if prev.system_state.has_dns() {
515 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
516 }
517 if prev.system_state.has_http() {
518 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
519 }
520 }
521 }
522
523 if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
524 let previously_reachable =
525 prev.system_state.has_internet() && prev.system_state.has_dns();
526 let now_reachable =
527 new_state.system_state.has_internet() && new_state.system_state.has_dns();
528 if previously_reachable && !now_reachable {
529 let route_config_dim = convert::convert_route_config(&prev.system_state);
530 if let Some(route_config_dim) = route_config_dim {
531 metric_events.push(
532 MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
533 .with_event_code(route_config_dim.as_event_code())
534 .as_occurrence(1),
535 );
536 self.reachability_lost_at = Some((now, route_config_dim));
537 }
538 self.stats.lock().reachability_lost_count.log_value(&1);
539 }
540
541 if !previously_reachable && now_reachable {
542 if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
543 metric_events.push(
544 MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
545 .with_event_code(route_config_dim.as_event_code())
546 .as_integer((now - reachability_lost_at).into_micros()),
547 );
548 }
549 self.reachability_lost_at = None;
550 }
551 }
552
553 if let Some(new_state) = &new_state {
554 self.stats
555 .lock()
556 .ipv4_state
557 .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
558 self.stats
559 .lock()
560 .ipv6_state
561 .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
562 }
563
564 if !metric_events.is_empty() {
565 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
566 }
567
568 if let Some(new_state) = new_state {
569 self.state_summary = Some(new_state);
570 }
571 self.state_last_refreshed_for_cobalt = now;
572 self.state_last_refreshed_for_inspect = now;
573 }
574
575 async fn log_network_config_metrics(
576 &mut self,
577 new_config: Option<NetworkConfig>,
578 now: fasync::MonotonicInstant,
579 ctx: &'static str,
580 ) {
581 let duration = now - self.network_config_last_refreshed;
582 let mut metric_events = vec![];
583
584 if let Some(prev) = &self.network_config {
585 let default_route_dim = convert::convert_default_route(
586 prev.has_default_ipv4_route,
587 prev.has_default_ipv6_route,
588 );
589 if let Some(default_route_dim) = default_route_dim {
591 metric_events.push(
592 MetricEvent::builder(
593 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
594 )
595 .with_event_code(default_route_dim.as_event_code())
596 .as_integer(duration.into_micros()),
597 );
598 metric_events.push(
599 MetricEvent::builder(
600 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
601 )
602 .with_event_code(default_route_dim.as_event_code())
603 .as_occurrence(1),
604 );
605 }
606 }
607
608 if !metric_events.is_empty() {
609 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
610 }
611
612 if let Some(new_config) = new_config {
613 self.network_config = Some(new_config);
614 }
615 self.network_config_last_refreshed = now;
616 }
617
618 pub fn handle_ten_secondly_telemetry(&mut self) {
619 let now = fasync::MonotonicInstant::now();
620 let duration_sec_inspect =
621 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
622
623 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
624
625 if let Some(current) = &self.state_summary {
626 if current.system_state.has_internet() {
627 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
628 }
629 if current.system_state.has_dns() {
630 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
631 }
632 if current.system_state.has_http() {
633 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
634 }
635
636 self.stats
637 .lock()
638 .ipv4_state
639 .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
640 self.stats
641 .lock()
642 .ipv6_state
643 .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
644 }
645 self.state_last_refreshed_for_inspect = now;
646 }
647
648 pub async fn handle_hourly_telemetry(&mut self) {
649 let now = fasync::MonotonicInstant::now();
650 self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
651 self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
652 }
653}
654
655#[cfg(test)]
656mod tests {
657 use crate::{ApplicationState, LinkState};
658
659 use super::*;
660 use fidl::endpoints::create_proxy_and_stream;
661 use fidl_fuchsia_metrics::MetricEventPayload;
662 use fuchsia_inspect::Inspector;
663 use fuchsia_inspect_contrib::id_enum::IdEnum;
664
665 use futures::task::Poll;
666 use std::pin::Pin;
667 use test_case::test_case;
668 use windowed_stats::experimental::clock::Timed;
669 use windowed_stats::experimental::testing::{MockTimeMatrixClient, TimeMatrixCall};
670
671 const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
672
673 #[test]
674 fn test_log_state_change() {
675 let (mut test_helper, mut test_fut) = setup_test();
676
677 let mut update = SystemStateUpdate {
678 system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
679 };
680 test_helper
681 .telemetry_sender
682 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
683
684 test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
685
686 update.system_state = IpVersions {
687 ipv4: Some(State {
688 link: LinkState::Internet,
689 application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
690 }),
691 ipv6: None,
692 };
693 test_helper
694 .telemetry_sender
695 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
696
697 test_helper.advance_test_fut(&mut test_fut);
698
699 let logged_metrics = test_helper
700 .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
701 assert_eq!(logged_metrics.len(), 1);
702 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
703
704 let logged_metrics = test_helper
705 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
706 assert_eq!(logged_metrics.len(), 1);
707 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
708 assert_eq!(
709 logged_metrics[0].event_codes,
710 &[
711 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
712 as u32,
713 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
714 as u32,
715 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
716 as u32,
717 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
718 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
719 as u32,
720 ]
721 );
722
723 test_helper.cobalt_events.clear();
724 test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);
725
726 let logged_metrics = test_helper
730 .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
731 assert_eq!(logged_metrics.len(), 1);
732 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
733
734 let logged_metrics = test_helper
735 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
736 assert_eq!(logged_metrics.len(), 1);
737 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
738 assert_eq!(
739 logged_metrics[0].event_codes,
740 &[
741 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
742 as u32,
743 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
744 as u32,
745 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
746 as u32,
747 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
749 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
750 as u32,
751 ]
752 );
753 }
754
755 #[test]
756 fn test_log_reachability_lost() {
757 let (mut test_helper, mut test_fut) = setup_test();
758
759 let mut update = SystemStateUpdate {
760 system_state: IpVersions {
761 ipv4: None,
762 ipv6: Some(State {
763 link: LinkState::Internet,
764 application: ApplicationState { dns_resolved: true, ..Default::default() },
765 }),
766 },
767 ..SystemStateUpdate::default()
768 };
769 test_helper
770 .telemetry_sender
771 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
772 test_helper.advance_test_fut(&mut test_fut);
773
774 update.system_state = IpVersions {
775 ipv4: None,
776 ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
777 };
778 test_helper
779 .telemetry_sender
780 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
781 test_helper.advance_test_fut(&mut test_fut);
782
783 let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
786 assert_eq!(logged_metrics.len(), 1);
787 assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
788 assert_eq!(
789 logged_metrics[0].event_codes,
790 &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
791 );
792
793 test_helper.cobalt_events.clear();
794
795 update.system_state = IpVersions {
796 ipv4: None,
797 ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
798 };
799 test_helper
800 .telemetry_sender
801 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
802 test_helper.advance_test_fut(&mut test_fut);
803
804 let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
807 assert_eq!(logged_metrics.len(), 0);
808
809 test_helper.cobalt_events.clear();
810
811 test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
812 update.system_state = IpVersions {
813 ipv4: Some(State {
814 link: LinkState::Internet,
815 application: ApplicationState { dns_resolved: true, ..Default::default() },
816 }),
817 ipv6: None,
818 };
819 test_helper
820 .telemetry_sender
821 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
822 test_helper.advance_test_fut(&mut test_fut);
823
824 let logged_metrics =
826 test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
827 assert_eq!(logged_metrics.len(), 1);
828 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
829 assert_eq!(
830 logged_metrics[0].event_codes,
831 &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
834 );
835 }
836
837 #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
838 #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
839 #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
840 #[test_case(false, false, None; "no default routes")]
841 #[fuchsia::test(add_test_attr = false)]
842 fn test_log_default_route(
843 has_default_ipv4_route: bool,
844 has_default_ipv6_route: bool,
845 expected_dim: Option<
846 metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
847 >,
848 ) {
849 let (mut test_helper, mut test_fut) = setup_test();
850
851 test_helper
852 .telemetry_sender
853 .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
854 test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
855
856 let logged_metrics = test_helper
857 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
858 match expected_dim {
859 Some(dim) => {
860 assert_eq!(logged_metrics.len(), 1);
861 assert_eq!(
862 logged_metrics[0].payload,
863 MetricEventPayload::IntegerValue(3600_000_000)
864 );
865 assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
866 }
867 None => {
868 assert_eq!(logged_metrics.len(), 0);
869 }
870 }
871
872 let logged_metrics = test_helper
873 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
874 match expected_dim {
875 Some(dim) => {
876 assert_eq!(logged_metrics.len(), 1);
877 assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
878 assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
879 }
880 None => {
881 assert_eq!(logged_metrics.len(), 0);
882 }
883 }
884 }
885
886 #[test_case(true, true, vec![]; "negative")]
887 #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
888 #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
889 #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
890 #[fuchsia::test(add_test_attr = false)]
891 fn test_log_abnormal_state_situation(
892 gateway_discoverable: bool,
893 gateway_pingable: bool,
894 expected_metrics: Vec<u32>,
895 ) {
896 let (mut test_helper, mut test_fut) = setup_test();
897
898 test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
899 internet_available: true,
900 gateway_discoverable,
901 gateway_pingable,
902 });
903 test_helper.advance_test_fut(&mut test_fut);
904
905 let logged_metrics: Vec<u32> = [
906 metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
907 metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
908 metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
909 ]
910 .into_iter()
911 .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
912 .collect();
913
914 assert_eq!(logged_metrics, expected_metrics);
915 }
916
917 #[test]
918 fn test_state_snapshot_duration_inspect_stats() {
919 let (mut test_helper, mut test_fut) = setup_test();
920
921 test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
922
923 let time_series = test_helper.get_time_series(&mut test_fut);
924 let internet_available_sec: Vec<_> =
925 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
926 let dns_active_sec: Vec<_> =
927 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
928 let http_active_sec: Vec<_> =
929 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
930 let total_duration_sec: Vec<_> =
931 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
932 assert_eq!(internet_available_sec, vec![0]);
933 assert_eq!(dns_active_sec, vec![0]);
934 assert_eq!(http_active_sec, vec![0]);
935 assert_eq!(total_duration_sec, vec![20]);
936
937 let mut update = SystemStateUpdate {
938 system_state: IpVersions {
939 ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
940 ipv6: None,
941 },
942 };
943 test_helper
944 .telemetry_sender
945 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
946 test_helper.advance_test_fut(&mut test_fut);
947 let time_series = test_helper.get_time_series(&mut test_fut);
948 let total_duration_sec: Vec<_> =
949 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
950 assert_eq!(total_duration_sec, vec![25]);
951
952 test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
953
954 let time_series = test_helper.get_time_series(&mut test_fut);
957 let internet_available_sec: Vec<_> =
958 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
959 let dns_active_sec: Vec<_> =
960 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
961 let http_active_sec: Vec<_> =
962 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
963 let total_duration_sec: Vec<_> =
964 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
965 assert_eq!(internet_available_sec, vec![15]);
966 assert_eq!(dns_active_sec, vec![0]);
967 assert_eq!(http_active_sec, vec![0]);
968 assert_eq!(total_duration_sec, vec![40]);
969
970 update.system_state = IpVersions {
971 ipv4: Some(State {
972 link: LinkState::Internet,
973 application: ApplicationState { dns_resolved: true, ..Default::default() },
974 }),
975 ipv6: None,
976 };
977 test_helper
978 .telemetry_sender
979 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
980
981 test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
982
983 let time_series = test_helper.get_time_series(&mut test_fut);
986 let internet_available_sec: Vec<_> =
987 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
988 let dns_active_sec: Vec<_> =
989 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
990 let http_active_sec: Vec<_> =
991 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
992 let total_duration_sec: Vec<_> =
993 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
994 assert_eq!(internet_available_sec, vec![25, 40]);
995 assert_eq!(dns_active_sec, vec![10, 40]);
996 assert_eq!(http_active_sec, vec![0]);
997 assert_eq!(total_duration_sec, vec![40]);
998
999 update.system_state = IpVersions {
1000 ipv4: Some(State {
1001 link: LinkState::Internet,
1002 application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
1003 }),
1004 ipv6: None,
1005 };
1006 test_helper
1007 .telemetry_sender
1008 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1009
1010 test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
1011
1012 let time_series = test_helper.get_time_series(&mut test_fut);
1015 let internet_available_sec: Vec<_> =
1016 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
1017 let dns_active_sec: Vec<_> =
1018 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
1019 let http_active_sec: Vec<_> =
1020 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
1021 let total_duration_sec: Vec<_> =
1022 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
1023 assert_eq!(internet_available_sec, vec![25, 60, 40]);
1024 assert_eq!(dns_active_sec, vec![10, 60, 40]);
1025 assert_eq!(http_active_sec, vec![0, 20, 40]);
1026 assert_eq!(total_duration_sec, vec![40]);
1027 }
1028
1029 #[test]
1030 fn test_reachability_lost_count_inspect_stats() {
1031 let (mut test_helper, mut test_fut) = setup_test();
1032
1033 let mut update = SystemStateUpdate {
1034 system_state: IpVersions {
1035 ipv4: None,
1036 ipv6: Some(State {
1037 link: LinkState::Internet,
1038 application: ApplicationState { dns_resolved: true, ..Default::default() },
1039 }),
1040 },
1041 ..SystemStateUpdate::default()
1042 };
1043 test_helper
1044 .telemetry_sender
1045 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1046 test_helper.advance_test_fut(&mut test_fut);
1047
1048 let time_series = test_helper.get_time_series(&mut test_fut);
1049 let reachability_lost_count: Vec<_> =
1050 time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
1051 assert_eq!(reachability_lost_count, vec![0]);
1052
1053 update.system_state = IpVersions {
1054 ipv4: None,
1055 ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
1056 };
1057 test_helper
1058 .telemetry_sender
1059 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1060 test_helper.advance_test_fut(&mut test_fut);
1061
1062 let time_series = test_helper.get_time_series(&mut test_fut);
1063 let reachability_lost_count: Vec<_> =
1064 time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
1065 assert_eq!(reachability_lost_count, vec![1]);
1066 }
1067
1068 #[test]
1069 fn test_avg_state_inspect_stats() {
1070 let (mut test_helper, mut test_fut) = setup_test();
1071
1072 let mut update = SystemStateUpdate {
1073 system_state: IpVersions {
1074 ipv4: None,
1075 ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
1076 },
1077 ..SystemStateUpdate::default()
1078 };
1079 test_helper
1080 .telemetry_sender
1081 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1082 test_helper.advance_test_fut(&mut test_fut);
1083
1084 let time_series = test_helper.get_time_series(&mut test_fut);
1085 let ipv4_state: Vec<_> =
1086 time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1087 let ipv6_state: Vec<_> =
1088 time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1089 assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
1090 assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
1091
1092 update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
1093 test_helper
1094 .telemetry_sender
1095 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
1096 test_helper.advance_test_fut(&mut test_fut);
1097
1098 let time_series = test_helper.get_time_series(&mut test_fut);
1099 let ipv4_state: Vec<_> =
1100 time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
1101 let ipv6_state: Vec<_> =
1102 time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
1103 assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
1104 assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
1105 }
1106
1107 #[test]
1108 fn test_link_properties_update() {
1109 let (mut test_helper, mut test_fut) = setup_test();
1110
1111 for i in 0..=15 {
1115 let link_properties = LinkProperties::from(i);
1116 let mut expected_data_vec = vec![];
1117 if i != 0 {
1121 expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(i)));
1122 }
1123
1124 let mut time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1127 assert_eq!(
1128 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1129 &[]
1130 );
1131 assert_eq!(
1132 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1133 &[]
1134 );
1135 assert_eq!(
1136 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1137 &[]
1138 );
1139 assert_eq!(
1140 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1141 &[]
1142 );
1143
1144 test_helper.telemetry_sender.send(TelemetryEvent::LinkPropertiesUpdate {
1145 interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1146 link_properties: IpVersions {
1147 ipv4: link_properties.clone(),
1148 ipv6: link_properties,
1149 },
1150 });
1151 test_helper.advance_test_fut(&mut test_fut);
1152
1153 time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1154 assert_eq!(
1155 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_ethernet")[..],
1156 expected_data_vec
1157 );
1158 assert_eq!(
1159 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_ethernet")[..],
1160 expected_data_vec
1161 );
1162 assert_eq!(
1165 &time_matrix_calls.drain::<u64>("link_properties_v4_TYPE_wlanclient")[..],
1166 &[]
1167 );
1168 assert_eq!(
1169 &time_matrix_calls.drain::<u64>("link_properties_v6_TYPE_wlanclient")[..],
1170 &[]
1171 );
1172 }
1173 }
1174
1175 #[test]
1176 fn test_link_state_update() {
1177 let (mut test_helper, mut test_fut) = setup_test();
1178
1179 let initial_value = LinkState::None.to_id() as u64;
1183 let final_value = LinkState::Internet.to_id() as u64;
1184 for i in initial_value..=final_value {
1185 let link_state = i.into();
1186 let mut expected_data_vec = vec![];
1187 if i != initial_value {
1188 expected_data_vec.push(TimeMatrixCall::Fold(Timed::now(1 << i)));
1189 }
1190
1191 let mut time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1194 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..], &[]);
1195 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..], &[]);
1196 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1197 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1198
1199 test_helper.telemetry_sender.send(TelemetryEvent::LinkStateUpdate {
1200 interface_identifiers: vec![InterfaceIdentifier::Type(InterfaceType::Ethernet)],
1201 link_state: IpVersions { ipv4: link_state, ipv6: link_state },
1202 });
1203 test_helper.advance_test_fut(&mut test_fut);
1204
1205 time_matrix_calls = test_helper.mock_time_matrix_client.drain_calls();
1206 assert_eq!(
1207 &time_matrix_calls.drain::<u64>("link_state_v4_TYPE_ethernet")[..],
1208 expected_data_vec
1209 );
1210 assert_eq!(
1211 &time_matrix_calls.drain::<u64>("link_state_v6_TYPE_ethernet")[..],
1212 expected_data_vec
1213 );
1214 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v4_TYPE_wlanclient")[..], &[]);
1217 assert_eq!(&time_matrix_calls.drain::<u64>("link_state_v6_TYPE_wlanclient")[..], &[]);
1218 }
1219 }
1220
1221 struct TestHelper {
1222 telemetry_sender: TelemetrySender,
1223 _inspector: Inspector,
1224 cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
1225 cobalt_events: Vec<MetricEvent>,
1228 mock_time_matrix_client: MockTimeMatrixClient,
1229
1230 exec: fasync::TestExecutor,
1232 }
1233
1234 impl TestHelper {
1235 fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
1237 let mut made_progress = true;
1238 while made_progress {
1239 let _result = self.exec.run_until_stalled(test_fut);
1240 made_progress = false;
1241 while let Poll::Ready(Some(Ok(req))) =
1242 self.exec.run_until_stalled(&mut self.cobalt_stream.next())
1243 {
1244 self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
1245 made_progress = true;
1246 }
1247 }
1248
1249 match self.exec.run_until_stalled(test_fut) {
1250 Poll::Pending => (),
1251 _ => panic!("expect test_fut to resolve to Poll::Pending"),
1252 }
1253 }
1254
1255 fn advance_by<T>(
1259 &mut self,
1260 duration: zx::MonotonicDuration,
1261 test_fut: &mut (impl Future<Output = T> + Unpin),
1262 ) {
1263 assert_eq!(
1264 duration.into_nanos() % STEP_INCREMENT.into_nanos(),
1265 0,
1266 "duration {:?} is not divisible by STEP_INCREMENT",
1267 duration,
1268 );
1269 const_assert_eq!(
1270 TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1271 0
1272 );
1273
1274 self.advance_test_fut(test_fut);
1275
1276 for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1277 self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1278 let _ = self.exec.wake_expired_timers();
1279 self.advance_test_fut(test_fut);
1280 }
1281 }
1282
1283 fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1284 self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1285 }
1286
1287 fn get_time_series<T>(
1288 &mut self,
1289 test_fut: &mut (impl Future<Output = T> + Unpin),
1290 ) -> Arc<Mutex<Stats>> {
1291 let (sender, mut receiver) = oneshot::channel();
1292 self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1293 self.advance_test_fut(test_fut);
1294 match receiver.try_recv() {
1295 Ok(Some(stats)) => stats,
1296 _ => panic!("Expect Stats to be returned"),
1297 }
1298 }
1299 }
1300
1301 trait CobaltExt {
1302 fn respond_to_metric_req(
1304 self,
1305 result: Result<(), fidl_fuchsia_metrics::Error>,
1306 ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
1307 }
1308
1309 impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1310 fn respond_to_metric_req(
1311 self,
1312 result: Result<(), fidl_fuchsia_metrics::Error>,
1313 ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1314 match self {
1315 Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1316 assert!(responder.send(result).is_ok());
1317 vec![MetricEvent {
1318 metric_id,
1319 event_codes,
1320 payload: MetricEventPayload::Count(count),
1321 }]
1322 }
1323 Self::LogInteger { metric_id, value, event_codes, responder } => {
1324 assert!(responder.send(result).is_ok());
1325 vec![MetricEvent {
1326 metric_id,
1327 event_codes,
1328 payload: MetricEventPayload::IntegerValue(value),
1329 }]
1330 }
1331 Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1332 assert!(responder.send(result).is_ok());
1333 vec![MetricEvent {
1334 metric_id,
1335 event_codes,
1336 payload: MetricEventPayload::Histogram(histogram),
1337 }]
1338 }
1339 Self::LogString { metric_id, string_value, event_codes, responder } => {
1340 assert!(responder.send(result).is_ok());
1341 vec![MetricEvent {
1342 metric_id,
1343 event_codes,
1344 payload: MetricEventPayload::StringValue(string_value),
1345 }]
1346 }
1347 Self::LogMetricEvents { events, responder } => {
1348 assert!(responder.send(result).is_ok());
1349 events
1350 }
1351 }
1352 }
1353 }
1354
1355 fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = Result<(), Error>>>>) {
1356 let mut exec = fasync::TestExecutor::new_with_fake_time();
1357 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1358
1359 let (cobalt_proxy, cobalt_stream) =
1360 create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1361
1362 let inspector = Inspector::default();
1363 let inspect_node = inspector.root().create_child("telemetrytest");
1364 let inspect_metadata_node = inspect_node.create_child(METADATA_NODE_NAME);
1365 let mock_time_matrix_client = MockTimeMatrixClient::new();
1366
1367 let link_properties_state_logger = LinkPropertiesStateLogger::new(
1368 &inspect_metadata_node,
1369 &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1370 InterfaceTimeSeriesGrouping::Type(vec![
1371 InterfaceType::Ethernet,
1372 InterfaceType::WlanClient,
1373 ]),
1374 &mock_time_matrix_client,
1375 );
1376
1377 let interface_aware_logger_node = inspect_node.create_child("interfaces");
1378 let events_node = interface_aware_logger_node.create_child("events");
1379 let time_series_node = interface_aware_logger_node.create_child("time_series");
1380 let interface_aware_logger = InterfaceAwareLogger::new(
1381 &inspect_metadata_node,
1382 &format!("root/telemetrytest/{METADATA_NODE_NAME}"),
1383 InterfaceTimeSeriesGrouping::Type(vec![
1384 InterfaceType::Ethernet,
1385 InterfaceType::WlanClient,
1386 ]),
1387 events_node,
1388 time_series_node,
1389 );
1390
1391 inspect_node.record(interface_aware_logger_node);
1392
1393 let (telemetry_sender, test_fut) = serve_telemetry_inner(
1394 cobalt_proxy,
1395 inspect_node,
1396 link_properties_state_logger,
1397 interface_aware_logger,
1398 );
1399 let mut test_fut = Box::pin(test_fut);
1400
1401 assert_matches::assert_matches!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1402
1403 let test_helper = TestHelper {
1404 telemetry_sender,
1405 _inspector: inspector,
1406 cobalt_stream,
1407 cobalt_events: vec![],
1408 mock_time_matrix_client,
1409 exec,
1410 };
1411 (test_helper, test_fut)
1412 }
1413}