persistence/
fetcher.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::file_handler::{self, PersistData, PersistPayload, PersistSchema, Timestamps};
6use diagnostics_data::{Data, DiagnosticsHierarchy, ExtendedMoniker, Inspect};
7use diagnostics_reader::{ArchiveReader, RetryConfig};
8use fidl_fuchsia_diagnostics::{ArchiveAccessorProxy, Selector};
9use fuchsia_async::{self as fasync, Task};
10
11use futures::channel::mpsc::{self, UnboundedSender};
12use futures::StreamExt;
13use log::*;
14use persistence_config::{Config, ServiceName, Tag, TagConfig};
15use serde::ser::SerializeMap;
16use serde::{Serialize, Serializer};
17use serde_json::{Map, Value};
18use std::collections::hash_map::Entry;
19use std::collections::HashMap;
20
// The capability name for the Inspect reader; `Fetcher::new` connects to this
// protocol path once and reuses the proxy for all fetches.
const INSPECT_SERVICE_PATH: &str = "/svc/fuchsia.diagnostics.ArchiveAccessor.feedback";
23
/// Cloneable handle for requesting fetches: wraps the sender side of the
/// unbounded channel whose receiver is drained by the task from `Fetcher::new`.
#[derive(Clone)]
pub(crate) struct Fetcher(UnboundedSender<FetchCommand>);
26
/// A request to fetch Inspect data for `service` and persist it for each of `tags`.
pub(crate) struct FetchCommand {
    pub(crate) service: ServiceName,
    pub(crate) tags: Vec<Tag>,
}
31
32fn extract_json_map(hierarchy: DiagnosticsHierarchy) -> Option<Map<String, Value>> {
33    let Ok(Value::Object(mut map)) = serde_json::to_value(hierarchy) else {
34        return None;
35    };
36    if map.len() != 1 || !map.contains_key("root") {
37        return Some(map);
38    }
39    if let Value::Object(map) = map.remove("root").unwrap() {
40        return Some(map);
41    }
42    None
43}
44
/// Condensed Inspect payloads keyed by component moniker; newtype so that
/// serialization can render each moniker as a string map key.
#[derive(Debug, Eq, PartialEq)]
struct DataMap(HashMap<ExtendedMoniker, Value>);
47
48impl Serialize for DataMap {
49    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
50    where
51        S: Serializer,
52    {
53        let mut map = serializer.serialize_map(Some(self.0.len()))?;
54        for (moniker, value) in &self.0 {
55            map.serialize_entry(&moniker.to_string(), &value)?;
56        }
57        map.end()
58    }
59}
60
// Read-only access to the underlying map (used by the tests via `*result`).
impl std::ops::Deref for DataMap {
    type Target = HashMap<ExtendedMoniker, Value>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
68
69fn condensed_map_of_data(items: impl IntoIterator<Item = Data<Inspect>>) -> DataMap {
70    DataMap(items.into_iter().fold(HashMap::new(), |mut entries, item| {
71        let Data { payload, moniker, .. } = item;
72        if let Some(new_map) = payload.and_then(extract_json_map) {
73            match entries.entry(moniker) {
74                Entry::Occupied(mut o) => {
75                    let existing_payload = o.get_mut();
76                    if let Value::Object(existing_payload_map) = existing_payload {
77                        existing_payload_map.extend(new_map);
78                    }
79                }
80                Entry::Vacant(v) => {
81                    v.insert(Value::Object(new_map));
82                }
83            }
84        }
85        entries
86    }))
87}
88
89fn save_data_for_tag(
90    inspect_data: Vec<Data<Inspect>>,
91    timestamps: Timestamps,
92    tag: &Tag,
93    tag_info: &TagInfo,
94) -> PersistSchema {
95    let mut filtered_datas = vec![];
96    for data in inspect_data {
97        match data.filter(&tag_info.selectors) {
98            Ok(Some(data)) => filtered_datas.push(data),
99            Ok(None) => {}
100            Err(e) => return PersistSchema::error(timestamps, format!("Filter error: {e}")),
101        }
102    }
103
104    if filtered_datas.is_empty() {
105        return PersistSchema::error(timestamps, format!("No data available for tag '{tag}'"));
106    }
107    // We may have multiple entries with the same moniker. Fold those together into a single entry.
108    let entries = condensed_map_of_data(filtered_datas);
109    let data_length = match serde_json::to_string(&entries) {
110        Ok(string) => string.len(),
111        Err(e) => {
112            return PersistSchema::error(timestamps, format!("Unexpected serialize error: {e}"))
113        }
114    };
115    if data_length > tag_info.max_save_length {
116        let error_description =
117            format!("Data too big: {data_length} > max length {0}", tag_info.max_save_length);
118        return PersistSchema::error(timestamps, error_description);
119    }
120    PersistSchema {
121        timestamps,
122        payload: PersistPayload::Data(PersistData { data_length, entries: entries.0 }),
123    }
124}
125
/// Returns the current UTC wall-clock time as nanoseconds since the Unix epoch.
fn utc_now() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    // Uses std::time instead of chrono (as the old TODO here suggested); the
    // result is identical for any clock at or after the epoch. A clock set
    // before the epoch is not expected on this platform; report 0 if it happens.
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // The cast from u128 only wraps for dates past the year 2262, the same
        // horizon the previous seconds*1e9 arithmetic had.
        Ok(elapsed) => elapsed.as_nanos() as i64,
        Err(_) => 0,
    }
}
130
/// Per-tag fetch configuration: the parsed selectors to filter with, and the
/// maximum serialized-JSON length (bytes) that may be persisted for the tag.
#[derive(Debug)]
struct TagInfo {
    max_save_length: usize,
    selectors: Vec<Selector>,
}
136
// Tag -> fetch configuration for that tag.
type TagsInfo = HashMap<Tag, TagInfo>;

