// persistence/scheduler.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::fetcher;
6use fuchsia_sync::Mutex;
7use log::{error, warn};
8use persistence_config::{Config, ServiceName, Tag, TagConfig};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11use {
12    fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_diagnostics_persist as fpersist,
13    fuchsia_async as fasync,
14};
15
// This file contains the logic to decide which tags to fetch at what times. It holds the state
// of each tag (when it was last fetched, whether it is currently queued). When a request arrives
// via FIDL, it is sent here and results in requests queued to the Fetcher.
19
20// Selectors for Inspect data must start with this exact string.
21const INSPECT_PREFIX: &str = "INSPECT:";
22
23/// Tracks when each tag was persisted last, as necessary for implementing
24/// debounce on [`TagConfig`]'s `min_seconds_between_fetch`.
25#[derive(Clone)]
26pub(crate) struct Scheduler {
27    scope: fasync::ScopeHandle,
28
29    /// Registry of all tags with additional metadata necessary for scheduling
30    /// fetches from Inspect.
31    ///
32    /// TODO(https://fxbug.dev/437989316): Save memory using Vec instead.
33    service_state: Arc<HashMap<ServiceName, HashMap<Tag, TagState>>>,
34
35    /// Collection of which tags are scheduled to be fetched from Inspect.
36    fetch_schedule: Arc<Mutex<BTreeMap<zx::MonotonicInstant, ServiceTags>>>,
37}
38
39type ServiceTags = Vec<(ServiceName, Tag)>;
40
41/// Compilation of [`TagConfig`] with additional tracking when this tag was last
42/// persisted.
43pub(crate) struct TagState {
44    pub selectors: Vec<fdiagnostics::Selector>,
45    pub max_bytes: usize,
46    backoff: zx::MonotonicDuration,
47    state: Mutex<TagFetchState>,
48}
49
50impl TryFrom<&TagConfig> for TagState {
51    type Error = selectors::Error;
52    fn try_from(value: &TagConfig) -> Result<Self, Self::Error> {
53        Ok(Self {
54            selectors: value
55                .selectors
56                .iter()
57                .filter_map(strip_inspect_prefix)
58                .map(selectors::parse_verbose)
59                .collect::<Result<Vec<_>, _>>()?,
60            max_bytes: value.max_bytes,
61            backoff: zx::MonotonicDuration::from_seconds(value.min_seconds_between_fetch),
62            state: Mutex::new(TagFetchState::ReadyAfter(zx::MonotonicInstant::INFINITE_PAST)),
63        })
64    }
65}
66
67fn strip_inspect_prefix(selector: &String) -> Option<&str> {
68    if &selector[..INSPECT_PREFIX.len()] == INSPECT_PREFIX {
69        Some(&selector[INSPECT_PREFIX.len()..])
70    } else {
71        warn!("Selector does not begin with \"INSPECT:\": {selector}");
72        None
73    }
74}
75
76/// Scheduler error.
77#[derive(thiserror::Error, Debug)]
78pub(crate) enum Error {
79    #[error("unknown service name \"{0}\"")]
80    UnknownService(ServiceName),
81    #[error("unknown tag name \"{tag}\" for service \"{service}\"")]
82    UnknownTag { service: ServiceName, tag: Tag },
83    #[error("invalid tag name \"{0}\" must match [a-z][a-z-]*")]
84    InvalidTag(String),
85    #[error("invalid selector")]
86    InvalidSelector(#[from] selectors::Error),
87}
88
89impl From<Error> for fpersist::PersistResult {
90    fn from(value: Error) -> Self {
91        match value {
92            Error::UnknownService(_) => Self::BadName,
93            Error::UnknownTag { .. } => Self::BadName,
94            Error::InvalidTag(_) => Self::BadName,
95            Error::InvalidSelector(_) => Self::InternalError,
96        }
97    }
98}
99
100impl Scheduler {
101    pub(crate) fn new(scope: fasync::ScopeHandle, config: &Config) -> Result<Self, Error> {
102        let mut service_state = HashMap::with_capacity(config.len());
103        for (service, tags) in config {
104            let mut tag_state = HashMap::with_capacity(tags.len());
105            for (tag, tag_config) in tags {
106                tag_state.insert(tag.clone(), tag_config.try_into()?);
107            }
108            service_state.insert(service.clone(), tag_state);
109        }
110        Ok(Scheduler {
111            scope,
112            service_state: Arc::new(service_state),
113            fetch_schedule: Default::default(),
114        })
115    }
116
117    /// Gets a service name and a list of valid tags, and queues any fetches that are not already
118    /// pending. Updates the last-fetched time on any tag it queues, setting it equal to the later
119    /// of the current time and the time the fetch becomes possible.
120    pub(crate) fn schedule(
121        &self,
122        service: &ServiceName,
123        tags: impl IntoIterator<Item = String>,
124    ) -> Vec<Result<(), Error>> {
125        // Every tag we process should use the same Now
126        let now = zx::MonotonicInstant::get();
127        let Some(tag_states) = self.service_state.get(service) else {
128            return tags.into_iter().map(|_| Err(Error::UnknownService(service.clone()))).collect();
129        };
130
131        // Filter tags that need to be fetch now from those that need to be
132        // fetched later. Group later tags by their next_fetch time using a
133        // b-tree, making it efficient to iterate over these batches in
134        // order of next_fetch time.
135        let (response, now_tags) = {
136            let mut now_tags = Vec::new();
137            let mut schedule = self.fetch_schedule.lock();
138            let response: Vec<Result<(), Error>> = tags
139                .into_iter()
140                .map(|tag_raw| {
141                    let tag = Tag::new(tag_raw.clone()).map_err(|_| Error::InvalidTag(tag_raw))?;
142                    tag_states
143                        .get(&tag)
144                        .ok_or_else(|| Error::UnknownTag {
145                            service: service.clone(),
146                            tag: tag.clone(),
147                        })
148                        .map(|tag_state| {
149                            let mut state = tag_state.state.lock();
150                            match *state {
151                                TagFetchState::ReadyAfter(wait_until) => {
152                                    if wait_until <= now {
153                                        // Debounce period has elapsed; fetch this tag immediately.
154                                        now_tags.push(tag);
155                                        *state = TagFetchState::ReadyAfter(now + tag_state.backoff);
156                                    } else {
157                                        // Debounce is still active; schedule this tag for later fetch.
158                                        schedule
159                                            .entry(wait_until)
160                                            .or_default()
161                                            .push((service.clone(), tag));
162                                        *state = TagFetchState::Scheduled;
163                                    }
164                                }
165                                TagFetchState::Scheduled => {
166                                    // This tag has already been scheduled; no action required.
167                                }
168                            }
169                        })
170                })
171                .collect();
172            (response, now_tags)
173        };
174
175        if !now_tags.is_empty() {
176            let service_state = self.service_state.clone();
177            let service = service.clone();
178            self.scope.spawn(async move {
179                let pending = [(&service, &now_tags)];
180                if let Err(e) = fetcher::fetch_and_save(&service_state, pending).await {
181                    error!("Failed to fetch inspect: {e:?}");
182                }
183            });
184        }
185
186        self.schedule_next_batch();
187
188        response
189    }
190
191    /// Spawn a task to check if there are any pending fetches.
192    fn schedule_next_batch(&self) {
193        let Some(next_fetch) = self.fetch_schedule.lock().first_entry().map(|e| *e.key()) else {
194            // No pending fetches; nothing to do.
195            return;
196        };
197
198        // Schedule a task to fetch them all at the same time.
199        let schedule = self.fetch_schedule.clone();
200        let service_state = self.service_state.clone();
201        self.scope.spawn(async move {
202            let wait = next_fetch - zx::MonotonicInstant::get();
203            fasync::Timer::new(wait).await;
204
205            // Collect pending tags to fetch from Inspect.
206            let pending = {
207                let mut pending: HashMap<ServiceName, Vec<Tag>> = HashMap::new();
208                let mut schedule = schedule.lock();
209                let now = zx::MonotonicInstant::get();
210                while let Some(entry) = schedule.first_entry() {
211                    if *entry.key() > now {
212                        break;
213                    }
214                    for (service, tag) in entry.remove().into_iter() {
215                        let TagState { state, backoff, .. } = service_state
216                            .get(&service)
217                            // SAFETY: Config cannot change during runtime.
218                            .expect("Missing service")
219                            .get(&tag)
220                            // SAFETY: Config cannot change during runtime.
221                            .expect("Missing tag");
222                        *state.lock() = TagFetchState::ReadyAfter(now + *backoff);
223                        pending.entry(service).or_default().push(tag);
224                    }
225                }
226                pending
227            };
228
229            if pending.is_empty() {
230                return;
231            }
232
233            if let Err(e) = fetcher::fetch_and_save(&service_state, &pending).await {
234                error!("Failed to fetch pending tags from Inspect: {e:?}");
235            }
236        });
237    }
238}
239
240/// Tracks when a tag is ready to be fetched again.
241enum TagFetchState {
242    /// Tag is ready to be fetched after this time.
243    ReadyAfter(zx::MonotonicInstant),
244    /// Tag is scheduled to be fetched.
245    Scheduled,
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251
252    #[fuchsia::test]
253    fn test_selector_stripping() {
254        assert_eq!(
255            ["INSPECT:foo".to_string(), "oops:bar".to_string(), "INSPECT:baz".to_string()]
256                .iter()
257                .filter_map(strip_inspect_prefix)
258                .collect::<Vec<_>>(),
259            ["foo".to_string(), "baz".to_string()]
260        )
261    }
262}