1use crate::config::SamplerConfig;
6use crate::project::Project;
7use anyhow::Error as AnyhowError;
8use argh::FromArgs;
9use diagnostics_reader::drain_batch_iterator;
10use fidl::endpoints::{ControlHandle, RequestStream, create_endpoints};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_hardware_power_statecontrol::{
13 ShutdownWatcherMarker, ShutdownWatcherRegisterMarker, ShutdownWatcherRequest,
14};
15use fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker;
16use fuchsia_component_client::connect_to_protocol;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use futures::future::{Either, select};
20use futures::stream::{self, StreamExt};
21use inspect_runtime::publish;
22use itertools::Itertools;
23use log::{info, warn};
24use sampler_component_config::Config;
25use std::sync::Arc;
26
27mod config;
28mod error;
29mod project;
30
31#[derive(Debug, Default, FromArgs, PartialEq)]
33#[argh(subcommand, name = "sampler")]
34pub struct Args {}
35
36pub const PROGRAM_NAME: &str = "sampler";
37
38pub async fn main() -> Result<(), AnyhowError> {
39 info!("Sampler starting up");
40 component::health().set_starting_up();
41
42 let _inspect = publish(component::inspector(), Default::default());
43
44 let execution_stats = component::inspector().root().create_child("sampler_executor_stats");
45 let config = SamplerConfig::new(Config::take_from_startup_handle(), &execution_stats)?;
46
47 let sampler = connect_to_protocol::<fdiagnostics::SampleMarker>()?;
48
49 for chunk in &config
50 .sample_data()
51 .into_iter()
52 .chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize)
53 {
54 sampler.set(&fdiagnostics::SampleParameters {
55 data: Some(chunk.collect()),
56 ..Default::default()
57 })?;
58 }
59
60 let (sample_sink_client, sample_sink_server) =
61 create_endpoints::<fdiagnostics::SampleSinkMarker>();
62
63 if let Err(e) = sampler.commit(sample_sink_client).await? {
64 match e {
65 fdiagnostics::ConfigurationError::SamplePeriodTooSmall => {
66 return Err(anyhow::anyhow!(
67 "Configured sample period was too small, indicating a config bug. Exiting."
68 ));
69 }
70 err => warn!(err:?; "Sampler encountered non-fatal error. Review Archivist's logs."),
71 }
72 }
73
74 let metric_logger_factory = connect_to_protocol::<MetricEventLoggerFactoryMarker>()?;
75
76 let mut projects = futures::stream::iter(config.project_configs)
77 .filter_map(|project_config| async {
78 let project_id = *project_config.project_id;
79 let customer_id = *project_config.customer_id;
80 let stats = config.stats.projects.get(&project_config.project_id);
81 match Project::new(&metric_logger_factory, project_config, stats).await {
82 Ok(project) => Some(project),
83 Err(e) => {
84 warn!(
85 e:?,
86 project_id,
87 customer_id;
88 "Sampler failed to configure a project",
89 );
90 None
91 }
92 }
93 })
94 .collect::<Vec<_>>()
95 .await;
96
97 let (shutdown_watcher_client, shutdown_watcher_request_stream) =
98 fidl::endpoints::create_request_stream::<ShutdownWatcherMarker>();
99 let shutdown_watcher_register = connect_to_protocol::<ShutdownWatcherRegisterMarker>()?;
100 shutdown_watcher_register.register_watcher(shutdown_watcher_client).await?;
101
102 let sink_stream = sample_sink_server.into_stream();
103 let sample_sink_control = sink_stream.control_handle();
104 let mut sink_stream = sink_stream.fuse();
105 let mut shutdown_stream = Either::Left(shutdown_watcher_request_stream);
106 let mut shutdown = false;
107
108 component::health().set_ok();
109
110 loop {
111 match select(shutdown_stream.next(), sink_stream.next()).await {
112 Either::Left((shutdown_request, _)) => match shutdown_request {
113 Some(Ok(ShutdownWatcherRequest::OnShutdown { responder, .. })) => {
114 shutdown = true;
115 sample_sink_control.send_on_now_or_never()?;
116 responder.send()?;
117 }
118 Some(Ok(ShutdownWatcherRequest::_UnknownMethod { .. })) => {
119 warn!("Sampler encountered unknown method on ShutdownWatcher");
120 }
121 Some(Err(err)) => {
122 warn!(err:?; "Sampler encountered error on ShutdownWatcher, data may be missing");
123 }
124 None => {
125 shutdown_stream = Either::Right(stream::pending());
126 continue;
127 }
128 },
129 Either::Right((event, _)) => {
130 let Some(Ok(event)) = event else {
131 break;
132 };
133
134 handle_sample_sink_request(event, shutdown, &mut projects).await;
135
136 if shutdown {
137 break;
138 }
139 }
140 }
141 }
142
143 Ok(())
144}
145
146async fn handle_sample_sink_request(
147 event: fdiagnostics::SampleSinkRequest,
148 shutdown: bool,
149 projects: &mut [Project<'_>],
150) {
151 match event {
152 fdiagnostics::SampleSinkRequest::OnSampleReadied {
153 event:
154 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
155 batch_iter: Some(batch_iter),
156 seconds_since_start: Some(seconds_since_start),
157 ..
158 }),
159 control_handle: _control_handle,
160 } => {
161 let data = drain_batch_iterator::<diagnostics_data::InspectData>(Arc::new(
162 batch_iter.into_proxy(),
163 ))
164 .filter_map(|v| async {
165 match v {
166 Ok(v) => Some(v),
167 Err(e) => {
168 warn!(e:?; "Failed to read some Inspect data; skipping");
169 None
170 }
171 }
172 })
173 .collect::<Vec<_>>()
174 .await;
175
176 let seconds_since_start = if shutdown {
177 None
178 } else {
179 Some(zx::MonotonicDuration::from_seconds(seconds_since_start))
180 };
181
182 for project in projects {
183 if let Err(e) = project.log(&data, seconds_since_start).await {
184 warn!(e:?; "Project failed to log");
185 }
186
187 }
191 }
192 fdiagnostics::SampleSinkRequest::OnSampleReadied {
193 event:
194 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
195 batch_iter,
196 seconds_since_start,
197 ..
198 }),
199 control_handle,
200 } => {
201 control_handle.shutdown();
202 warn!(
203 batch_iter:?, seconds_since_start:?;
204 "Sample server sent Ready but crucial fields were None"
205 );
206 }
207 fdiagnostics::SampleSinkRequest::OnSampleReadied {
208 event: fdiagnostics::SampleSinkResult::Error(e),
209 ..
210 } => {
211 warn!(e:?; "Sample server sent an error, data may be missing");
212 }
213 fdiagnostics::SampleSinkRequest::OnSampleReadied {
214 event: fdiagnostics::SampleSinkResult::__SourceBreaking { .. },
215 control_handle,
216 }
217 | fdiagnostics::SampleSinkRequest::_UnknownMethod { control_handle, .. } => {
218 control_handle.shutdown();
219 warn!("Sample server sent a source-breaking or unknown event")
220 }
221 }
222}