// Service name -> the tags configured for that service.
type ServicesInfo = HashMap<ServiceName, TagsInfo>;
140
141/// If we've gotten an error before trying to fetch, save it to all tags and log it as an error.
142fn record_service_error(service_name: &ServiceName, tags: &[Tag], error: String) {
143    let utc = utc_now();
144    let monotonic = zx::MonotonicInstant::get().into_nanos();
145    let timestamps = Timestamps {
146        before_monotonic: monotonic,
147        after_monotonic: monotonic,
148        before_utc: utc,
149        after_utc: utc,
150    };
151    record_timestamped_error(service_name, tags, timestamps, error);
152}
153
154/// If we've gotten an error, save it and the fetch timestamp to all tags and log it as a warning.
155fn record_timestamped_error(
156    service_name: &ServiceName,
157    tags: &[Tag],
158    timestamps: Timestamps,
159    error: String,
160) {
161    warn!("{error}");
162    let error = PersistSchema::error(timestamps, error);
163    for tag in tags {
164        file_handler::write(service_name, tag, &error);
165    }
166}
167
168// Selectors for Inspect data must start with this exact string.
169const INSPECT_PREFIX: &str = "INSPECT:";
170
171fn strip_inspect_prefix<'a>(selectors: &'a [String]) -> impl Iterator<Item = &str> {
172    let get_inspect = |s: &'a String| -> Option<&str> {
173        if &s[..INSPECT_PREFIX.len()] == INSPECT_PREFIX {
174            Some(&s[INSPECT_PREFIX.len()..])
175        } else {
176            warn!("All selectors should begin with 'INSPECT:' - '{}'", s);
177            None
178        }
179    };
180    selectors.iter().filter_map(get_inspect)
181}
182
/// Fetches an Inspect snapshot for `service_name` through `proxy`, then writes
/// per-tag results (or error schemas) via `file_handler::write`. Unknown
/// services and unknown tags are logged and skipped.
async fn fetch_and_save(
    proxy: &ArchiveAccessorProxy,
    services: &ServicesInfo,
    service_name: ServiceName,
    tags: Vec<Tag>,
) {
    let service_info = match services.get(&service_name) {
        Some(info) => info,
        None => {
            warn!("Bad service {service_name} received in fetch");
            return;
        }
    };

    // Assemble the selectors: the union over all requested tags, so one
    // snapshot can serve every tag.
    // This could be done with filter_map, map, flatten, peekable, except for
    // https://github.com/rust-lang/rust/issues/102211#issuecomment-1513931928
    // - some iterators (including peekable) don't play well with async.
    let selectors = {
        let mut selectors = vec![];
        for tag in &tags {
            if let Some(tag_info) = service_info.get(tag) {
                for selector in tag_info.selectors.iter() {
                    selectors.push(selector.clone());
                }
            }
        }
        if selectors.is_empty() {
            record_service_error(
                &service_name,
                &tags,
                format!("Empty selectors from service {} and tags {:?}", service_name, tags),
            );
            return;
        }
        selectors
    };

    let mut source = ArchiveReader::inspect();
    source
        .with_archive(proxy.clone())
        .retry(RetryConfig::never())
        .add_selectors(selectors.into_iter());

    // Do the fetch and record the timestamps.
    // Both UTC and monotonic clocks are captured immediately before and after
    // the snapshot so the persisted data can be bracketed in time.
    let before_utc = utc_now();
    let before_monotonic = zx::MonotonicInstant::get().into_nanos();
    let data = source.snapshot().await;
    let after_utc = utc_now();
    let after_monotonic = zx::MonotonicInstant::get().into_nanos();
    let timestamps = Timestamps { before_utc, before_monotonic, after_utc, after_monotonic };
    let data = match data {
        Err(e) => {
            // Record the failure (with the timestamps above) under every tag.
            record_timestamped_error(
                &service_name,
                &tags,
                timestamps,
                format!("Failed to fetch Inspect data: {:?}", e),
            );
            return;
        }
        Ok(data) => data,
    };

    // Process the data for each tag
    for tag in tags {
        let Some(tag_info) = service_info.get(&tag) else {
            warn!("Tag '{tag}' was not found in config; skipping it.");
            continue;
        };
        // Each tag filters its own copy of the full snapshot.
        let data_to_save = save_data_for_tag(data.clone(), timestamps.clone(), &tag, tag_info);
        file_handler::write(&service_name, &tag, &data_to_save);
    }
}
257
impl Fetcher {
    /// Creates a Fetcher accessible through an unbounded channel. The receiving task and
    /// its data structures will be preserved by the `async move {...}`'d Fetcher task.
    /// so we just have to return the channel sender.
    ///
    /// Returns an error if the ArchiveAccessor connection cannot be made or
    /// if any configured selector fails to parse.
    pub(crate) fn new(config: &Config) -> Result<(Fetcher, Task<()>), anyhow::Error> {
        let mut services = HashMap::new();
        // Connect once up front; the proxy is reused for every fetch command.
        let proxy = fuchsia_component::client::connect_to_protocol_at_path::<
            fidl_fuchsia_diagnostics::ArchiveAccessorMarker,
        >(INSPECT_SERVICE_PATH)?;
        // Pre-parse all configured selectors so fetches don't re-parse per command.
        for (service_name, tags_info) in config.iter() {
            let mut tags = HashMap::new();
            for (tag_name, TagConfig { selectors, max_bytes, .. }) in tags_info.iter() {
                let selectors = strip_inspect_prefix(selectors)
                    .map(selectors::parse_verbose)
                    .collect::<Result<Vec<_>, _>>()?;
                let info = TagInfo { selectors, max_save_length: *max_bytes };
                tags.insert(tag_name.clone(), info);
            }
            services.insert(service_name.clone(), tags);
        }
        let (invoke_fetch, mut receiver) = mpsc::unbounded::<FetchCommand>();
        // Commands are processed one at a time, in arrival order.
        let task = fasync::Task::spawn(async move {
            while let Some(FetchCommand { service, tags }) = receiver.next().await {
                fetch_and_save(&proxy, &services, service, tags).await;
            }
        });
        Ok((Fetcher(invoke_fetch), task))
    }

