1use crate::config::SamplerConfig;
6use crate::project::Project;
7use anyhow::Error as AnyhowError;
8use argh::FromArgs;
9use diagnostics_reader::drain_batch_iterator;
10use fidl::endpoints::{ControlHandle, RequestStream, create_endpoints};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_hardware_power_statecontrol::{
13 RebootMethodsWatcherRegisterMarker, RebootWatcherMarker, RebootWatcherRequest,
14};
15use fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker;
16use fuchsia_component_client::connect_to_protocol;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use futures::future::{Either, select};
20use futures::stream::{self, StreamExt};
21use inspect_runtime::publish;
22use itertools::Itertools;
23use log::{info, warn};
24use sampler_component_config::Config;
25use std::sync::Arc;
26
27mod config;
28mod error;
29mod project;
30
31#[derive(Debug, Default, FromArgs, PartialEq)]
33#[argh(subcommand, name = "sampler")]
34pub struct Args {}
35
/// Name of the sampler program; matches the `name` given to the `Args` argh subcommand.
pub const PROGRAM_NAME: &str = "sampler";
37
38pub async fn main() -> Result<(), AnyhowError> {
39 info!("Sampler starting up");
40 component::health().set_starting_up();
41
42 let _inspect = publish(component::inspector(), Default::default());
43
44 let execution_stats = component::inspector().root().create_child("sampler_executor_stats");
45 let config = SamplerConfig::new(Config::take_from_startup_handle(), &execution_stats)?;
46
47 let sampler = connect_to_protocol::<fdiagnostics::SampleMarker>()?;
48
49 for chunk in &config
50 .sample_data()
51 .into_iter()
52 .chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize)
53 {
54 sampler.set(&fdiagnostics::SampleParameters {
55 data: Some(chunk.collect()),
56 ..Default::default()
57 })?;
58 }
59
60 let (sample_sink_client, sample_sink_server) =
61 create_endpoints::<fdiagnostics::SampleSinkMarker>();
62
63 if let Err(e) = sampler.commit(sample_sink_client).await? {
64 match e {
65 fdiagnostics::ConfigurationError::SamplePeriodTooSmall => {
66 return Err(anyhow::anyhow!(
67 "Configured sample period was too small, indicating a config bug. Exiting."
68 ));
69 }
70 err => warn!(err:?; "Sampler encountered non-fatal error. Review Archivist's logs."),
71 }
72 }
73
74 let metric_logger_factory = connect_to_protocol::<MetricEventLoggerFactoryMarker>()?;
75
76 let mut projects = futures::stream::iter(config.project_configs)
77 .filter_map(|project_config| async {
78 let project_id = *project_config.project_id;
79 let customer_id = *project_config.customer_id;
80 let stats = config.stats.projects.get(&project_config.project_id);
81 match Project::new(&metric_logger_factory, project_config, stats).await {
82 Ok(project) => Some(project),
83 Err(e) => {
84 warn!(
85 e:?,
86 project_id,
87 customer_id;
88 "Sampler failed to configure a project",
89 );
90 None
91 }
92 }
93 })
94 .collect::<Vec<_>>()
95 .await;
96
97 let (reboot_watcher_client, reboot_watcher_request_stream) =
98 fidl::endpoints::create_request_stream::<RebootWatcherMarker>();
99 let reboot_watcher_register = connect_to_protocol::<RebootMethodsWatcherRegisterMarker>()?;
100 reboot_watcher_register.register_watcher(reboot_watcher_client).await?;
101
102 let sink_stream = sample_sink_server.into_stream();
103 let sample_sink_control = sink_stream.control_handle();
104 let mut sink_stream = sink_stream.fuse();
105 let mut reboot_stream = Either::Left(reboot_watcher_request_stream);
106 let mut shutdown = false;
107
108 component::health().set_ok();
109
110 loop {
111 match select(reboot_stream.next(), sink_stream.next()).await {
112 Either::Left((reboot, _)) => match reboot {
113 Some(Ok(RebootWatcherRequest::OnReboot { responder, .. })) => {
114 shutdown = true;
115 sample_sink_control.send_on_now_or_never()?;
116 responder.send()?;
117 }
118 Some(Err(err)) => {
119 warn!(err:?; "Sampler encountered error on RebootWatcher, data may be missing");
120 }
121 None => {
122 reboot_stream = Either::Right(stream::pending());
123 continue;
124 }
125 },
126 Either::Right((event, _)) => {
127 let Some(Ok(event)) = event else {
128 break;
129 };
130
131 handle_sample_sink_request(event, shutdown, &mut projects).await;
132
133 if shutdown {
134 break;
135 }
136 }
137 }
138 }
139
140 Ok(())
141}
142
143async fn handle_sample_sink_request(
144 event: fdiagnostics::SampleSinkRequest,
145 shutdown: bool,
146 projects: &mut [Project<'_>],
147) {
148 match event {
149 fdiagnostics::SampleSinkRequest::OnSampleReadied {
150 event:
151 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
152 batch_iter: Some(batch_iter),
153 seconds_since_start: Some(seconds_since_start),
154 ..
155 }),
156 control_handle: _control_handle,
157 } => {
158 let data = drain_batch_iterator::<diagnostics_data::InspectData>(Arc::new(
159 batch_iter.into_proxy(),
160 ))
161 .filter_map(|v| async {
162 match v {
163 Ok(v) => Some(v),
164 Err(e) => {
165 warn!(e:?; "Failed to read some Inspect data; skipping");
166 None
167 }
168 }
169 })
170 .collect::<Vec<_>>()
171 .await;
172
173 let seconds_since_start = if shutdown {
174 None
175 } else {
176 Some(zx::MonotonicDuration::from_seconds(seconds_since_start))
177 };
178
179 for project in projects {
180 if let Err(e) = project.log(&data, seconds_since_start).await {
181 warn!(e:?; "Project failed to log");
182 }
183
184 }
188 }
189 fdiagnostics::SampleSinkRequest::OnSampleReadied {
190 event:
191 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
192 batch_iter,
193 seconds_since_start,
194 ..
195 }),
196 control_handle,
197 } => {
198 control_handle.shutdown();
199 warn!(
200 batch_iter:?, seconds_since_start:?;
201 "Sample server sent Ready but crucial fields were None"
202 );
203 }
204 fdiagnostics::SampleSinkRequest::OnSampleReadied {
205 event: fdiagnostics::SampleSinkResult::Error(e),
206 ..
207 } => {
208 warn!(e:?; "Sample server sent an error, data may be missing");
209 }
210 fdiagnostics::SampleSinkRequest::OnSampleReadied {
211 event: fdiagnostics::SampleSinkResult::__SourceBreaking { .. },
212 control_handle,
213 }
214 | fdiagnostics::SampleSinkRequest::_UnknownMethod { control_handle, .. } => {
215 control_handle.shutdown();
216 warn!("Sample server sent a source-breaking or unknown event")
217 }
218 }
219}