Skip to main content

persistence/
scheduler.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::BUILD_CONFIG;
6use crate::fetcher::{PersistenceData, ServiceData, TagData};
7use crate::file_handler::{self, Timestamps};
8use anyhow::{Context, anyhow, bail};
9use fidl::endpoints::ControlHandle;
10use futures::{StreamExt, TryStreamExt};
11use hashbrown::HashMap;
12use itertools::Itertools;
13use log::{debug, error, warn};
14
15use fidl_fuchsia_diagnostics as fdiagnostics;
16use fuchsia_async as fasync;
17use persistence_config::{Config, ServiceName, Tag};
18use selectors::SelectorExt;
19use std::collections::VecDeque;
20use std::pin::pin;
21use std::sync::Arc;
22
23// This contains the logic to configure the Archivist to sample diagnostics data based on the
24// persistence configuration. It handles the `fuchsia.diagnostics.SampleSink` protocol to receive
25// signals when data is ready, reads the data, and persists it.
26
27/// Tracks when each tag was persisted last, as necessary for implementing
28/// debounce on [`TagConfig`]'s `min_seconds_between_fetch`.
29#[derive(Clone)]
30pub(crate) struct Scheduler {
31    /// Flat lookup table for corresponding OnSampleReady responses with
32    /// Persistence configured services/tags.
33    tag_info: Arc<Vec<QuickTagInfo>>,
34}
35
36/// Scheduler error.
37#[derive(thiserror::Error, Debug)]
38pub(crate) enum Error {
39    #[error("invalid selector")]
40    InvalidSelector(#[from] selectors::Error),
41    #[error("fidl error: {0:?}")]
42    Fidl(#[from] fidl::Error),
43    #[error("unable to configure Archivist sampling: {0:?}")]
44    UnableToSample(#[from] anyhow::Error),
45}
46
47impl Scheduler {
48    pub(crate) fn new(config: &Config) -> Self {
49        let tag_info = config
50            .clone()
51            .into_iter()
52            .flat_map(|(service, tags)| {
53                tags.into_iter().map(move |(tag, config)| QuickTagInfo {
54                    service: service.clone(),
55                    tag,
56                    max_bytes: config.max_bytes,
57                    selectors: config.selectors,
58                })
59            })
60            .collect::<Vec<_>>();
61
62        Self { tag_info: Arc::new(tag_info) }
63    }
64
65    pub(crate) async fn subscribe(
66        &self,
67        scope: fasync::ScopeHandle,
68        config: &Config,
69    ) -> Result<(), Error> {
70        let sample_datums = config
71            .values()
72            .flat_map(|tags| {
73                tags.values().flat_map(|tag| {
74                    tag.selectors
75                        .clone()
76                        .into_iter()
77                        .map(|selector| (selector, tag.min_seconds_between_fetch))
78                })
79            })
80            // Convert to SampleDatums.
81            .map(|(selector, min_seconds_between_fetch)| fdiagnostics::SampleDatum {
82                selector: Some(fdiagnostics::SelectorArgument::StructuredSelector(selector)),
83                strategy: Some(fdiagnostics::SampleStrategy::OnDiff),
84                interval_secs: Some(min_seconds_between_fetch),
85                ..Default::default()
86            })
87            .collect::<Vec<_>>();
88
89        if sample_datums.is_empty() {
90            warn!("No tags configured; skipping subscription to fuchsia.diagnostics.Sample");
91            return Ok(());
92        }
93
94        let sample =
95            fuchsia_component::client::connect_to_protocol::<fdiagnostics::SampleMarker>()?;
96
97        for chunk in sample_datums.chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize) {
98            sample.set(&fdiagnostics::SampleParameters {
99                data: Some(chunk.to_vec()),
100                ..Default::default()
101            })?;
102        }
103
104        let (client_end, sample_sink_stream) =
105            fidl::endpoints::create_request_stream::<fdiagnostics::SampleSinkMarker>();
106
107        // Start the SampleSink server before committing Sample configuration.
108        // Dropping the JoinHandle will detach it.
109        let scheduler = self.clone();
110        scope.spawn(async move {
111            if let Err(e) = scheduler.handle_sample_sink(sample_sink_stream).await {
112                error!("Error serving SampleSink: {e:?}");
113            }
114        });
115
116        sample
117            .commit(client_end)
118            .await?
119            .map_err(|e| anyhow!("failed to commit fuchsia.diagnostics.Sample config: {e:?}"))?;
120
121        Ok(())
122    }
123
124    pub(crate) async fn handle_sample_sink(
125        &self,
126        stream: fdiagnostics::SampleSinkRequestStream,
127    ) -> Result<(), anyhow::Error> {
128        let (stream, stalled) = detect_stall::until_stalled(stream, BUILD_CONFIG.stall_interval);
129        let mut stream = pin!(stream);
130        loop {
131            match stream.try_next().await {
132                Ok(Some(fdiagnostics::SampleSinkRequest::OnSampleReadied {
133                    event,
134                    control_handle,
135                })) => match event {
136                    fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
137                        batch_iter,
138                        seconds_since_start: _,
139                        __source_breaking: _,
140                    }) => {
141                        if let Some(iter) = batch_iter {
142                            if let Err(e) = self.handle_sample_ready(iter).await {
143                                warn!("Failed to process Sample: {e:?}");
144                            }
145                        } else {
146                            bail!("expected BatchIterator, got None");
147                        }
148                    }
149                    fdiagnostics::SampleSinkResult::Error(err) => {
150                        control_handle.shutdown();
151                        bail!("Failed receiving samples: {err:?}");
152                    }
153                    fdiagnostics::SampleSinkResult::__SourceBreaking { unknown_ordinal } => {
154                        control_handle.shutdown();
155                        bail!("unknown ordinal {unknown_ordinal}");
156                    }
157                },
158                Ok(Some(req)) => {
159                    warn!("Unknown SampleSinkRequest {req:?}");
160                }
161                Ok(None) => break,
162                Err(e) => bail!("Unexpected error handling SampleSinkRequest: {e}"),
163            }
164        }
165        if let Some(server_end) = stalled.await.context("FIDL error")? {
166            // Send the server endpoint back to the framework.
167            debug!("Escrowing fuchsia.diagnostics.SampleSink");
168            fuchsia_component::client::connect_channel_to_protocol_at_path(
169                server_end,
170                "/escrow/fuchsia.diagnostics.SampleSink",
171            )
172            .context("Failed to connect to fuchsia.diagnostics.SampleSink")?;
173        }
174        Ok(())
175    }
176
177    async fn handle_sample_ready(
178        &self,
179        iter: fidl::endpoints::ClientEnd<fdiagnostics::BatchIteratorMarker>,
180    ) -> Result<(), anyhow::Error> {
181        let timestamps = file_handler::Timestamps {
182            last_sample_boot: zx::BootInstant::get(),
183            last_sample_utc: fuchsia_runtime::utc_time(),
184        };
185
186        let proxy = Arc::new(iter.into_proxy());
187        let (snapshot, errs): (Vec<_>, Vec<_>) =
188            diagnostics_reader::drain_batch_iterator::<diagnostics_data::InspectData>(proxy)
189                .collect::<Vec<_>>()
190                .await
191                .into_iter()
192                .partition_result();
193        if !errs.is_empty() {
194            if snapshot.is_empty() {
195                bail!("failed reading all Inspect data: {errs:?}")
196            } else {
197                warn!("failed reading some Inspect data: {errs:?}")
198            }
199        }
200
201        let mut current_data = match file_handler::current_data().await {
202            Ok(Some(data)) => data,
203            Ok(None) => PersistenceData::default(),
204            Err(e) => {
205                error!("Current data corrupted, starting fresh: {e:?}");
206                PersistenceData::default()
207            }
208        };
209
210        for tag_info in self.tag_info.iter() {
211            for data in &snapshot {
212                if data.moniker.match_against_selectors(&tag_info.selectors).next().is_none() {
213                    continue;
214                }
215                match data.clone().filter(&tag_info.selectors) {
216                    Ok(Some(data)) => {
217                        modify_tag_data(&mut current_data, tag_info, &timestamps, |tag_data| {
218                            tag_data.merge(timestamps.clone(), data)
219                        })
220                    }
221                    Ok(None) => {}
222                    Err(e) => {
223                        modify_tag_data(&mut current_data, tag_info, &timestamps, |tag_data| {
224                            tag_data.add_error(e.to_string())
225                        })
226                    }
227                }
228            }
229        }
230
231        file_handler::write_current_data(&current_data).await
232    }
233}
234
235fn modify_tag_data<'a>(
236    data: &'a mut PersistenceData,
237    lookup: &'a QuickTagInfo,
238    timestamps: &Timestamps,
239    modify_fn: impl FnOnce(&mut TagData),
240) {
241    let QuickTagInfo { service, tag, max_bytes, selectors } = lookup;
242
243    let service_data = data.entry_ref(service).or_insert_with(ServiceData::default);
244    let tag_data = service_data.entry_ref(tag).or_insert_with(|| TagData {
245        max_bytes: *max_bytes,
246        total_bytes: 0,
247        timestamps: timestamps.clone(),
248        selectors: selectors.clone(),
249        data: HashMap::new(),
250        errors: VecDeque::new(),
251    });
252
253    modify_fn(tag_data)
254}
255
256/// Minimal set of information to perform quick lookups of tags.
257struct QuickTagInfo {
258    service: ServiceName,
259    tag: Tag,
260    max_bytes: usize,
261    selectors: Vec<fidl_fuchsia_diagnostics::Selector>,
262}