persistence/
scheduler.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fetcher::{FetchCommand, Fetcher};
6use fuchsia_async::{self as fasync, TaskGroup};
7use fuchsia_sync::Mutex;
8
9use persistence_config::{Config, ServiceName, Tag, TagConfig};
10use std::collections::HashMap;
11use std::sync::Arc;
12
13// This contains the logic to decide which tags to fetch at what times. It contains the state of
14// each tag (when last fetched, whether currently queued). When a request arrives via FIDL, it's
15// sent here and results in requests queued to the Fetcher.
16
17#[derive(Clone)]
18pub(crate) struct Scheduler {
19    // This is a global lock. Scheduler only does schedule() which is synchronous and quick.
20    state: Arc<Mutex<State>>,
21}
22
23struct State {
24    fetcher: Fetcher,
25    services: HashMap<ServiceName, HashMap<Tag, TagState>>,
26    tasks: TaskGroup,
27}
28
29struct TagState {
30    backoff: zx::MonotonicDuration,
31    state: FetchState,
32    last_fetched: zx::MonotonicInstant,
33}
34
35impl Scheduler {
36    pub(crate) fn new(fetcher: Fetcher, config: &Config) -> Self {
37        let mut services = HashMap::new();
38        for (service, tags) in config {
39            let mut tag_states = HashMap::new();
40            for (tag, tag_config) in tags {
41                let TagConfig { min_seconds_between_fetch, .. } = tag_config;
42                let backoff = zx::MonotonicDuration::from_seconds(*min_seconds_between_fetch);
43                let tag_state = TagState {
44                    backoff,
45                    state: FetchState::Idle,
46                    last_fetched: zx::MonotonicInstant::INFINITE_PAST,
47                };
48                tag_states.insert(tag.clone(), tag_state);
49            }
50            services.insert(service.clone(), tag_states);
51        }
52        let state = State { fetcher, services, tasks: TaskGroup::new() };
53        Scheduler { state: Arc::new(Mutex::new(state)) }
54    }
55
56    /// Gets a service name and a list of valid tags, and queues any fetches that are not already
57    /// pending. Updates the last-fetched time on any tag it queues, setting it equal to the later
58    /// of the current time and the time the fetch becomes possible.
59    pub(crate) fn schedule(&self, service: &ServiceName, tags: Vec<Tag>) {
60        // Every tag we process should use the same Now
61        let now = zx::MonotonicInstant::get();
62        let mut state = self.state.lock();
63        let Some(service_info) = state.services.get_mut(service) else {
64            return;
65        };
66
67        let mut now_tags = vec![];
68        let mut later_tags: Vec<(zx::MonotonicInstant, Tag)> = vec![];
69        for tag in tags {
70            let Some(tag_state) = service_info.get_mut(&tag) else {
71                return;
72            };
73            if matches!(tag_state.state, FetchState::Pending) {
74                continue;
75            }
76            if tag_state.last_fetched + tag_state.backoff <= now {
77                now_tags.push(tag);
78                tag_state.last_fetched = now;
79            } else {
80                let next_fetch = tag_state.last_fetched + tag_state.backoff;
81                tag_state.last_fetched = next_fetch;
82                tag_state.state = FetchState::Pending;
83                later_tags.push((next_fetch, tag));
84            }
85        }
86        if !now_tags.is_empty() {
87            state.fetcher.send(FetchCommand { service: service.clone(), tags: now_tags });
88        }
89        // later_tags may not all be fetchable at the same time. Batch the ones that are.
90        later_tags.sort_by(|a, b| a.0.cmp(&b.0));
91        while !later_tags.is_empty() {
92            // This is N^2 but N will be too small to matter.
93            let first_time = later_tags[0].0;
94            let mut first_tags = vec![];
95            let mut remaining_tags = vec![];
96            for (next_fetch, tag) in later_tags {
97                if next_fetch == first_time {
98                    first_tags.push(tag);
99                } else {
100                    remaining_tags.push((next_fetch, tag));
101                }
102            }
103            later_tags = remaining_tags;
104            self.enqueue(
105                &mut state,
106                first_time,
107                FetchCommand { service: service.clone(), tags: first_tags },
108            );
109        }
110    }
111
112    fn enqueue(&self, state: &mut State, time: zx::MonotonicInstant, command: FetchCommand) {
113        let this = self.clone();
114        let mut fetcher = state.fetcher.clone();
115        state.tasks.spawn(async move {
116            fasync::Timer::new(time).await;
117            {
118                let mut state = this.state.lock();
119                let Some(tag_states) = state.services.get_mut(&command.service) else {
120                    return;
121                };
122                for tag in command.tags.iter() {
123                    tag_states.get_mut(tag).unwrap().state = FetchState::Idle;
124                }
125            }
126            fetcher.send(command);
127        });
128    }
129}
130
131/// FetchState tells whether a tag is currently waiting to be dispatched or not. If it is, then
132/// another request to fetch that tag should cause no change. If it's not waiting, then it can
133/// either be fetched immediately (in which case its state stays Idle, but the last-fetched time
134/// will be updated to Now) or it will be queued (in which case its state is Pending and its
135/// last-fetched time will be set forward to the time it's going to be fetched).
136enum FetchState {
137    Pending,
138    Idle,
139}