use crate::file_handler::{self, PersistData, PersistPayload, PersistSchema, Timestamps};
use diagnostics_data::{Data, DiagnosticsHierarchy, ExtendedMoniker, Inspect};
use diagnostics_reader::{ArchiveReader, RetryConfig};
use fidl_fuchsia_diagnostics::{ArchiveAccessorProxy, Selector};
use fuchsia_async::{self as fasync, Task};

use futures::channel::mpsc::{self, UnboundedSender};
use futures::StreamExt;
use log::*;
use persistence_config::{Config, ServiceName, Tag, TagConfig};
use serde::ser::SerializeMap;
use serde::{Serialize, Serializer};
use serde_json::{Map, Value};
use std::collections::hash_map::Entry;
use std::collections::HashMap;

const INSPECT_SERVICE_PATH: &str = "/svc/fuchsia.diagnostics.ArchiveAccessor.feedback";

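// Handle used to request fetch-and-persist operations; it forwards FetchCommand's
// over an unbounded channel to the task spawned by Fetcher::new().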
#[derive(Clone)]
pub(crate) struct Fetcher(UnboundedSender<FetchCommand>);

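// A request to fetch Inspect data for one service and persist it under each of
// the listed tags.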
pub(crate) struct FetchCommand {
    pub(crate) service: ServiceName,
    pub(crate) tags: Vec<Tag>,
}

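// Converts a DiagnosticsHierarchy into a serde_json Map. If the hierarchy has a
// single top-level "root" node, its contents are returned instead so the
// persisted data isn't nested under a redundant "root" key.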
fn extract_json_map(hierarchy: DiagnosticsHierarchy) -> Option<Map<String, Value>> {
    let Ok(Value::Object(mut map)) = serde_json::to_value(hierarchy) else {
        return None;
    };
    if map.len() != 1 || !map.contains_key("root") {
        return Some(map);
    }
    if let Value::Object(map) = map.remove("root").unwrap() {
        return Some(map);
    }
    None
}

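// Inspect payloads keyed by the moniker they were collected from. The manual
// Serialize impl below writes each ExtendedMoniker key as its string form.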
#[derive(Debug, Eq, PartialEq)]
struct DataMap(HashMap<ExtendedMoniker, Value>);

impl Serialize for DataMap {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = serializer.serialize_map(Some(self.0.len()))?;
        for (moniker, value) in &self.0 {
            map.serialize_entry(&moniker.to_string(), &value)?;
        }
        map.end()
    }
}

impl std::ops::Deref for DataMap {
    type Target = HashMap<ExtendedMoniker, Value>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

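// Condenses a list of Inspect Data into a single map from moniker to payload.
// Entries from the same moniker are merged; when the same key appears more than
// once, the later payload wins.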
fn condensed_map_of_data(items: impl IntoIterator<Item = Data<Inspect>>) -> DataMap {
    DataMap(items.into_iter().fold(HashMap::new(), |mut entries, item| {
        let Data { payload, moniker, .. } = item;
        if let Some(new_map) = payload.and_then(extract_json_map) {
            match entries.entry(moniker) {
                Entry::Occupied(mut o) => {
                    let existing_payload = o.get_mut();
                    if let Value::Object(existing_payload_map) = existing_payload {
                        existing_payload_map.extend(new_map);
                    }
                }
                Entry::Vacant(v) => {
                    v.insert(Value::Object(new_map));
                }
            }
        }
        entries
    }))
}

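// Filters the fetched Inspect data through a tag's selectors and builds the
// PersistSchema to write for that tag. Any failure (filter error, no matching
// data, serialization error, or data larger than the tag's size limit) is
// recorded as an error schema carrying the same timestamps.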
fn save_data_for_tag(
    inspect_data: Vec<Data<Inspect>>,
    timestamps: Timestamps,
    tag: &Tag,
    tag_info: &TagInfo,
) -> PersistSchema {
    let mut filtered_datas = vec![];
    for data in inspect_data {
        match data.filter(&tag_info.selectors) {
            Ok(Some(data)) => filtered_datas.push(data),
            Ok(None) => {}
            Err(e) => return PersistSchema::error(timestamps, format!("Filter error: {e}")),
        }
    }

    if filtered_datas.is_empty() {
        return PersistSchema::error(timestamps, format!("No data available for tag '{tag}'"));
    }
    let entries = condensed_map_of_data(filtered_datas);
    let data_length = match serde_json::to_string(&entries) {
        Ok(string) => string.len(),
        Err(e) => {
            return PersistSchema::error(timestamps, format!("Unexpected serialize error: {e}"))
        }
    };
    if data_length > tag_info.max_save_length {
        let error_description =
            format!("Data too big: {data_length} > max length {}", tag_info.max_save_length);
        return PersistSchema::error(timestamps, error_description);
    }
    PersistSchema {
        timestamps,
        payload: PersistPayload::Data(PersistData { data_length, entries: entries.0 }),
    }
}

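// Returns the current UTC time in nanoseconds since the Unix epoch.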
fn utc_now() -> i64 {
    let now_utc = chrono::prelude::Utc::now();
    now_utc.timestamp() * 1_000_000_000 + now_utc.timestamp_subsec_nanos() as i64
}

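// Per-tag configuration after parsing: the size limit for persisted data and the
// parsed selectors used to filter fetched Inspect data.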
#[derive(Debug)]
struct TagInfo {
    max_save_length: usize,
    selectors: Vec<Selector>,
}

type TagsInfo = HashMap<Tag, TagInfo>;

type ServicesInfo = HashMap<ServiceName, TagsInfo>;

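// Records an error for every tag of a service, using the current time for both
// the "before" and "after" timestamps since no fetch was performed.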
fn record_service_error(service_name: &ServiceName, tags: &[Tag], error: String) {
    let utc = utc_now();
    let monotonic = zx::MonotonicInstant::get().into_nanos();
    let timestamps = Timestamps {
        before_monotonic: monotonic,
        after_monotonic: monotonic,
        before_utc: utc,
        after_utc: utc,
    };
    record_timestamped_error(service_name, tags, timestamps, error);
}

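// Logs the error and writes it, with the given timestamps, to the persisted
// file for every tag of the service.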
fn record_timestamped_error(
    service_name: &ServiceName,
    tags: &[Tag],
    timestamps: Timestamps,
    error: String,
) {
    warn!("{error}");
    let error = PersistSchema::error(timestamps, error);
    for tag in tags {
        file_handler::write(service_name, tag, &error);
    }
}

const INSPECT_PREFIX: &str = "INSPECT:";

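// Strips the "INSPECT:" prefix from each configured selector string, warning
// about and skipping any selector that doesn't start with it.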
fn strip_inspect_prefix<'a>(selectors: &'a [String]) -> impl Iterator<Item = &'a str> {
    let get_inspect = |s: &'a String| -> Option<&'a str> {
        match s.strip_prefix(INSPECT_PREFIX) {
            Some(stripped) => Some(stripped),
            None => {
                warn!("All selectors should begin with 'INSPECT:' - '{}'", s);
                None
            }
        }
    };
    selectors.iter().filter_map(get_inspect)
}

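// Fetches Inspect data for one service via the ArchiveAccessor proxy and
// persists it under each requested tag: the selectors configured for those tags
// are gathered, one snapshot is taken, and the result is filtered and written
// per tag. Failures are recorded as error schemas so they appear in the output.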
async fn fetch_and_save(
    proxy: &ArchiveAccessorProxy,
    services: &ServicesInfo,
    service_name: ServiceName,
    tags: Vec<Tag>,
) {
    let service_info = match services.get(&service_name) {
        Some(info) => info,
        None => {
            warn!("Bad service {service_name} received in fetch");
            return;
        }
    };

    let selectors = {
        let mut selectors = vec![];
        for tag in &tags {
            if let Some(tag_info) = service_info.get(tag) {
                for selector in tag_info.selectors.iter() {
                    selectors.push(selector.clone());
                }
            }
        }
        if selectors.is_empty() {
            record_service_error(
                &service_name,
                &tags,
                format!("Empty selectors from service {} and tags {:?}", service_name, tags),
            );
            return;
        }
        selectors
    };

    let mut source = ArchiveReader::inspect();
    source
        .with_archive(proxy.clone())
        .retry(RetryConfig::never())
        .add_selectors(selectors.into_iter());

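    // Take UTC and monotonic clock readings on both sides of the snapshot so the
    // persisted data is bracketed in time.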
    let before_utc = utc_now();
    let before_monotonic = zx::MonotonicInstant::get().into_nanos();
    let data = source.snapshot().await;
    let after_utc = utc_now();
    let after_monotonic = zx::MonotonicInstant::get().into_nanos();
    let timestamps = Timestamps { before_utc, before_monotonic, after_utc, after_monotonic };
    let data = match data {
        Err(e) => {
            record_timestamped_error(
                &service_name,
                &tags,
                timestamps,
                format!("Failed to fetch Inspect data: {:?}", e),
            );
            return;
        }
        Ok(data) => data,
    };

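    // Filter and save the fetched data separately for each requested tag.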
    for tag in tags {
        let Some(tag_info) = service_info.get(&tag) else {
            warn!("Tag '{tag}' was not found in config; skipping it.");
            continue;
        };
        let data_to_save = save_data_for_tag(data.clone(), timestamps.clone(), &tag, tag_info);
        file_handler::write(&service_name, &tag, &data_to_save);
    }
}

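// Fetcher::new() connects to the ArchiveAccessor protocol, parses the selectors
// for every configured service and tag, and spawns the task that services
// FetchCommand's sent through the returned Fetcher.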
impl Fetcher {
    pub(crate) fn new(config: &Config) -> Result<(Fetcher, Task<()>), anyhow::Error> {
        let mut services = HashMap::new();
        let proxy = fuchsia_component::client::connect_to_protocol_at_path::<
            fidl_fuchsia_diagnostics::ArchiveAccessorMarker,
        >(INSPECT_SERVICE_PATH)?;
        for (service_name, tags_info) in config.iter() {
            let mut tags = HashMap::new();
            for (tag_name, TagConfig { selectors, max_bytes, .. }) in tags_info.iter() {
                let selectors = strip_inspect_prefix(selectors)
                    .map(selectors::parse_verbose)
                    .collect::<Result<Vec<_>, _>>()?;
                let info = TagInfo { selectors, max_save_length: *max_bytes };
                tags.insert(tag_name.clone(), info);
            }
            services.insert(service_name.clone(), tags);
        }
        let (invoke_fetch, mut receiver) = mpsc::unbounded::<FetchCommand>();
        let task = fasync::Task::spawn(async move {
            while let Some(FetchCommand { service, tags }) = receiver.next().await {
                fetch_and_save(&proxy, &services, service, tags).await;
            }
        });
        Ok((Fetcher(invoke_fetch), task))
    }

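    // Queue a FetchCommand for the background task; errors from a closed channel
    // are ignored.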
    pub(crate) fn send(&mut self, command: FetchCommand) {
        let _ = self.0.unbounded_send(command);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_data::{InspectDataBuilder, InspectHandleName, Timestamp};
    use diagnostics_hierarchy::hierarchy;
    use serde_json::json;

    #[fuchsia::test]
    fn test_selector_stripping() {
        assert_eq!(
            strip_inspect_prefix(&[
                "INSPECT:foo".to_string(),
                "oops:bar".to_string(),
                "INSPECT:baz".to_string()
            ])
            .collect::<Vec<_>>(),
            vec!["foo".to_string(), "baz".to_string()]
        )
    }

    #[fuchsia::test]
    fn test_condense_empty() {
        let empty_data = InspectDataBuilder::new(
            "a/b/c/d".try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build();
        let empty_data_result = condensed_map_of_data([empty_data]);
        let empty_vec_result = condensed_map_of_data([]);

        let expected_map = HashMap::new();

        pretty_assertions::assert_eq!(*empty_data_result, expected_map, "golden diff failed.");
        pretty_assertions::assert_eq!(*empty_vec_result, expected_map, "golden diff failed.");
    }

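    // Builds a Data<Inspect> with the given hierarchy and moniker, using fixed
    // placeholder values for the other fields.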
    fn make_data(mut hierarchy: DiagnosticsHierarchy, moniker: &str) -> Data<Inspect> {
        hierarchy.sort();
        InspectDataBuilder::new(
            moniker.try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_hierarchy(hierarchy)
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build()
    }

    #[fuchsia::test]
    fn test_condense_one() {
        let data = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "foo",
                "y": "bar",
            }
        });

        let result = condensed_map_of_data([data]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    #[fuchsia::test]
    fn test_condense_several_with_merge() {
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "X",
                "y": "bar",
                "z": "zebra",
            },
            "e/f/g/h": {
                "x": "ex",
                "y": "why"
            }
        });

        let result = condensed_map_of_data(vec![data_abcd, data_efgh, data_abcd2]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    const TIMESTAMPS: Timestamps =
        Timestamps { after_monotonic: 200, after_utc: 111, before_monotonic: 100, before_utc: 110 };

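    // Builds a TagInfo from a max length and raw selector strings; panics on an
    // invalid selector, which is fine in tests.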
    fn tag_info(max_save_length: usize, selectors: Vec<&str>) -> TagInfo {
        let selectors = selectors
            .iter()
            .map(|s| selectors::parse_selector::<selectors::VerboseError>(s))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        TagInfo { selectors, max_save_length }
    }

    #[fuchsia::test]
    fn save_data_no_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let result = save_data_for_tag(
            vec![],
            TIMESTAMPS.clone(),
            &tag,
            &tag_info(1000, vec!["moniker:path:property"]),
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "No data available for tag 'tag'",
                },
            })
        );
    }

    #[fuchsia::test]
    fn save_data_too_big() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd],
            TIMESTAMPS.clone(),
            &tag,
            &tag_info(20, vec!["a/b/c/d:root:y"]),
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "Data too big: 23 > max length 20",
                },
            })
        );
    }

    #[fuchsia::test]
    fn save_string_with_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd, data_efgh, data_abcd2],
            TIMESTAMPS.clone(),
            &tag,
            &tag_info(1000, vec!["a/b/c/d:root:y"]),
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                "@persist_size": 23,
                "a/b/c/d": {
                    "y": "bar",
                },
            })
        );
    }
}