Skip to main content

netcfg/telemetry/
mod.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod processors;
6
7use crate::telemetry::processors::network_properties::NetworkPropertiesProcessor;
8use anyhow::Error;
9use fuchsia_inspect::Inspector;
10use fuchsia_sync::Mutex;
11use futures::channel::mpsc;
12use futures::{Future, StreamExt};
13use log::{info, warn};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16
17#[derive(Clone, Debug)]
18pub struct NetworkEventMetadata {
19    pub id: u64,
20    pub name: Option<String>,
21    pub transport: fidl_fuchsia_net_policy_socketproxy::NetworkType,
22    pub is_fuchsia_provisioned: bool,
23}
24
25#[derive(Debug)]
26pub enum TelemetryEvent {
27    DefaultNetworkChanged(NetworkEventMetadata),
28    DefaultNetworkLost,
29}
30
31#[derive(Clone, Debug)]
32pub struct TelemetrySender {
33    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
34    sender_is_blocked: Arc<AtomicBool>,
35}
36
37impl TelemetrySender {
38    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
39        Self {
40            sender: Arc::new(Mutex::new(sender)),
41            sender_is_blocked: Arc::new(AtomicBool::new(false)),
42        }
43    }
44
45    pub fn send(&self, event: TelemetryEvent) {
46        match self.sender.lock().try_send(event) {
47            Ok(_) => {
48                if self
49                    .sender_is_blocked
50                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
51                    .is_ok()
52                {
53                    info!("TelemetrySender recovered and resumed sending");
54                }
55            }
56            Err(_) => {
57                if self
58                    .sender_is_blocked
59                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
60                    .is_ok()
61                {
62                    warn!(
63                        "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
64                    );
65                }
66            }
67        }
68    }
69}
70
71const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
72
73pub fn serve_telemetry(
74    inspector: &Inspector,
75) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
76    let inspect_node = inspector.root();
77    let telemetry_node = inspect_node.create_child("telemetry");
78    let time_series_node = telemetry_node.create_child("time_series");
79    let client =
80        windowed_stats::experimental::inspect::TimeMatrixClient::new(time_series_node.clone_weak());
81
82    let processor = NetworkPropertiesProcessor::new(&telemetry_node, "root/telemetry", &client);
83    inspect_node.record(time_series_node);
84    inspect_node.record(telemetry_node);
85
86    let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
87    let sender = TelemetrySender::new(sender);
88
89    let fut = async move {
90        let mut processor = processor;
91        while let Some(event) = receiver.next().await {
92            match event {
93                TelemetryEvent::DefaultNetworkChanged(metadata) => {
94                    processor.log_default_network_changed(metadata);
95                }
96                TelemetryEvent::DefaultNetworkLost => {
97                    processor.log_default_network_lost();
98                }
99            }
100        }
101        Ok(())
102    };
103    (sender, fut)
104}