sampler/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::config::SamplerConfig;
6use crate::project::Project;
7use anyhow::Error as AnyhowError;
8use argh::FromArgs;
9use diagnostics_reader::drain_batch_iterator;
10use fidl::endpoints::{ControlHandle, RequestStream, create_endpoints};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_hardware_power_statecontrol::{
13    ShutdownWatcherMarker, ShutdownWatcherRegisterMarker, ShutdownWatcherRequest,
14};
15use fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker;
16use fuchsia_component_client::connect_to_protocol;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use futures::future::{Either, select};
20use futures::stream::{self, StreamExt};
21use inspect_runtime::publish;
22use itertools::Itertools;
23use log::{info, warn};
24use sampler_component_config::Config;
25use std::sync::Arc;
26
27mod config;
28mod error;
29mod project;
30
31/// Arguments used to configure sampler.
32#[derive(Debug, Default, FromArgs, PartialEq)]
33#[argh(subcommand, name = "sampler")]
34pub struct Args {}
35
36pub const PROGRAM_NAME: &str = "sampler";
37
38pub async fn main() -> Result<(), AnyhowError> {
39    info!("Sampler starting up");
40    component::health().set_starting_up();
41
42    let _inspect = publish(component::inspector(), Default::default());
43
44    let execution_stats = component::inspector().root().create_child("sampler_executor_stats");
45    let config = SamplerConfig::new(Config::take_from_startup_handle(), &execution_stats)?;
46
47    let sampler = connect_to_protocol::<fdiagnostics::SampleMarker>()?;
48
49    for chunk in &config
50        .sample_data()
51        .into_iter()
52        .chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize)
53    {
54        sampler.set(&fdiagnostics::SampleParameters {
55            data: Some(chunk.collect()),
56            ..Default::default()
57        })?;
58    }
59
60    let (sample_sink_client, sample_sink_server) =
61        create_endpoints::<fdiagnostics::SampleSinkMarker>();
62
63    if let Err(e) = sampler.commit(sample_sink_client).await? {
64        match e {
65            fdiagnostics::ConfigurationError::SamplePeriodTooSmall => {
66                return Err(anyhow::anyhow!(
67                    "Configured sample period was too small, indicating a config bug. Exiting."
68                ));
69            }
70            err => warn!(err:?; "Sampler encountered non-fatal error. Review Archivist's logs."),
71        }
72    }
73
74    let metric_logger_factory = connect_to_protocol::<MetricEventLoggerFactoryMarker>()?;
75
76    let mut projects = futures::stream::iter(config.project_configs)
77        .filter_map(|project_config| async {
78            let project_id = *project_config.project_id;
79            let customer_id = *project_config.customer_id;
80            let stats = config.stats.projects.get(&project_config.project_id);
81            match Project::new(&metric_logger_factory, project_config, stats).await {
82                Ok(project) => Some(project),
83                Err(e) => {
84                    warn!(
85                        e:?,
86                        project_id,
87                        customer_id;
88                        "Sampler failed to configure a project",
89                    );
90                    None
91                }
92            }
93        })
94        .collect::<Vec<_>>()
95        .await;
96
97    let (shutdown_watcher_client, shutdown_watcher_request_stream) =
98        fidl::endpoints::create_request_stream::<ShutdownWatcherMarker>();
99    let shutdown_watcher_register = connect_to_protocol::<ShutdownWatcherRegisterMarker>()?;
100    shutdown_watcher_register.register_watcher(shutdown_watcher_client).await?;
101
102    let sink_stream = sample_sink_server.into_stream();
103    let sample_sink_control = sink_stream.control_handle();
104    let mut sink_stream = sink_stream.fuse();
105    let mut shutdown_stream = Either::Left(shutdown_watcher_request_stream);
106    let mut shutdown = false;
107
108    component::health().set_ok();
109
110    loop {
111        match select(shutdown_stream.next(), sink_stream.next()).await {
112            Either::Left((shutdown_request, _)) => match shutdown_request {
113                Some(Ok(ShutdownWatcherRequest::OnShutdown { responder, .. })) => {
114                    shutdown = true;
115                    sample_sink_control.send_on_now_or_never()?;
116                    responder.send()?;
117                }
118                Some(Ok(ShutdownWatcherRequest::_UnknownMethod { .. })) => {
119                    warn!("Sampler encountered unknown method on ShutdownWatcher");
120                }
121                Some(Err(err)) => {
122                    warn!(err:?; "Sampler encountered error on ShutdownWatcher, data may be missing");
123                }
124                None => {
125                    shutdown_stream = Either::Right(stream::pending());
126                    continue;
127                }
128            },
129            Either::Right((event, _)) => {
130                let Some(Ok(event)) = event else {
131                    break;
132                };
133
134                handle_sample_sink_request(event, shutdown, &mut projects).await;
135
136                if shutdown {
137                    break;
138                }
139            }
140        }
141    }
142
143    Ok(())
144}
145
146async fn handle_sample_sink_request(
147    event: fdiagnostics::SampleSinkRequest,
148    shutdown: bool,
149    projects: &mut [Project<'_>],
150) {
151    match event {
152        fdiagnostics::SampleSinkRequest::OnSampleReadied {
153            event:
154                fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
155                    batch_iter: Some(batch_iter),
156                    seconds_since_start: Some(seconds_since_start),
157                    ..
158                }),
159            control_handle: _control_handle,
160        } => {
161            let data = drain_batch_iterator::<diagnostics_data::InspectData>(Arc::new(
162                batch_iter.into_proxy(),
163            ))
164            .filter_map(|v| async {
165                match v {
166                    Ok(v) => Some(v),
167                    Err(e) => {
168                        warn!(e:?; "Failed to read some Inspect data; skipping");
169                        None
170                    }
171                }
172            })
173            .collect::<Vec<_>>()
174            .await;
175
176            let seconds_since_start = if shutdown {
177                None
178            } else {
179                Some(zx::MonotonicDuration::from_seconds(seconds_since_start))
180            };
181
182            for project in projects {
183                if let Err(e) = project.log(&data, seconds_since_start).await {
184                    warn!(e:?; "Project failed to log");
185                }
186
187                // TODO: b/440153294 - Update SampleSink to allow removing selectors during
188                // runtime. These selectors are returned by Project::log. That will reduce
189                // the load on Archivist, but is not required for correctness of Sampler.
190            }
191        }
192        fdiagnostics::SampleSinkRequest::OnSampleReadied {
193            event:
194                fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
195                    batch_iter,
196                    seconds_since_start,
197                    ..
198                }),
199            control_handle,
200        } => {
201            control_handle.shutdown();
202            warn!(
203                batch_iter:?, seconds_since_start:?;
204                "Sample server sent Ready but crucial fields were None"
205            );
206        }
207        fdiagnostics::SampleSinkRequest::OnSampleReadied {
208            event: fdiagnostics::SampleSinkResult::Error(e),
209            ..
210        } => {
211            warn!(e:?; "Sample server sent an error, data may be missing");
212        }
213        fdiagnostics::SampleSinkRequest::OnSampleReadied {
214            event: fdiagnostics::SampleSinkResult::__SourceBreaking { .. },
215            control_handle,
216        }
217        | fdiagnostics::SampleSinkRequest::_UnknownMethod { control_handle, .. } => {
218            control_handle.shutdown();
219            warn!("Sample server sent a source-breaking or unknown event")
220        }
221    }
222}