1use crate::BUILD_CONFIG;
6use crate::fetcher::{PersistenceData, ServiceData, TagData};
7use crate::file_handler::{self, Timestamps};
8use anyhow::{Context, anyhow, bail};
9use fidl::endpoints::ControlHandle;
10use futures::{StreamExt, TryStreamExt};
11use hashbrown::HashMap;
12use itertools::Itertools;
13use log::{debug, error, warn};
14
15use fidl_fuchsia_diagnostics as fdiagnostics;
16use fuchsia_async as fasync;
17use persistence_config::{Config, ServiceName, Tag};
18use selectors::SelectorExt;
19use std::collections::VecDeque;
20use std::pin::pin;
21use std::sync::Arc;
22
23#[derive(Clone)]
30pub(crate) struct Scheduler {
31 tag_info: Arc<Vec<QuickTagInfo>>,
34}
35
36#[derive(thiserror::Error, Debug)]
38pub(crate) enum Error {
39 #[error("invalid selector")]
40 InvalidSelector(#[from] selectors::Error),
41 #[error("fidl error: {0:?}")]
42 Fidl(#[from] fidl::Error),
43 #[error("unable to configure Archivist sampling: {0:?}")]
44 UnableToSample(#[from] anyhow::Error),
45}
46
47impl Scheduler {
48 pub(crate) fn new(config: &Config) -> Self {
49 let tag_info = config
50 .clone()
51 .into_iter()
52 .flat_map(|(service, tags)| {
53 tags.into_iter().map(move |(tag, config)| QuickTagInfo {
54 service: service.clone(),
55 tag,
56 max_bytes: config.max_bytes,
57 selectors: config.selectors,
58 })
59 })
60 .collect::<Vec<_>>();
61
62 Self { tag_info: Arc::new(tag_info) }
63 }
64
65 pub(crate) async fn subscribe(
66 &self,
67 scope: fasync::ScopeHandle,
68 config: &Config,
69 ) -> Result<(), Error> {
70 let sample_datums = config
71 .values()
72 .flat_map(|tags| {
73 tags.values().flat_map(|tag| {
74 tag.selectors
75 .clone()
76 .into_iter()
77 .map(|selector| (selector, tag.min_seconds_between_fetch))
78 })
79 })
80 .map(|(selector, min_seconds_between_fetch)| fdiagnostics::SampleDatum {
82 selector: Some(fdiagnostics::SelectorArgument::StructuredSelector(selector)),
83 strategy: Some(fdiagnostics::SampleStrategy::OnDiff),
84 interval_secs: Some(min_seconds_between_fetch),
85 ..Default::default()
86 })
87 .collect::<Vec<_>>();
88
89 if sample_datums.is_empty() {
90 warn!("No tags configured; skipping subscription to fuchsia.diagnostics.Sample");
91 return Ok(());
92 }
93
94 let sample =
95 fuchsia_component::client::connect_to_protocol::<fdiagnostics::SampleMarker>()?;
96
97 for chunk in sample_datums.chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize) {
98 sample.set(&fdiagnostics::SampleParameters {
99 data: Some(chunk.to_vec()),
100 ..Default::default()
101 })?;
102 }
103
104 let (client_end, sample_sink_stream) =
105 fidl::endpoints::create_request_stream::<fdiagnostics::SampleSinkMarker>();
106
107 let scheduler = self.clone();
110 scope.spawn(async move {
111 if let Err(e) = scheduler.handle_sample_sink(sample_sink_stream).await {
112 error!("Error serving SampleSink: {e:?}");
113 }
114 });
115
116 sample
117 .commit(client_end)
118 .await?
119 .map_err(|e| anyhow!("failed to commit fuchsia.diagnostics.Sample config: {e:?}"))?;
120
121 Ok(())
122 }
123
124 pub(crate) async fn handle_sample_sink(
125 &self,
126 stream: fdiagnostics::SampleSinkRequestStream,
127 ) -> Result<(), anyhow::Error> {
128 let (stream, stalled) = detect_stall::until_stalled(stream, BUILD_CONFIG.stall_interval);
129 let mut stream = pin!(stream);
130 loop {
131 match stream.try_next().await {
132 Ok(Some(fdiagnostics::SampleSinkRequest::OnSampleReadied {
133 event,
134 control_handle,
135 })) => match event {
136 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
137 batch_iter,
138 seconds_since_start: _,
139 __source_breaking: _,
140 }) => {
141 if let Some(iter) = batch_iter {
142 if let Err(e) = self.handle_sample_ready(iter).await {
143 warn!("Failed to process Sample: {e:?}");
144 }
145 } else {
146 bail!("expected BatchIterator, got None");
147 }
148 }
149 fdiagnostics::SampleSinkResult::Error(err) => {
150 control_handle.shutdown();
151 bail!("Failed receiving samples: {err:?}");
152 }
153 fdiagnostics::SampleSinkResult::__SourceBreaking { unknown_ordinal } => {
154 control_handle.shutdown();
155 bail!("unknown ordinal {unknown_ordinal}");
156 }
157 },
158 Ok(Some(req)) => {
159 warn!("Unknown SampleSinkRequest {req:?}");
160 }
161 Ok(None) => break,
162 Err(e) => bail!("Unexpected error handling SampleSinkRequest: {e}"),
163 }
164 }
165 if let Some(server_end) = stalled.await.context("FIDL error")? {
166 debug!("Escrowing fuchsia.diagnostics.SampleSink");
168 fuchsia_component::client::connect_channel_to_protocol_at_path(
169 server_end,
170 "/escrow/fuchsia.diagnostics.SampleSink",
171 )
172 .context("Failed to connect to fuchsia.diagnostics.SampleSink")?;
173 }
174 Ok(())
175 }
176
177 async fn handle_sample_ready(
178 &self,
179 iter: fidl::endpoints::ClientEnd<fdiagnostics::BatchIteratorMarker>,
180 ) -> Result<(), anyhow::Error> {
181 let timestamps = file_handler::Timestamps {
182 last_sample_boot: zx::BootInstant::get(),
183 last_sample_utc: fuchsia_runtime::utc_time(),
184 };
185
186 let proxy = Arc::new(iter.into_proxy());
187 let (snapshot, errs): (Vec<_>, Vec<_>) =
188 diagnostics_reader::drain_batch_iterator::<diagnostics_data::InspectData>(proxy)
189 .collect::<Vec<_>>()
190 .await
191 .into_iter()
192 .partition_result();
193 if !errs.is_empty() {
194 if snapshot.is_empty() {
195 bail!("failed reading all Inspect data: {errs:?}")
196 } else {
197 warn!("failed reading some Inspect data: {errs:?}")
198 }
199 }
200
201 let mut current_data = match file_handler::current_data().await {
202 Ok(Some(data)) => data,
203 Ok(None) => PersistenceData::default(),
204 Err(e) => {
205 error!("Current data corrupted, starting fresh: {e:?}");
206 PersistenceData::default()
207 }
208 };
209
210 for tag_info in self.tag_info.iter() {
211 for data in &snapshot {
212 if data.moniker.match_against_selectors(&tag_info.selectors).next().is_none() {
213 continue;
214 }
215 match data.clone().filter(&tag_info.selectors) {
216 Ok(Some(data)) => {
217 modify_tag_data(&mut current_data, tag_info, ×tamps, |tag_data| {
218 tag_data.merge(timestamps.clone(), data)
219 })
220 }
221 Ok(None) => {}
222 Err(e) => {
223 modify_tag_data(&mut current_data, tag_info, ×tamps, |tag_data| {
224 tag_data.add_error(e.to_string())
225 })
226 }
227 }
228 }
229 }
230
231 file_handler::write_current_data(¤t_data).await
232 }
233}
234
235fn modify_tag_data<'a>(
236 data: &'a mut PersistenceData,
237 lookup: &'a QuickTagInfo,
238 timestamps: &Timestamps,
239 modify_fn: impl FnOnce(&mut TagData),
240) {
241 let QuickTagInfo { service, tag, max_bytes, selectors } = lookup;
242
243 let service_data = data.entry_ref(service).or_insert_with(ServiceData::default);
244 let tag_data = service_data.entry_ref(tag).or_insert_with(|| TagData {
245 max_bytes: *max_bytes,
246 total_bytes: 0,
247 timestamps: timestamps.clone(),
248 selectors: selectors.clone(),
249 data: HashMap::new(),
250 errors: VecDeque::new(),
251 });
252
253 modify_fn(tag_data)
254}
255
256struct QuickTagInfo {
258 service: ServiceName,
259 tag: Tag,
260 max_bytes: usize,
261 selectors: Vec<fidl_fuchsia_diagnostics::Selector>,
262}