1pub mod processors;
6
7use crate::telemetry::processors::network_properties::NetworkPropertiesProcessor;
8use anyhow::Error;
9use fuchsia_inspect::Inspector;
10use fuchsia_sync::Mutex;
11use futures::channel::mpsc;
12use futures::{Future, StreamExt};
13use log::{info, warn};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16
17#[derive(Clone, Debug)]
18pub struct NetworkEventMetadata {
19 pub id: u64,
20 pub name: Option<String>,
21 pub transport: fidl_fuchsia_net_policy_socketproxy::NetworkType,
22 pub is_fuchsia_provisioned: bool,
23}
24
25#[derive(Debug)]
26pub enum TelemetryEvent {
27 DefaultNetworkChanged(NetworkEventMetadata),
28 DefaultNetworkLost,
29}
30
31#[derive(Clone, Debug)]
32pub struct TelemetrySender {
33 sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
34 sender_is_blocked: Arc<AtomicBool>,
35}
36
37impl TelemetrySender {
38 pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
39 Self {
40 sender: Arc::new(Mutex::new(sender)),
41 sender_is_blocked: Arc::new(AtomicBool::new(false)),
42 }
43 }
44
45 pub fn send(&self, event: TelemetryEvent) {
46 match self.sender.lock().try_send(event) {
47 Ok(_) => {
48 if self
49 .sender_is_blocked
50 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
51 .is_ok()
52 {
53 info!("TelemetrySender recovered and resumed sending");
54 }
55 }
56 Err(_) => {
57 if self
58 .sender_is_blocked
59 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
60 .is_ok()
61 {
62 warn!(
63 "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
64 );
65 }
66 }
67 }
68 }
69}
70
/// Buffer size passed to `mpsc::channel` when the telemetry event channel is created.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100_usize;
72
73pub fn serve_telemetry(
74 inspector: &Inspector,
75) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
76 let inspect_node = inspector.root();
77 let telemetry_node = inspect_node.create_child("telemetry");
78 let time_series_node = telemetry_node.create_child("time_series");
79 let client =
80 windowed_stats::experimental::inspect::TimeMatrixClient::new(time_series_node.clone_weak());
81
82 let processor = NetworkPropertiesProcessor::new(&telemetry_node, "root/telemetry", &client);
83 inspect_node.record(time_series_node);
84 inspect_node.record(telemetry_node);
85
86 let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
87 let sender = TelemetrySender::new(sender);
88
89 let fut = async move {
90 let mut processor = processor;
91 while let Some(event) = receiver.next().await {
92 match event {
93 TelemetryEvent::DefaultNetworkChanged(metadata) => {
94 processor.log_default_network_changed(metadata);
95 }
96 TelemetryEvent::DefaultNetworkLost => {
97 processor.log_default_network_lost();
98 }
99 }
100 }
101 Ok(())
102 };
103 (sender, fut)
104}