// persistence/fetcher.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5use crate::file_handler::{self, PersistData, PersistPayload, PersistSchema, Timestamps};
6use crate::scheduler::TagState;
7use anyhow::{Context, Error, bail};
8use diagnostics_data::{Data, DiagnosticsHierarchy, ExtendedMoniker, Inspect};
9use diagnostics_reader::{ArchiveReader, RetryConfig};
10use fidl_fuchsia_diagnostics as fdiagnostics;
11use log::*;
12use persistence_config::{ServiceName, Tag};
13use serde::ser::SerializeMap;
14use serde::{Serialize, Serializer};
15use serde_json::{Map, Value};
16use std::collections::HashMap;
17use std::collections::hash_map::Entry;
18
19// The capability name for the Inspect reader
20const INSPECT_SERVICE_PATH: &str = "/svc/fuchsia.diagnostics.ArchiveAccessor.feedback";
21
22fn extract_json_map(hierarchy: DiagnosticsHierarchy) -> Option<Map<String, Value>> {
23    let Ok(Value::Object(mut map)) = serde_json::to_value(hierarchy) else {
24        return None;
25    };
26    if map.len() != 1 || !map.contains_key("root") {
27        return Some(map);
28    }
29    if let Value::Object(map) = map.remove("root").unwrap() {
30        return Some(map);
31    }
32    None
33}
34
// Newtype over the per-moniker JSON payloads. Exists so we can implement
// `Serialize` ourselves: `ExtendedMoniker` keys must be rendered as strings
// to become valid JSON object keys.
#[derive(Debug, Eq, PartialEq)]
struct DataMap(HashMap<ExtendedMoniker, Value>);
37
38impl Serialize for DataMap {
39    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
40    where
41        S: Serializer,
42    {
43        let mut map = serializer.serialize_map(Some(self.0.len()))?;
44        for (moniker, value) in &self.0 {
45            map.serialize_entry(&moniker.to_string(), &value)?;
46        }
47        map.end()
48    }
49}
50
// Read-only access to the inner map (len(), iteration, comparisons in tests)
// without exposing the wrapper's field directly.
impl std::ops::Deref for DataMap {
    type Target = HashMap<ExtendedMoniker, Value>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
58
59fn condensed_map_of_data(items: impl IntoIterator<Item = Data<Inspect>>) -> DataMap {
60    DataMap(items.into_iter().fold(HashMap::new(), |mut entries, item| {
61        let Data { payload, moniker, .. } = item;
62        if let Some(new_map) = payload.and_then(extract_json_map) {
63            match entries.entry(moniker) {
64                Entry::Occupied(mut o) => {
65                    let existing_payload = o.get_mut();
66                    if let Value::Object(existing_payload_map) = existing_payload {
67                        existing_payload_map.extend(new_map);
68                    }
69                }
70                Entry::Vacant(v) => {
71                    v.insert(Value::Object(new_map));
72                }
73            }
74        }
75        entries
76    }))
77}
78
79fn save_data_for_tag(
80    inspect_data: Vec<Data<Inspect>>,
81    timestamps: Timestamps,
82    tag: &Tag,
83    selectors: &[fdiagnostics::Selector],
84    max_bytes: usize,
85) -> PersistSchema {
86    let mut filtered_datas = vec![];
87    for data in inspect_data {
88        match data.filter(selectors) {
89            Ok(Some(data)) => filtered_datas.push(data),
90            Ok(None) => {}
91            Err(e) => return PersistSchema::error(timestamps, format!("Filter error: {e}")),
92        }
93    }
94
95    if filtered_datas.is_empty() {
96        return PersistSchema::error(timestamps, format!("No data available for tag '{tag}'"));
97    }
98    // We may have multiple entries with the same moniker. Fold those together into a single entry.
99    let entries = condensed_map_of_data(filtered_datas);
100    let data_length = match serde_json::to_string(&entries) {
101        Ok(string) => string.len(),
102        Err(e) => {
103            return PersistSchema::error(timestamps, format!("Unexpected serialize error: {e}"));
104        }
105    };
106    if data_length > max_bytes {
107        let error_description = format!("Data too big: {data_length} > max length {max_bytes}");
108        return PersistSchema::error(timestamps, error_description);
109    }
110    PersistSchema {
111        timestamps,
112        payload: PersistPayload::Data(PersistData { data_length, entries: entries.0 }),
113    }
114}
115
/// Returns the current UTC wall-clock time in nanoseconds since the Unix epoch.
fn utc_now() -> i64 {
    // Resolves the old "Consider using SystemTime::now()?" TODO: std::time
    // reads the same system realtime clock chrono did, without the extra
    // dependency on this path.
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        // Cast is safe for any realistic date (i64 nanos covers ~292 years).
        Ok(since_epoch) => since_epoch.as_nanos() as i64,
        // Clock is before the epoch: report a negative offset, matching the
        // previous chrono-based arithmetic.
        Err(e) => -(e.duration().as_nanos() as i64),
    }
}
120
121pub(crate) async fn fetch_and_save<'a, P>(
122    services_state: &'a HashMap<ServiceName, HashMap<Tag, TagState>>,
123    pending: P,
124) -> Result<(), Error>
125where
126    P: IntoIterator<Item = (&'a ServiceName, &'a Vec<Tag>)>,
127    P::IntoIter: Clone,
128{
129    let pending_services = pending.into_iter().filter_map(|(service, tags)| {
130        services_state
131            .get(service)
132            .or_else(|| {
133                warn!("Skipping fetch request for unknown service \"{service}\"");
134                None
135            })
136            .map(move |service_state| {
137                let tag_states = tags.iter().filter_map(move |tag| {
138                    service_state.get(tag).or_else(|| {
139                        warn!("Skipping fetch request for unknown tag \"{tag}\" (service \"{service}\")");
140                        None
141                    }).map(|state| (tag, state))
142                });
143                (service, tag_states)
144            })
145    });
146
147    let selectors: Vec<fdiagnostics::Selector> = pending_services
148        .clone()
149        .flat_map(|(_, tags)| tags)
150        .flat_map(|(_, tag_state)| tag_state.selectors.clone())
151        // TODO(https://fxbug.dev/438817180): Use `.peekable()` to avoid unnecessary allocations.
152        .collect();
153
154    if selectors.is_empty() {
155        bail!("Nothing to fetch! This shouldn't ever happen; please file a bug");
156    }
157
158    let proxy = fuchsia_component::client::connect_to_protocol_at_path::<
159        fdiagnostics::ArchiveAccessorMarker,
160    >(INSPECT_SERVICE_PATH)?;
161    let mut source = ArchiveReader::inspect();
162    source
163        .with_archive(proxy.clone())
164        .retry(RetryConfig::never())
165        .add_selectors(selectors.into_iter());
166
167    // Do the fetch and record the timestamps.
168    let before_utc = utc_now();
169    let before_monotonic = zx::MonotonicInstant::get().into_nanos();
170    let data = source.snapshot().await.context("Failed to fetch Inspect data")?;
171    let after_utc = utc_now();
172    let after_monotonic = zx::MonotonicInstant::get().into_nanos();
173    let timestamps = Timestamps { before_utc, before_monotonic, after_utc, after_monotonic };
174
175    // Process the data for each tag
176    for (service, pending_tags) in pending_services {
177        for (tag, state) in pending_tags {
178            let data_to_save = save_data_for_tag(
179                data.clone(),
180                timestamps.clone(),
181                tag,
182                &state.selectors,
183                state.max_bytes,
184            );
185            file_handler::write(service, tag, &data_to_save);
186        }
187    }
188
189    Ok(())
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_data::{InspectDataBuilder, InspectHandleName, Timestamp};
    use diagnostics_hierarchy::hierarchy;
    use serde_json::json;

    // Items with no payload, or no items at all, condense to an empty map.
    #[fuchsia::test]
    fn test_condense_empty() {
        // Built without a hierarchy, so its payload is absent.
        let empty_data = InspectDataBuilder::new(
            "a/b/c/d".try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build();
        let empty_data_result = condensed_map_of_data([empty_data]);
        let empty_vec_result = condensed_map_of_data([]);

        let expected_map = HashMap::new();

        // Deref on DataMap exposes the inner HashMap for comparison.
        pretty_assertions::assert_eq!(*empty_data_result, expected_map, "golden diff failed.");
        pretty_assertions::assert_eq!(*empty_vec_result, expected_map, "golden diff failed.");
    }

    // Builds an Inspect data item for `moniker` carrying `hierarchy`
    // (sorted for deterministic golden comparisons).
    fn make_data(mut hierarchy: DiagnosticsHierarchy, moniker: &str) -> Data<Inspect> {
        hierarchy.sort();
        InspectDataBuilder::new(
            moniker.try_into().unwrap(),
            "fuchsia-pkg://test",
            Timestamp::from_nanos(123456i64),
        )
        .with_hierarchy(hierarchy)
        .with_name(InspectHandleName::filename("test_file_plz_ignore.inspect"))
        .build()
    }

    // A single item condenses to one entry keyed by moniker, with the "root"
    // wrapper stripped by extract_json_map.
    #[fuchsia::test]
    fn test_condense_one() {
        let data = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "foo",
                "y": "bar",
            }
        });

        let result = condensed_map_of_data([data]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    // Entries sharing a moniker are merged; a later item's value wins on key
    // collision ("x" becomes "X"), while distinct keys accumulate.
    #[fuchsia::test]
    fn test_condense_several_with_merge() {
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let expected_json = json!({
            "a/b/c/d": {
                "x": "X",
                "y": "bar",
                "z": "zebra",
            },
            "e/f/g/h": {
                "x": "ex",
                "y": "why"
            }
        });

        let result = condensed_map_of_data(vec![data_abcd, data_efgh, data_abcd2]);

        pretty_assertions::assert_eq!(
            serde_json::to_value(&result).unwrap(),
            expected_json,
            "golden diff failed."
        );
    }

    // Fixed timestamps shared by the save_data_for_tag tests; golden outputs
    // below assert these exact values round-trip through the schema.
    const TIMESTAMPS: Timestamps =
        Timestamps { after_monotonic: 200, after_utc: 111, before_monotonic: 100, before_utc: 110 };

    // Parses selector strings; panics on invalid input (test-only helper).
    fn parse_selectors(selectors: &'static [&'static str]) -> Vec<fdiagnostics::Selector> {
        selectors
            .iter()
            .map(|s| selectors::parse_selector::<selectors::VerboseError>(s))
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    // With no input data, the result is an error record that still carries
    // the timestamps.
    #[fuchsia::test]
    fn save_data_no_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let result = save_data_for_tag(
            vec![],
            TIMESTAMPS.clone(),
            &tag,
            &parse_selectors(&["moniker:path:property"]),
            1000,
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "No data available for tag 'tag'",
                },
            })
        );
    }

    // A payload whose serialized size exceeds max_bytes is replaced by a
    // "Data too big" error record (here 23 bytes vs a 20-byte budget).
    #[fuchsia::test]
    fn save_data_too_big() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd],
            TIMESTAMPS.clone(),
            &tag,
            &parse_selectors(&["a/b/c/d:root:y"]),
            20,
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                ":error": {
                    "description": "Data too big: 23 > max length 20",
                },
            })
        );
    }

    // Happy path: data is filtered down to the selected property ("y" only),
    // condensed per moniker, and persisted with its serialized size.
    #[fuchsia::test]
    fn save_string_with_data() {
        let tag = Tag::new("tag".to_string()).unwrap();
        let data_abcd = make_data(
            hierarchy! {
                root: {
                    "x": "foo",
                    "y": "bar",
                }
            },
            "a/b/c/d",
        );
        let data_efgh = make_data(
            hierarchy! {
                root: {
                    "x": "ex",
                    "y": "why",
                }
            },
            "e/f/g/h",
        );
        let data_abcd2 = make_data(
            hierarchy! {
                root: {
                    "x": "X",
                    "z": "zebra",
                }
            },
            "a/b/c/d",
        );

        let result = save_data_for_tag(
            vec![data_abcd, data_efgh, data_abcd2],
            TIMESTAMPS.clone(),
            &tag,
            &parse_selectors(&["a/b/c/d:root:y"]),
            1000,
        );

        assert_eq!(
            serde_json::to_value(&result).unwrap(),
            json!({
                "@timestamps": {
                    "after_monotonic": 200,
                    "after_utc": 111,
                    "before_monotonic": 100,
                    "before_utc": 110,
                },
                "@persist_size": 23,
                "a/b/c/d": {
                    "y": "bar",
                },
            })
        );
    }
}