use crate::file_handler::{self, PersistData, PersistPayload, PersistSchema, Timestamps};
use crate::scheduler::TagState;
use anyhow::{Context, Error, bail};
use diagnostics_data::{Data, DiagnosticsHierarchy, ExtendedMoniker, Inspect};
use diagnostics_reader::{ArchiveReader, RetryConfig};
use fidl_fuchsia_diagnostics as fdiagnostics;
use log::*;
use persistence_config::{ServiceName, Tag};
use serde::ser::SerializeMap;
use serde::{Serialize, Serializer};
use serde_json::{Map, Value};
use std::collections::HashMap;
use std::collections::hash_map::Entry;

const INSPECT_SERVICE_PATH: &str = "/svc/fuchsia.diagnostics.ArchiveAccessor.feedback";

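/// Converts the JSON form of `hierarchy` into a map, unwrapping a solitary
/// top-level "root" node if present. Returns None if the hierarchy doesn't
/// serialize to a JSON object, or if a lone "root" isn't itself an object.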
fn extract_json_map(hierarchy: DiagnosticsHierarchy) -> Option<Map<String, Value>> {
    let Ok(Value::Object(mut map)) = serde_json::to_value(hierarchy) else {
        return None;
    };
    if map.len() != 1 || !map.contains_key("root") {
        return Some(map);
    }
    if let Value::Object(map) = map.remove("root").unwrap() {
        return Some(map);
    }
    None
}

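/// Inspect data condensed to one JSON value per component, keyed by moniker.
/// The wrapper exists so the map can be serialized with string keys.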
#[derive(Debug, Eq, PartialEq)]
struct DataMap(HashMap<ExtendedMoniker, Value>);

impl Serialize for DataMap {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = serializer.serialize_map(Some(self.0.len()))?;
        for (moniker, value) in &self.0 {
            map.serialize_entry(&moniker.to_string(), &value)?;
        }
        map.end()
    }
}

impl std::ops::Deref for DataMap {
    type Target = HashMap<ExtendedMoniker, Value>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

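/// Folds a list of fetched data into one JSON object per moniker. When a
/// moniker appears more than once, its maps are merged; on key collisions,
/// values from later items overwrite earlier ones.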
fn condensed_map_of_data(items: impl IntoIterator<Item = Data<Inspect>>) -> DataMap {
    DataMap(items.into_iter().fold(HashMap::new(), |mut entries, item| {
        let Data { payload, moniker, .. } = item;
        if let Some(new_map) = payload.and_then(extract_json_map) {
            match entries.entry(moniker) {
                Entry::Occupied(mut o) => {
                    let existing_payload = o.get_mut();
                    if let Value::Object(existing_payload_map) = existing_payload {
                        existing_payload_map.extend(new_map);
                    }
                }
                Entry::Vacant(v) => {
                    v.insert(Value::Object(new_map));
                }
            }
        }
        entries
    }))
}

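/// Filters `inspect_data` with the tag's selectors and packages the result as
/// a `PersistSchema`, substituting an error record if filtering fails, nothing
/// matches, or the serialized data exceeds `max_bytes`.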
fn save_data_for_tag(
    inspect_data: Vec<Data<Inspect>>,
    timestamps: Timestamps,
    tag: &Tag,
    selectors: &[fdiagnostics::Selector],
    max_bytes: usize,
) -> PersistSchema {
    let mut filtered_datas = vec![];
    for data in inspect_data {
        match data.filter(selectors) {
            Ok(Some(data)) => filtered_datas.push(data),
            Ok(None) => {}
            Err(e) => return PersistSchema::error(timestamps, format!("Filter error: {e}")),
        }
    }

    if filtered_datas.is_empty() {
        return PersistSchema::error(timestamps, format!("No data available for tag '{tag}'"));
    }
    let entries = condensed_map_of_data(filtered_datas);
    let data_length = match serde_json::to_string(&entries) {
        Ok(string) => string.len(),
        Err(e) => {
            return PersistSchema::error(timestamps, format!("Unexpected serialize error: {e}"));
        }
    };
    if data_length > max_bytes {
        let error_description = format!("Data too big: {data_length} > max length {max_bytes}");
        return PersistSchema::error(timestamps, error_description);
    }
    PersistSchema {
        timestamps,
        payload: PersistPayload::Data(PersistData { data_length, entries: entries.0 }),
    }
}

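/// Returns the current UTC time as nanoseconds since the Unix epoch.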
fn utc_now() -> i64 {
    let now_utc = chrono::prelude::Utc::now();
    now_utc.timestamp() * 1_000_000_000 + now_utc.timestamp_subsec_nanos() as i64
}

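/// Fetches one Inspect snapshot covering every selector of the pending
/// (service, tag) pairs, then filters and persists the result per tag.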
pub(crate) async fn fetch_and_save<'a, P>(
    services_state: &'a HashMap<ServiceName, HashMap<Tag, TagState>>,
    pending: P,
) -> Result<(), Error>
where
    P: IntoIterator<Item = (&'a ServiceName, &'a Vec<Tag>)>,
    P::IntoIter: Clone,
{
    let pending_services = pending.into_iter().filter_map(|(service, tags)| {
        services_state
            .get(service)
            .or_else(|| {
                warn!("Skipping fetch request for unknown service \"{service}\"");
                None
            })
            .map(move |service_state| {
                let tag_states = tags.iter().filter_map(move |tag| {
                    service_state.get(tag).or_else(|| {
                        warn!("Skipping fetch request for unknown tag \"{tag}\" (service \"{service}\")");
                        None
                    }).map(|state| (tag, state))
                });
                (service, tag_states)
            })
    });

    let selectors: Vec<fdiagnostics::Selector> = pending_services
        .clone()
        .flat_map(|(_, tags)| tags)
        .flat_map(|(_, tag_state)| tag_state.selectors.clone())
        .collect();

    if selectors.is_empty() {
        bail!("Nothing to fetch! This shouldn't ever happen; please file a bug");
    }

    let proxy = fuchsia_component::client::connect_to_protocol_at_path::<
        fdiagnostics::ArchiveAccessorMarker,
    >(INSPECT_SERVICE_PATH)?;
    let mut source = ArchiveReader::inspect();
    source
        .with_archive(proxy.clone())
        .retry(RetryConfig::never())
        .add_selectors(selectors.into_iter());

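    // Record UTC and monotonic clocks on both sides of the snapshot so the
    // persisted data can be correlated with either timeline.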
    let before_utc = utc_now();
    let before_monotonic = zx::MonotonicInstant::get().into_nanos();
    let data = source.snapshot().await.context("Failed to fetch Inspect data")?;
    let after_utc = utc_now();
    let after_monotonic = zx::MonotonicInstant::get().into_nanos();
    let timestamps = Timestamps { before_utc, before_monotonic, after_utc, after_monotonic };

    for (service, pending_tags) in pending_services {
        for (tag, state) in pending_tags {
            let data_to_save = save_data_for_tag(
                data.clone(),
                timestamps.clone(),
                tag,
                &state.selectors,
                state.max_bytes,
            );
            file_handler::write(service, tag, &data_to_save);
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_data::{InspectDataBuilder, InspectHandleName, Timestamp};
    use diagnostics_hierarchy::hierarchy;
    use serde_json::json;

    #[fuchsia::test]
    fn test_condense_empty() {
        let empty_data = InspectDataBuilder::new(
            "a/b/c/d".try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build();
        let empty_data_result = condensed_map_of_data([empty_data]);
        let empty_vec_result = condensed_map_of_data([]);

        let expected_map = HashMap::new();

        pretty_assertions::assert_eq!(*empty_data_result, expected_map, "golden diff failed.");
        pretty_assertions::assert_eq!(*empty_vec_result, expected_map, "golden diff failed.");
    }

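    // Builds a Data<Inspect> with a sorted hierarchy so test comparisons are
    // deterministic.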
    fn make_data(mut hierarchy: DiagnosticsHierarchy, moniker: &str) -> Data<Inspect> {
        hierarchy.sort();
        InspectDataBuilder::new(
            moniker.try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_hierarchy(hierarchy)
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build()
    }

    #[fuchsia::test]
    fn test_condense_one() {
        let data = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "foo",
                "y": "bar",
            }
        });

        let result = condensed_map_of_data([data]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    #[fuchsia::test]
    fn test_condense_several_with_merge() {
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "X",
                "y": "bar",
                "z": "zebra",
            },
            "e/f/g/h": {
                "x": "ex",
                "y": "why"
            }
        });

        let result = condensed_map_of_data(vec![data_abcd, data_efgh, data_abcd2]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

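    // Fixed timestamps shared by the save_data_* tests below.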
    const TIMESTAMPS: Timestamps =
        Timestamps { after_monotonic: 200, after_utc: 111, before_monotonic: 100, before_utc: 110 };

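    // Parses selector strings, panicking on any invalid selector.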
    fn parse_selectors(selectors: &'static [&'static str]) -> Vec<fdiagnostics::Selector> {
        selectors
            .iter()
            .map(|s| selectors::parse_selector::<selectors::VerboseError>(s))
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    #[fuchsia::test]
    fn save_data_no_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let result = save_data_for_tag(
            vec![],
            TIMESTAMPS.clone(),
            &tag,
            &parse_selectors(&["moniker:path:property"]),
            1000,
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "No data available for tag 'tag'",
                },
            })
        );
    }

    #[fuchsia::test]
    fn save_data_too_big() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd],
            TIMESTAMPS.clone(),
            &tag,
            &parse_selectors(&["a/b/c/d:root:y"]),
            20,
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "Data too big: 23 > max length 20",
                },
            })
        );
    }

    #[fuchsia::test]
    fn save_string_with_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd, data_efgh, data_abcd2],
            TIMESTAMPS.clone(),
            &tag,
            &parse_selectors(&["a/b/c/d:root:y"]),
            1000,
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                "@persist_size": 23,
                "a/b/c/d": {
                    "y": "bar",
                },
            })
        );
    }
}