    /// Queues a fetch command; the send result is ignored (an unbounded send
    /// only fails if the receiver task is gone, in which case there is
    /// nothing useful to do).
    pub(crate) fn send(&mut self, command: FetchCommand) {
        let _ = self.0.unbounded_send(command);
    }
}
291
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_data::{InspectDataBuilder, InspectHandleName, Timestamp};
    use diagnostics_hierarchy::hierarchy;
    use serde_json::json;

    // Selectors without the "INSPECT:" prefix are dropped; the rest have the
    // prefix removed.
    #[fuchsia::test]
    fn test_selector_stripping() {
        assert_eq!(
            strip_inspect_prefix(&[
                "INSPECT:foo".to_string(),
                "oops:bar".to_string(),
                "INSPECT:baz".to_string()
            ])
            .collect::<Vec<_>>(),
            vec!["foo".to_string(), "baz".to_string()]
        )
    }

    // Data without a payload, and an empty input list, both condense to an
    // empty map.
    #[fuchsia::test]
    fn test_condense_empty() {
        let empty_data = InspectDataBuilder::new(
            "a/b/c/d".try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build();
        let empty_data_result = condensed_map_of_data([empty_data]);
        let empty_vec_result = condensed_map_of_data([]);

        let expected_map = HashMap::new();

        pretty_assertions::assert_eq!(*empty_data_result, expected_map, "golden diff failed.");
        pretty_assertions::assert_eq!(*empty_vec_result, expected_map, "golden diff failed.");
    }

    // Builds a Data<Inspect> with the given (sorted) hierarchy and moniker.
    fn make_data(mut hierarchy: DiagnosticsHierarchy, moniker: &str) -> Data<Inspect> {
        hierarchy.sort();
        InspectDataBuilder::new(
            moniker.try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_hierarchy(hierarchy)
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build()
    }

    // A single payload condenses to one entry keyed by its moniker, with the
    // "root" wrapper stripped.
    #[fuchsia::test]
    fn test_condense_one() {
        let data = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "foo",
                "y": "bar",
            }
        });

        let result = condensed_map_of_data([data]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    // Two payloads for the same moniker merge into one object; the later
    // payload's values overwrite matching keys from the earlier one.
    #[fuchsia::test]
    fn test_condense_several_with_merge() {
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "X",
                "y": "bar",
                "z": "zebra",
            },
            "e/f/g/h": {
                "x": "ex",
                "y": "why"
            }
        });

        let result = condensed_map_of_data(vec![data_abcd, data_efgh, data_abcd2]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    // Fixed timestamps reused by the save_data_* tests below.
    const TIMESTAMPS: Timestamps =
        Timestamps { after_monotonic: 200, after_utc: 111, before_monotonic: 100, before_utc: 110 };

    // Builds a TagInfo from raw selector strings (without the INSPECT: prefix).
    fn tag_info(max_save_length: usize, selectors: Vec<&str>) -> TagInfo {
        let selectors = selectors
            .iter()
            .map(|s| selectors::parse_selector::<selectors::VerboseError>(s))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        TagInfo { selectors, max_save_length }
    }

    // No input data produces the "No data available" error schema.
    #[fuchsia::test]
    fn save_data_no_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let result = save_data_for_tag(
            vec![],
            TIMESTAMPS.clone(),
            &tag,
            &tag_info(1000, vec!["moniker:path:property"]),
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "No data available for tag 'tag'",
                },
            })
        );
    }

    // A serialized payload longer than max_save_length produces the
    // "Data too big" error schema.
    #[fuchsia::test]
    fn save_data_too_big() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd],
            TIMESTAMPS.clone(),
            &tag,
            &tag_info(20, vec!["a/b/c/d:root:y"]),
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "Data too big: 23 > max length 20",
                },
            })
        );
    }

    // Happy path: selector-filtered, condensed data is persisted along with
    // its serialized size and the fetch timestamps.
    #[fuchsia::test]
    fn save_string_with_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd, data_efgh, data_abcd2],
            TIMESTAMPS.clone(),
            &tag,
            &tag_info(1000, vec!["a/b/c/d:root:y"]),
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                "@persist_size": 23,
                "a/b/c/d": {
                    "y": "bar",
                },
            })
        );
    }
}