sampler/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::config::SamplerConfig;
6use crate::project::Project;
7use anyhow::Error as AnyhowError;
8use argh::FromArgs;
9use diagnostics_reader::drain_batch_iterator;
10use fidl::endpoints::{ControlHandle, RequestStream, create_endpoints};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_hardware_power_statecontrol::{
13    RebootMethodsWatcherRegisterMarker, RebootWatcherMarker, RebootWatcherRequest,
14};
15use fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker;
16use fuchsia_component_client::connect_to_protocol;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use futures::future::{Either, select};
20use futures::stream::{self, StreamExt};
21use inspect_runtime::publish;
22use itertools::Itertools;
23use log::{info, warn};
24use sampler_component_config::Config;
25use std::sync::Arc;
26
27mod config;
28mod error;
29mod project;
30
31/// Arguments used to configure sampler.
32#[derive(Debug, Default, FromArgs, PartialEq)]
33#[argh(subcommand, name = "sampler")]
34pub struct Args {}
35
/// Name of this program; matches the `name` of the [`Args`] subcommand.
pub const PROGRAM_NAME: &str = "sampler";
37
38pub async fn main() -> Result<(), AnyhowError> {
39    info!("Sampler starting up");
40    component::health().set_starting_up();
41
42    let _inspect = publish(component::inspector(), Default::default());
43
44    let execution_stats = component::inspector().root().create_child("sampler_executor_stats");
45    let config = SamplerConfig::new(Config::take_from_startup_handle(), &execution_stats)?;
46
47    let sampler = connect_to_protocol::<fdiagnostics::SampleMarker>()?;
48
49    for chunk in &config
50        .sample_data()
51        .into_iter()
52        .chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize)
53    {
54        sampler.set(&fdiagnostics::SampleParameters {
55            data: Some(chunk.collect()),
56            ..Default::default()
57        })?;
58    }
59
60    let (sample_sink_client, sample_sink_server) =
61        create_endpoints::<fdiagnostics::SampleSinkMarker>();
62
63    if let Err(e) = sampler.commit(sample_sink_client).await? {
64        match e {
65            fdiagnostics::ConfigurationError::SamplePeriodTooSmall => {
66                return Err(anyhow::anyhow!(
67                    "Configured sample period was too small, indicating a config bug. Exiting."
68                ));
69            }
70            err => warn!(err:?; "Sampler encountered non-fatal error. Review Archivist's logs."),
71        }
72    }
73
74    let metric_logger_factory = connect_to_protocol::<MetricEventLoggerFactoryMarker>()?;
75
76    let mut projects = futures::stream::iter(config.project_configs)
77        .filter_map(|project_config| async {
78            let project_id = *project_config.project_id;
79            let customer_id = *project_config.customer_id;
80            let stats = config.stats.projects.get(&project_config.project_id);
81            match Project::new(&metric_logger_factory, project_config, stats).await {
82                Ok(project) => Some(project),
83                Err(e) => {
84                    warn!(
85                        e:?,
86                        project_id,
87                        customer_id;
88                        "Sampler failed to configure a project",
89                    );
90                    None
91                }
92            }
93        })
94        .collect::<Vec<_>>()
95        .await;
96
97    let (reboot_watcher_client, reboot_watcher_request_stream) =
98        fidl::endpoints::create_request_stream::<RebootWatcherMarker>();
99    let reboot_watcher_register = connect_to_protocol::<RebootMethodsWatcherRegisterMarker>()?;
100    reboot_watcher_register.register_watcher(reboot_watcher_client).await?;
101
102    let sink_stream = sample_sink_server.into_stream();
103    let sample_sink_control = sink_stream.control_handle();
104    let mut sink_stream = sink_stream.fuse();
105    let mut reboot_stream = Either::Left(reboot_watcher_request_stream);
106    let mut shutdown = false;
107
108    component::health().set_ok();
109
110    loop {
111        match select(reboot_stream.next(), sink_stream.next()).await {
112            Either::Left((reboot, _)) => match reboot {
113                Some(Ok(RebootWatcherRequest::OnReboot { responder, .. })) => {
114                    shutdown = true;
115                    sample_sink_control.send_on_now_or_never()?;
116                    responder.send()?;
117                }
118                Some(Err(err)) => {
119                    warn!(err:?; "Sampler encountered error on RebootWatcher, data may be missing");
120                }
121                None => {
122                    reboot_stream = Either::Right(stream::pending());
123                    continue;
124                }
125            },
126            Either::Right((event, _)) => {
127                let Some(Ok(event)) = event else {
128                    break;
129                };
130
131                handle_sample_sink_request(event, shutdown, &mut projects).await;
132
133                if shutdown {
134                    break;
135                }
136            }
137        }
138    }
139
140    Ok(())
141}
142
143async fn handle_sample_sink_request(
144    event: fdiagnostics::SampleSinkRequest,
145    shutdown: bool,
146    projects: &mut [Project<'_>],
147) {
148    match event {
149        fdiagnostics::SampleSinkRequest::OnSampleReadied {
150            event:
151                fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
152                    batch_iter: Some(batch_iter),
153                    seconds_since_start: Some(seconds_since_start),
154                    ..
155                }),
156            control_handle: _control_handle,
157        } => {
158            let data = drain_batch_iterator::<diagnostics_data::InspectData>(Arc::new(
159                batch_iter.into_proxy(),
160            ))
161            .filter_map(|v| async {
162                match v {
163                    Ok(v) => Some(v),
164                    Err(e) => {
165                        warn!(e:?; "Failed to read some Inspect data; skipping");
166                        None
167                    }
168                }
169            })
170            .collect::<Vec<_>>()
171            .await;
172
173            let seconds_since_start = if shutdown {
174                None
175            } else {
176                Some(zx::MonotonicDuration::from_seconds(seconds_since_start))
177            };
178
179            for project in projects {
180                if let Err(e) = project.log(&data, seconds_since_start).await {
181                    warn!(e:?; "Project failed to log");
182                }
183
184                // TODO: b/440153294 - Update SampleSink to allow removing selectors during
185                // runtime. These selectors are returned by Project::log. That will reduce
186                // the load on Archivist, but is not required for correctness of Sampler.
187            }
188        }
189        fdiagnostics::SampleSinkRequest::OnSampleReadied {
190            event:
191                fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
192                    batch_iter,
193                    seconds_since_start,
194                    ..
195                }),
196            control_handle,
197        } => {
198            control_handle.shutdown();
199            warn!(
200                batch_iter:?, seconds_since_start:?;
201                "Sample server sent Ready but crucial fields were None"
202            );
203        }
204        fdiagnostics::SampleSinkRequest::OnSampleReadied {
205            event: fdiagnostics::SampleSinkResult::Error(e),
206            ..
207        } => {
208            warn!(e:?; "Sample server sent an error, data may be missing");
209        }
210        fdiagnostics::SampleSinkRequest::OnSampleReadied {
211            event: fdiagnostics::SampleSinkResult::__SourceBreaking { .. },
212            control_handle,
213        }
214        | fdiagnostics::SampleSinkRequest::_UnknownMethod { control_handle, .. } => {
215            control_handle.shutdown();
216            warn!("Sample server sent a source-breaking or unknown event")
217        }
218    }
219}