1mod convert;
6mod inspect;
7
8use self::inspect::{inspect_record_stats, Stats};
9use crate::{IpVersions, State};
10
11use anyhow::{format_err, Context, Error};
12use cobalt_client::traits::AsEventCode;
13use fidl_fuchsia_metrics::MetricEvent;
14use fuchsia_cobalt_builders::MetricEventExt;
15use fuchsia_inspect::Node as InspectNode;
16use fuchsia_sync::Mutex;
17use futures::channel::{mpsc, oneshot};
18use futures::{select, Future, StreamExt};
19use log::{info, warn};
20use static_assertions::const_assert_eq;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use windowed_stats::aggregations::SumAndCount;
24use {fuchsia_async as fasync, network_policy_metrics_registry as metrics};
25
26pub async fn create_metrics_logger(
27 factory_proxy: fidl_fuchsia_metrics::MetricEventLoggerFactoryProxy,
28) -> Result<fidl_fuchsia_metrics::MetricEventLoggerProxy, Error> {
29 let (cobalt_proxy, cobalt_server) =
30 fidl::endpoints::create_proxy::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
31
32 let project_spec = fidl_fuchsia_metrics::ProjectSpec {
33 customer_id: None, project_id: Some(metrics::PROJECT_ID),
35 ..Default::default()
36 };
37
38 let status = factory_proxy
39 .create_metric_event_logger(&project_spec, cobalt_server)
40 .await
41 .context("create metrics event logger")?;
42
43 match status {
44 Ok(_) => Ok(cobalt_proxy),
45 Err(err) => Err(format_err!("failed to create metrics event logger: {:?}", err)),
46 }
47}
48
49#[derive(Clone, Debug)]
50pub struct TelemetrySender {
51 sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
52 sender_is_blocked: Arc<AtomicBool>,
53}
54
55impl TelemetrySender {
56 pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
57 Self {
58 sender: Arc::new(Mutex::new(sender)),
59 sender_is_blocked: Arc::new(AtomicBool::new(false)),
60 }
61 }
62
63 pub fn send(&self, event: TelemetryEvent) {
65 match self.sender.lock().try_send(event) {
66 Ok(_) => {
67 if self
69 .sender_is_blocked
70 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
71 .is_ok()
72 {
73 info!("TelemetrySender recovered and resumed sending");
74 }
75 }
76 Err(_) => {
77 if self
79 .sender_is_blocked
80 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
81 .is_ok()
82 {
83 warn!("TelemetrySender dropped a msg: either buffer is full or no receiver is waiting");
84 }
85 }
86 }
87 }
88}
89
90#[derive(Debug, Clone, Default, PartialEq)]
91struct SystemStateSummary {
92 system_state: IpVersions<Option<State>>,
93}
94
95impl SystemStateSummary {
96 fn ipv4_link_state_val(&self) -> u32 {
97 match self.system_state.ipv4 {
98 Some(s) => s.link as u32,
99 None => 0u32,
100 }
101 }
102
103 fn ipv6_link_state_val(&self) -> u32 {
104 match self.system_state.ipv6 {
105 Some(s) => s.link as u32,
106 None => 0u32,
107 }
108 }
109}
110
111#[derive(Debug, Clone, Default, PartialEq)]
112struct NetworkConfig {
113 has_default_ipv4_route: bool,
114 has_default_ipv6_route: bool,
115}
116
117#[derive(Debug, Clone, Default)]
118pub struct SystemStateUpdate {
119 pub(crate) system_state: IpVersions<Option<State>>,
120}
121
122#[derive(Debug)]
123pub enum TelemetryEvent {
124 SystemStateUpdate {
125 update: SystemStateUpdate,
126 },
127 NetworkConfig {
128 has_default_ipv4_route: bool,
129 has_default_ipv6_route: bool,
130 },
131 GatewayProbe {
132 internet_available: bool,
133 gateway_discoverable: bool,
134 gateway_pingable: bool,
135 },
136 GetTimeSeries {
138 sender: oneshot::Sender<Arc<Mutex<Stats>>>,
139 },
140}
141
142const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
145
146const TELEMETRY_QUERY_INTERVAL: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
147
148pub fn serve_telemetry(
149 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
150 inspect_node: InspectNode,
151) -> (TelemetrySender, impl Future<Output = ()>) {
152 let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
153 let sender = TelemetrySender::new(sender);
154 let cloned_sender = sender.clone();
155 let fut = async move {
156 let mut report_interval_stream = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
157 const ONE_MINUTE: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(1);
158 const_assert_eq!(ONE_MINUTE.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
159 const INTERVAL_TICKS_PER_MINUTE: u64 =
160 (ONE_MINUTE.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;
161 const INTERVAL_TICKS_PER_HR: u64 = INTERVAL_TICKS_PER_MINUTE * 60;
162 let mut interval_tick = 0u64;
163 let mut telemetry = Telemetry::new(cloned_sender, cobalt_proxy, inspect_node);
164 loop {
165 select! {
166 event = receiver.next() => {
167 if let Some(event) = event {
168 telemetry.handle_telemetry_event(event).await;
169 }
170 }
171 _ = report_interval_stream.next() => {
172 telemetry.handle_ten_secondly_telemetry();
173
174 interval_tick += 1;
175 if interval_tick % INTERVAL_TICKS_PER_HR == 0 {
176 telemetry.handle_hourly_telemetry().await;
177 }
178 }
179 }
180 }
181 };
182 (sender, fut)
183}
184
185macro_rules! log_cobalt {
188 ($cobalt_proxy:expr, $method_name:ident, $metric_id:expr, $value:expr, $event_codes:expr $(,)?) => {{
189 let status = $cobalt_proxy.$method_name($metric_id, $value, $event_codes).await;
190 match status {
191 Ok(Ok(())) => (),
192 Ok(Err(e)) => warn!("Failed logging metric: {}, error: {:?}", $metric_id, e),
193 Err(e) => warn!("Failed logging metric: {}, error: {}", $metric_id, e),
194 }
195 }};
196}
197
198macro_rules! log_cobalt_batch {
199 ($cobalt_proxy:expr, $events:expr, $context:expr $(,)?) => {{
200 let status = $cobalt_proxy.log_metric_events($events).await;
201 match status {
202 Ok(Ok(())) => (),
203 Ok(Err(e)) => {
204 warn!("Failed logging batch metrics, context: {}, error: {:?}", $context, e)
205 }
206 Err(e) => warn!("Failed logging batch metrics, context: {}, error: {}", $context, e),
207 }
208 }};
209}
210
211fn round_to_nearest_second(duration: zx::MonotonicDuration) -> i64 {
212 const MILLIS_PER_SEC: i64 = 1000;
213 let millis = duration.into_millis();
214 let rounded_portion = if millis % MILLIS_PER_SEC >= 500 { 1 } else { 0 };
215 millis / MILLIS_PER_SEC + rounded_portion
216}
217
218struct Telemetry {
219 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
220 state_summary: Option<SystemStateSummary>,
221 state_last_refreshed_for_cobalt: fasync::MonotonicInstant,
222 state_last_refreshed_for_inspect: fasync::MonotonicInstant,
223 reachability_lost_at: Option<(
224 fasync::MonotonicInstant,
225 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig,
226 )>,
227 network_config: Option<NetworkConfig>,
228 network_config_last_refreshed: fasync::MonotonicInstant,
229
230 _inspect_node: InspectNode,
231 stats: Arc<Mutex<Stats>>,
232}
233
234impl Telemetry {
235 pub fn new(
236 _telemetry_sender: TelemetrySender,
237 cobalt_proxy: fidl_fuchsia_metrics::MetricEventLoggerProxy,
238 inspect_node: InspectNode,
239 ) -> Self {
240 let stats = Arc::new(Mutex::new(Stats::new()));
241 inspect_record_stats(&inspect_node, "stats", Arc::clone(&stats));
242 Self {
243 cobalt_proxy,
244 state_summary: None,
245 state_last_refreshed_for_cobalt: fasync::MonotonicInstant::now(),
246 state_last_refreshed_for_inspect: fasync::MonotonicInstant::now(),
247 reachability_lost_at: None,
248 network_config: None,
249 network_config_last_refreshed: fasync::MonotonicInstant::now(),
250 _inspect_node: inspect_node,
251 stats,
252 }
253 }
254
255 pub async fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
256 let now = fasync::MonotonicInstant::now();
257 match event {
258 TelemetryEvent::SystemStateUpdate { update: SystemStateUpdate { system_state } } => {
259 let new_state = Some(match &self.state_summary {
260 Some(summary) => SystemStateSummary { system_state, ..summary.clone() },
261 None => SystemStateSummary { system_state, ..SystemStateSummary::default() },
262 });
263 if self.state_summary != new_state {
264 self.log_system_state_metrics(
267 new_state,
268 now,
269 "handle_telemetry_event(TelemetryEvent::SystemStateUpdate)",
270 )
271 .await
272 }
273 }
274 TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route } => {
275 let new_config =
276 Some(NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
277 self.log_network_config_metrics(
278 new_config,
279 now,
280 "handle_telemetry_event(TelemetryEvent::NetworkConfig)",
281 )
282 .await;
283 }
284 TelemetryEvent::GatewayProbe {
285 internet_available,
286 gateway_discoverable,
287 gateway_pingable,
288 } => {
289 if internet_available {
290 let metric_id = match (gateway_discoverable, gateway_pingable) {
291 (true, true) => None,
292 (true, false) => {
293 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID)
294 }
295 (false, true) => {
296 Some(metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID)
297 }
298 (false, false) => Some(metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID),
299 };
300 if let Some(metric_id) = metric_id {
301 log_cobalt!(self.cobalt_proxy, log_occurrence, metric_id, 1, &[]);
302 }
303 }
304 }
305 TelemetryEvent::GetTimeSeries { sender } => {
306 let _result = sender.send(Arc::clone(&self.stats));
307 }
308 }
309 }
310
311 async fn log_system_state_metrics(
312 &mut self,
313 new_state: Option<SystemStateSummary>,
314 now: fasync::MonotonicInstant,
315 ctx: &'static str,
316 ) {
317 let duration_cobalt = now - self.state_last_refreshed_for_cobalt;
318 let duration_sec_inspect =
319 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
320 let mut metric_events = vec![];
321
322 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
323
324 if let Some(prev) = &self.state_summary {
325 if prev.system_state.has_interface_up() {
326 metric_events.push(
327 MetricEvent::builder(
328 metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID,
329 )
330 .as_integer(duration_cobalt.into_micros()),
331 );
332
333 let route_config_dim = convert::convert_route_config(&prev.system_state);
334 let internet_available =
335 convert::convert_yes_no_dim(prev.system_state.has_internet());
336 let gateway_reachable =
337 convert::convert_yes_no_dim(prev.system_state.has_gateway());
338 let dns_active = convert::convert_yes_no_dim(prev.system_state.has_dns());
339 let http_status = if prev.system_state.has_http() {
340 metrics::NetworkPolicyMetricDimensionHttpStatus::HttpOnly
341 } else {
342 metrics::NetworkPolicyMetricDimensionHttpStatus::Neither
343 };
344 if let Some(route_config) = route_config_dim {
346 metric_events.push(
347 MetricEvent::builder(
348 metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID,
349 )
350 .with_event_codes(metrics::ReachabilityGlobalSnapshotDurationEventCodes {
351 route_config,
352 internet_available,
353 gateway_reachable,
354 dns_active,
355 http_status,
356 })
357 .as_integer(duration_cobalt.into_micros()),
358 );
359 }
360
361 if prev.system_state.has_internet() {
362 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
363 }
364 if prev.system_state.has_dns() {
365 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
366 }
367 if prev.system_state.has_http() {
368 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
369 }
370 }
371 }
372
373 if let (Some(prev), Some(new_state)) = (&self.state_summary, &new_state) {
374 let previously_reachable =
375 prev.system_state.has_internet() && prev.system_state.has_dns();
376 let now_reachable =
377 new_state.system_state.has_internet() && new_state.system_state.has_dns();
378 if previously_reachable && !now_reachable {
379 let route_config_dim = convert::convert_route_config(&prev.system_state);
380 if let Some(route_config_dim) = route_config_dim {
381 metric_events.push(
382 MetricEvent::builder(metrics::REACHABILITY_LOST_METRIC_ID)
383 .with_event_code(route_config_dim.as_event_code())
384 .as_occurrence(1),
385 );
386 self.reachability_lost_at = Some((now, route_config_dim));
387 }
388 self.stats.lock().reachability_lost_count.log_value(&1);
389 }
390
391 if !previously_reachable && now_reachable {
392 if let Some((reachability_lost_at, route_config_dim)) = self.reachability_lost_at {
393 metric_events.push(
394 MetricEvent::builder(metrics::REACHABILITY_LOST_DURATION_METRIC_ID)
395 .with_event_code(route_config_dim.as_event_code())
396 .as_integer((now - reachability_lost_at).into_micros()),
397 );
398 }
399 self.reachability_lost_at = None;
400 }
401 }
402
403 if let Some(new_state) = &new_state {
404 self.stats
405 .lock()
406 .ipv4_state
407 .log_value(&SumAndCount { sum: new_state.ipv4_link_state_val(), count: 1 });
408 self.stats
409 .lock()
410 .ipv6_state
411 .log_value(&SumAndCount { sum: new_state.ipv6_link_state_val(), count: 1 });
412 }
413
414 if !metric_events.is_empty() {
415 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
416 }
417
418 if let Some(new_state) = new_state {
419 self.state_summary = Some(new_state);
420 }
421 self.state_last_refreshed_for_cobalt = now;
422 self.state_last_refreshed_for_inspect = now;
423 }
424
425 async fn log_network_config_metrics(
426 &mut self,
427 new_config: Option<NetworkConfig>,
428 now: fasync::MonotonicInstant,
429 ctx: &'static str,
430 ) {
431 let duration = now - self.network_config_last_refreshed;
432 let mut metric_events = vec![];
433
434 if let Some(prev) = &self.network_config {
435 let default_route_dim = convert::convert_default_route(
436 prev.has_default_ipv4_route,
437 prev.has_default_ipv6_route,
438 );
439 if let Some(default_route_dim) = default_route_dim {
441 metric_events.push(
442 MetricEvent::builder(
443 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID,
444 )
445 .with_event_code(default_route_dim.as_event_code())
446 .as_integer(duration.into_micros()),
447 );
448 metric_events.push(
449 MetricEvent::builder(
450 metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID,
451 )
452 .with_event_code(default_route_dim.as_event_code())
453 .as_occurrence(1),
454 );
455 }
456 }
457
458 if !metric_events.is_empty() {
459 log_cobalt_batch!(self.cobalt_proxy, &metric_events, ctx);
460 }
461
462 if let Some(new_config) = new_config {
463 self.network_config = Some(new_config);
464 }
465 self.network_config_last_refreshed = now;
466 }
467
468 pub fn handle_ten_secondly_telemetry(&mut self) {
469 let now = fasync::MonotonicInstant::now();
470 let duration_sec_inspect =
471 round_to_nearest_second(now - self.state_last_refreshed_for_inspect) as i32;
472
473 self.stats.lock().total_duration_sec.log_value(&duration_sec_inspect);
474
475 if let Some(current) = &self.state_summary {
476 if current.system_state.has_internet() {
477 self.stats.lock().internet_available_sec.log_value(&duration_sec_inspect);
478 }
479 if current.system_state.has_dns() {
480 self.stats.lock().dns_active_sec.log_value(&duration_sec_inspect);
481 }
482 if current.system_state.has_http() {
483 self.stats.lock().http_active_sec.log_value(&duration_sec_inspect);
484 }
485
486 self.stats
487 .lock()
488 .ipv4_state
489 .log_value(&SumAndCount { sum: current.ipv4_link_state_val(), count: 1 });
490 self.stats
491 .lock()
492 .ipv6_state
493 .log_value(&SumAndCount { sum: current.ipv6_link_state_val(), count: 1 });
494 }
495 self.state_last_refreshed_for_inspect = now;
496 }
497
498 pub async fn handle_hourly_telemetry(&mut self) {
499 let now = fasync::MonotonicInstant::now();
500 self.log_system_state_metrics(None, now, "handle_hourly_telemetry").await;
501 self.log_network_config_metrics(None, now, "handle_hourly_telemetry").await;
502 }
503}
504
505#[cfg(test)]
506mod tests {
507 use crate::{ApplicationState, LinkState};
508
509 use super::*;
510 use fidl::endpoints::create_proxy_and_stream;
511 use fidl_fuchsia_metrics::MetricEventPayload;
512 use fuchsia_inspect::Inspector;
513
514 use futures::task::Poll;
515 use std::pin::Pin;
516 use test_case::test_case;
517
518 const STEP_INCREMENT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(1);
519
520 #[test]
521 fn test_log_state_change() {
522 let (mut test_helper, mut test_fut) = setup_test();
523
524 let mut update = SystemStateUpdate {
525 system_state: IpVersions { ipv4: Some(LinkState::Internet.into()), ipv6: None },
526 };
527 test_helper
528 .telemetry_sender
529 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
530
531 test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
532
533 update.system_state = IpVersions {
534 ipv4: Some(State {
535 link: LinkState::Internet,
536 application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
537 }),
538 ipv6: None,
539 };
540 test_helper
541 .telemetry_sender
542 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
543
544 test_helper.advance_test_fut(&mut test_fut);
545
546 let logged_metrics = test_helper
547 .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
548 assert_eq!(logged_metrics.len(), 1);
549 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
550
551 let logged_metrics = test_helper
552 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
553 assert_eq!(logged_metrics.len(), 1);
554 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(25_000_000));
555 assert_eq!(
556 logged_metrics[0].event_codes,
557 &[
558 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
559 as u32,
560 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
561 as u32,
562 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
563 as u32,
564 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::No as u32,
565 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::Neither
566 as u32,
567 ]
568 );
569
570 test_helper.cobalt_events.clear();
571 test_helper.advance_by(zx::MonotonicDuration::from_seconds(3575), &mut test_fut);
572
573 let logged_metrics = test_helper
577 .get_logged_metrics(metrics::REACHABILITY_STATE_UP_OR_ABOVE_DURATION_METRIC_ID);
578 assert_eq!(logged_metrics.len(), 1);
579 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
580
581 let logged_metrics = test_helper
582 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_SNAPSHOT_DURATION_METRIC_ID);
583 assert_eq!(logged_metrics.len(), 1);
584 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(3575_000_000));
585 assert_eq!(
586 logged_metrics[0].event_codes,
587 &[
588 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionRouteConfig::Ipv4Only
589 as u32,
590 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionInternetAvailable::Yes
591 as u32,
592 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionGatewayReachable::Yes
593 as u32,
594 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionDnsActive::Yes as u32,
596 metrics::ReachabilityGlobalSnapshotDurationMetricDimensionHttpStatus::HttpOnly
597 as u32,
598 ]
599 );
600 }
601
602 #[test]
603 fn test_log_reachability_lost() {
604 let (mut test_helper, mut test_fut) = setup_test();
605
606 let mut update = SystemStateUpdate {
607 system_state: IpVersions {
608 ipv4: None,
609 ipv6: Some(State {
610 link: LinkState::Internet,
611 application: ApplicationState { dns_resolved: true, ..Default::default() },
612 }),
613 },
614 ..SystemStateUpdate::default()
615 };
616 test_helper
617 .telemetry_sender
618 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
619 test_helper.advance_test_fut(&mut test_fut);
620
621 update.system_state = IpVersions {
622 ipv4: None,
623 ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
624 };
625 test_helper
626 .telemetry_sender
627 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
628 test_helper.advance_test_fut(&mut test_fut);
629
630 let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
633 assert_eq!(logged_metrics.len(), 1);
634 assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
635 assert_eq!(
636 logged_metrics[0].event_codes,
637 &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
638 );
639
640 test_helper.cobalt_events.clear();
641
642 update.system_state = IpVersions {
643 ipv4: None,
644 ipv6: Some(State { link: LinkState::Down, ..Default::default() }),
645 };
646 test_helper
647 .telemetry_sender
648 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
649 test_helper.advance_test_fut(&mut test_fut);
650
651 let logged_metrics = test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_METRIC_ID);
654 assert_eq!(logged_metrics.len(), 0);
655
656 test_helper.cobalt_events.clear();
657
658 test_helper.advance_by(zx::MonotonicDuration::from_hours(2), &mut test_fut);
659 update.system_state = IpVersions {
660 ipv4: Some(State {
661 link: LinkState::Internet,
662 application: ApplicationState { dns_resolved: true, ..Default::default() },
663 }),
664 ipv6: None,
665 };
666 test_helper
667 .telemetry_sender
668 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
669 test_helper.advance_test_fut(&mut test_fut);
670
671 let logged_metrics =
673 test_helper.get_logged_metrics(metrics::REACHABILITY_LOST_DURATION_METRIC_ID);
674 assert_eq!(logged_metrics.len(), 1);
675 assert_eq!(logged_metrics[0].payload, MetricEventPayload::IntegerValue(7200_000_000));
676 assert_eq!(
677 logged_metrics[0].event_codes,
678 &[metrics::ReachabilityLostMetricDimensionRouteConfig::Ipv6Only as u32]
681 );
682 }
683
684 #[test_case(true, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Ipv6); "ipv4+ipv6 default routes")]
685 #[test_case(true, false, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv4Only); "Ipv4Only default routes")]
686 #[test_case(false, true, Some(metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute::Ipv6Only); "Ipv6Only default routes")]
687 #[test_case(false, false, None; "no default routes")]
688 #[fuchsia::test(add_test_attr = false)]
689 fn test_log_default_route(
690 has_default_ipv4_route: bool,
691 has_default_ipv6_route: bool,
692 expected_dim: Option<
693 metrics::ReachabilityGlobalDefaultRouteDurationMetricDimensionDefaultRoute,
694 >,
695 ) {
696 let (mut test_helper, mut test_fut) = setup_test();
697
698 test_helper
699 .telemetry_sender
700 .send(TelemetryEvent::NetworkConfig { has_default_ipv4_route, has_default_ipv6_route });
701 test_helper.advance_by(zx::MonotonicDuration::from_hours(1), &mut test_fut);
702
703 let logged_metrics = test_helper
704 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_DURATION_METRIC_ID);
705 match expected_dim {
706 Some(dim) => {
707 assert_eq!(logged_metrics.len(), 1);
708 assert_eq!(
709 logged_metrics[0].payload,
710 MetricEventPayload::IntegerValue(3600_000_000)
711 );
712 assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
713 }
714 None => {
715 assert_eq!(logged_metrics.len(), 0);
716 }
717 }
718
719 let logged_metrics = test_helper
720 .get_logged_metrics(metrics::REACHABILITY_GLOBAL_DEFAULT_ROUTE_OCCURRENCE_METRIC_ID);
721 match expected_dim {
722 Some(dim) => {
723 assert_eq!(logged_metrics.len(), 1);
724 assert_eq!(logged_metrics[0].payload, MetricEventPayload::Count(1));
725 assert_eq!(logged_metrics[0].event_codes, &[dim as u32]);
726 }
727 None => {
728 assert_eq!(logged_metrics.len(), 0);
729 }
730 }
731 }
732
733 #[test_case(true, true, vec![]; "negative")]
734 #[test_case(true, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID]; "gateway_discoverable_but_not_pingable")]
735 #[test_case(false, true, vec![metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID]; "gateway_pingable_but_not_discoverable")]
736 #[test_case(false, false, vec![metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID]; "gateway_neither_dicoverable_nor_pingable")]
737 #[fuchsia::test(add_test_attr = false)]
738 fn test_log_abnormal_state_situation(
739 gateway_discoverable: bool,
740 gateway_pingable: bool,
741 expected_metrics: Vec<u32>,
742 ) {
743 let (mut test_helper, mut test_fut) = setup_test();
744
745 test_helper.telemetry_sender.send(TelemetryEvent::GatewayProbe {
746 internet_available: true,
747 gateway_discoverable,
748 gateway_pingable,
749 });
750 test_helper.advance_test_fut(&mut test_fut);
751
752 let logged_metrics: Vec<u32> = [
753 metrics::INTERNET_AVAILABLE_GATEWAY_NOT_PINGABLE_METRIC_ID,
754 metrics::INTERNET_AVAILABLE_GATEWAY_NOT_DISCOVERABLE_METRIC_ID,
755 metrics::INTERNET_AVAILABLE_GATEWAY_LOST_METRIC_ID,
756 ]
757 .into_iter()
758 .filter(|id| test_helper.get_logged_metrics(*id).len() > 0)
759 .collect();
760
761 assert_eq!(logged_metrics, expected_metrics);
762 }
763
764 #[test]
765 fn test_state_snapshot_duration_inspect_stats() {
766 let (mut test_helper, mut test_fut) = setup_test();
767
768 test_helper.advance_by(zx::MonotonicDuration::from_seconds(25), &mut test_fut);
769
770 let time_series = test_helper.get_time_series(&mut test_fut);
771 let internet_available_sec: Vec<_> =
772 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
773 let dns_active_sec: Vec<_> =
774 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
775 let http_active_sec: Vec<_> =
776 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
777 let total_duration_sec: Vec<_> =
778 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
779 assert_eq!(internet_available_sec, vec![0]);
780 assert_eq!(dns_active_sec, vec![0]);
781 assert_eq!(http_active_sec, vec![0]);
782 assert_eq!(total_duration_sec, vec![20]);
783
784 let mut update = SystemStateUpdate {
785 system_state: IpVersions {
786 ipv4: Some(State { link: LinkState::Internet, ..Default::default() }),
787 ipv6: None,
788 },
789 };
790 test_helper
791 .telemetry_sender
792 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
793 test_helper.advance_test_fut(&mut test_fut);
794 let time_series = test_helper.get_time_series(&mut test_fut);
795 let total_duration_sec: Vec<_> =
796 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
797 assert_eq!(total_duration_sec, vec![25]);
798
799 test_helper.advance_by(zx::MonotonicDuration::from_seconds(15), &mut test_fut);
800
801 let time_series = test_helper.get_time_series(&mut test_fut);
804 let internet_available_sec: Vec<_> =
805 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
806 let dns_active_sec: Vec<_> =
807 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
808 let http_active_sec: Vec<_> =
809 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
810 let total_duration_sec: Vec<_> =
811 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
812 assert_eq!(internet_available_sec, vec![15]);
813 assert_eq!(dns_active_sec, vec![0]);
814 assert_eq!(http_active_sec, vec![0]);
815 assert_eq!(total_duration_sec, vec![40]);
816
817 update.system_state = IpVersions {
818 ipv4: Some(State {
819 link: LinkState::Internet,
820 application: ApplicationState { dns_resolved: true, ..Default::default() },
821 }),
822 ipv6: None,
823 };
824 test_helper
825 .telemetry_sender
826 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
827
828 test_helper.advance_by(zx::MonotonicDuration::from_seconds(50), &mut test_fut);
829
830 let time_series = test_helper.get_time_series(&mut test_fut);
833 let internet_available_sec: Vec<_> =
834 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
835 let dns_active_sec: Vec<_> =
836 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
837 let http_active_sec: Vec<_> =
838 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
839 let total_duration_sec: Vec<_> =
840 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
841 assert_eq!(internet_available_sec, vec![25, 40]);
842 assert_eq!(dns_active_sec, vec![10, 40]);
843 assert_eq!(http_active_sec, vec![0]);
844 assert_eq!(total_duration_sec, vec![40]);
845
846 update.system_state = IpVersions {
847 ipv4: Some(State {
848 link: LinkState::Internet,
849 application: ApplicationState { dns_resolved: true, http_fetch_succeeded: true },
850 }),
851 ipv6: None,
852 };
853 test_helper
854 .telemetry_sender
855 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
856
857 test_helper.advance_by(zx::MonotonicDuration::from_seconds(60), &mut test_fut);
858
859 let time_series = test_helper.get_time_series(&mut test_fut);
862 let internet_available_sec: Vec<_> =
863 time_series.lock().internet_available_sec.minutely_iter().map(|v| *v).collect();
864 let dns_active_sec: Vec<_> =
865 time_series.lock().dns_active_sec.minutely_iter().map(|v| *v).collect();
866 let http_active_sec: Vec<_> =
867 time_series.lock().http_active_sec.minutely_iter().map(|v| *v).collect();
868 let total_duration_sec: Vec<_> =
869 time_series.lock().total_duration_sec.minutely_iter().map(|v| *v).collect();
870 assert_eq!(internet_available_sec, vec![25, 60, 40]);
871 assert_eq!(dns_active_sec, vec![10, 60, 40]);
872 assert_eq!(http_active_sec, vec![0, 20, 40]);
873 assert_eq!(total_duration_sec, vec![40]);
874 }
875
876 #[test]
877 fn test_reachability_lost_count_inspect_stats() {
878 let (mut test_helper, mut test_fut) = setup_test();
879
880 let mut update = SystemStateUpdate {
881 system_state: IpVersions {
882 ipv4: None,
883 ipv6: Some(State {
884 link: LinkState::Internet,
885 application: ApplicationState { dns_resolved: true, ..Default::default() },
886 }),
887 },
888 ..SystemStateUpdate::default()
889 };
890 test_helper
891 .telemetry_sender
892 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
893 test_helper.advance_test_fut(&mut test_fut);
894
895 let time_series = test_helper.get_time_series(&mut test_fut);
896 let reachability_lost_count: Vec<_> =
897 time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
898 assert_eq!(reachability_lost_count, vec![0]);
899
900 update.system_state = IpVersions {
901 ipv4: None,
902 ipv6: Some(State { link: LinkState::Internet, ..Default::default() }),
903 };
904 test_helper
905 .telemetry_sender
906 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
907 test_helper.advance_test_fut(&mut test_fut);
908
909 let time_series = test_helper.get_time_series(&mut test_fut);
910 let reachability_lost_count: Vec<_> =
911 time_series.lock().reachability_lost_count.minutely_iter().map(|v| *v).collect();
912 assert_eq!(reachability_lost_count, vec![1]);
913 }
914
915 #[test]
916 fn test_avg_state_inspect_stats() {
917 let (mut test_helper, mut test_fut) = setup_test();
918
919 let mut update = SystemStateUpdate {
920 system_state: IpVersions {
921 ipv4: None,
922 ipv6: Some(State { link: LinkState::Gateway, ..Default::default() }),
923 },
924 ..SystemStateUpdate::default()
925 };
926 test_helper
927 .telemetry_sender
928 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
929 test_helper.advance_test_fut(&mut test_fut);
930
931 let time_series = test_helper.get_time_series(&mut test_fut);
932 let ipv4_state: Vec<_> =
933 time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
934 let ipv6_state: Vec<_> =
935 time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
936 assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 1 }]);
937 assert_eq!(ipv6_state, vec![SumAndCount { sum: 25, count: 1 }]);
938
939 update.system_state.ipv6 = Some(State { link: LinkState::Internet, ..Default::default() });
940 test_helper
941 .telemetry_sender
942 .send(TelemetryEvent::SystemStateUpdate { update: update.clone() });
943 test_helper.advance_test_fut(&mut test_fut);
944
945 let time_series = test_helper.get_time_series(&mut test_fut);
946 let ipv4_state: Vec<_> =
947 time_series.lock().ipv4_state.minutely_iter().map(|v| *v).collect();
948 let ipv6_state: Vec<_> =
949 time_series.lock().ipv6_state.minutely_iter().map(|v| *v).collect();
950 assert_eq!(ipv4_state, vec![SumAndCount { sum: 0, count: 2 }]);
951 assert_eq!(ipv6_state, vec![SumAndCount { sum: 55, count: 2 }]);
952 }
953
954 struct TestHelper {
955 telemetry_sender: TelemetrySender,
956 _inspector: Inspector,
957 cobalt_stream: fidl_fuchsia_metrics::MetricEventLoggerRequestStream,
958 cobalt_events: Vec<MetricEvent>,
961
962 exec: fasync::TestExecutor,
964 }
965
966 impl TestHelper {
967 fn advance_test_fut<T>(&mut self, test_fut: &mut (impl Future<Output = T> + Unpin)) {
969 let mut made_progress = true;
970 while made_progress {
971 let _result = self.exec.run_until_stalled(test_fut);
972 made_progress = false;
973 while let Poll::Ready(Some(Ok(req))) =
974 self.exec.run_until_stalled(&mut self.cobalt_stream.next())
975 {
976 self.cobalt_events.append(&mut req.respond_to_metric_req(Ok(())));
977 made_progress = true;
978 }
979 }
980
981 match self.exec.run_until_stalled(test_fut) {
982 Poll::Pending => (),
983 _ => panic!("expect test_fut to resolve to Poll::Pending"),
984 }
985 }
986
987 fn advance_by<T>(
991 &mut self,
992 duration: zx::MonotonicDuration,
993 test_fut: &mut (impl Future<Output = T> + Unpin),
994 ) {
995 assert_eq!(
996 duration.into_nanos() % STEP_INCREMENT.into_nanos(),
997 0,
998 "duration {:?} is not divisible by STEP_INCREMENT",
999 duration,
1000 );
1001 const_assert_eq!(
1002 TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
1003 0
1004 );
1005
1006 self.advance_test_fut(test_fut);
1007
1008 for _i in 0..(duration.into_nanos() / STEP_INCREMENT.into_nanos()) {
1009 self.exec.set_fake_time(fasync::MonotonicInstant::after(STEP_INCREMENT));
1010 let _ = self.exec.wake_expired_timers();
1011 self.advance_test_fut(test_fut);
1012 }
1013 }
1014
1015 fn get_logged_metrics(&self, metric_id: u32) -> Vec<MetricEvent> {
1016 self.cobalt_events.iter().filter(|ev| ev.metric_id == metric_id).cloned().collect()
1017 }
1018
1019 fn get_time_series<T>(
1020 &mut self,
1021 test_fut: &mut (impl Future<Output = T> + Unpin),
1022 ) -> Arc<Mutex<Stats>> {
1023 let (sender, mut receiver) = oneshot::channel();
1024 self.telemetry_sender.send(TelemetryEvent::GetTimeSeries { sender });
1025 self.advance_test_fut(test_fut);
1026 match receiver.try_recv() {
1027 Ok(Some(stats)) => stats,
1028 _ => panic!("Expect Stats to be returned"),
1029 }
1030 }
1031 }
1032
1033 trait CobaltExt {
1034 fn respond_to_metric_req(
1036 self,
1037 result: Result<(), fidl_fuchsia_metrics::Error>,
1038 ) -> Vec<fidl_fuchsia_metrics::MetricEvent>;
1039 }
1040
1041 impl CobaltExt for fidl_fuchsia_metrics::MetricEventLoggerRequest {
1042 fn respond_to_metric_req(
1043 self,
1044 result: Result<(), fidl_fuchsia_metrics::Error>,
1045 ) -> Vec<fidl_fuchsia_metrics::MetricEvent> {
1046 match self {
1047 Self::LogOccurrence { metric_id, count, event_codes, responder } => {
1048 assert!(responder.send(result).is_ok());
1049 vec![MetricEvent {
1050 metric_id,
1051 event_codes,
1052 payload: MetricEventPayload::Count(count),
1053 }]
1054 }
1055 Self::LogInteger { metric_id, value, event_codes, responder } => {
1056 assert!(responder.send(result).is_ok());
1057 vec![MetricEvent {
1058 metric_id,
1059 event_codes,
1060 payload: MetricEventPayload::IntegerValue(value),
1061 }]
1062 }
1063 Self::LogIntegerHistogram { metric_id, histogram, event_codes, responder } => {
1064 assert!(responder.send(result).is_ok());
1065 vec![MetricEvent {
1066 metric_id,
1067 event_codes,
1068 payload: MetricEventPayload::Histogram(histogram),
1069 }]
1070 }
1071 Self::LogString { metric_id, string_value, event_codes, responder } => {
1072 assert!(responder.send(result).is_ok());
1073 vec![MetricEvent {
1074 metric_id,
1075 event_codes,
1076 payload: MetricEventPayload::StringValue(string_value),
1077 }]
1078 }
1079 Self::LogMetricEvents { events, responder } => {
1080 assert!(responder.send(result).is_ok());
1081 events
1082 }
1083 }
1084 }
1085 }
1086
1087 fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = ()>>>) {
1088 let mut exec = fasync::TestExecutor::new_with_fake_time();
1089 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(0));
1090
1091 let (cobalt_proxy, cobalt_stream) =
1092 create_proxy_and_stream::<fidl_fuchsia_metrics::MetricEventLoggerMarker>();
1093
1094 let inspector = Inspector::default();
1095 let inspect_node = inspector.root().create_child("telemetry");
1096
1097 let (telemetry_sender, test_fut) = serve_telemetry(cobalt_proxy, inspect_node);
1098 let mut test_fut = Box::pin(test_fut);
1099
1100 assert_eq!(exec.run_until_stalled(&mut test_fut), Poll::Pending);
1101
1102 let test_helper = TestHelper {
1103 telemetry_sender,
1104 _inspector: inspector,
1105 cobalt_stream,
1106 cobalt_events: vec![],
1107 exec,
1108 };
1109 (test_helper, test_fut)
1110 }
1111}