1use crate::fetcher::{FetchCommand, Fetcher};
6use fuchsia_async::{self as fasync, TaskGroup};
7use fuchsia_sync::Mutex;
8
9use persistence_config::{Config, ServiceName, Tag, TagConfig};
10use std::collections::{BTreeMap, HashMap};
11use std::sync::Arc;
12
13#[derive(Clone)]
18pub(crate) struct Scheduler {
19 state: Arc<Mutex<State>>,
21}
22
23struct State {
24 fetcher: Fetcher,
25 services: HashMap<ServiceName, HashMap<Tag, TagState>>,
26 tasks: TaskGroup,
27}
28
29struct TagState {
30 backoff: zx::MonotonicDuration,
31 state: FetchState,
32 last_fetched: zx::MonotonicInstant,
33}
34
35impl Scheduler {
36 pub(crate) fn new(fetcher: Fetcher, config: &Config) -> Self {
37 let mut services = HashMap::new();
38 for (service, tags) in config {
39 let mut tag_states = HashMap::new();
40 for (tag, tag_config) in tags {
41 let TagConfig { min_seconds_between_fetch, .. } = tag_config;
42 let backoff = zx::MonotonicDuration::from_seconds(*min_seconds_between_fetch);
43 let tag_state = TagState {
44 backoff,
45 state: FetchState::Idle,
46 last_fetched: zx::MonotonicInstant::INFINITE_PAST,
47 };
48 tag_states.insert(tag.clone(), tag_state);
49 }
50 services.insert(service.clone(), tag_states);
51 }
52 let state = State { fetcher, services, tasks: TaskGroup::new() };
53 Scheduler { state: Arc::new(Mutex::new(state)) }
54 }
55
56 pub(crate) fn schedule(&self, service: &ServiceName, tags: Vec<Tag>) {
60 let now = zx::MonotonicInstant::get();
62 let mut state = self.state.lock();
63 let Some(service_info) = state.services.get_mut(service) else {
64 return;
65 };
66
67 let mut now_tags = vec![];
72 let mut later_tags: BTreeMap<zx::MonotonicInstant, Vec<Tag>> = BTreeMap::new();
73 for tag in tags {
74 let Some(tag_state) = service_info.get_mut(&tag) else {
75 return;
76 };
77 if matches!(tag_state.state, FetchState::Pending) {
78 continue;
79 }
80 if tag_state.last_fetched + tag_state.backoff <= now {
81 now_tags.push(tag);
82 tag_state.last_fetched = now;
83 } else {
84 let next_fetch = tag_state.last_fetched + tag_state.backoff;
85 tag_state.last_fetched = next_fetch;
86 tag_state.state = FetchState::Pending;
87 later_tags.entry(next_fetch).or_default().push(tag);
88 }
89 }
90 if !now_tags.is_empty() {
91 state.fetcher.send(FetchCommand { service: service.clone(), tags: now_tags });
92 }
93
94 while let Some((next_fetch, tags)) = later_tags.pop_first() {
96 self.enqueue(&mut state, next_fetch, FetchCommand { service: service.clone(), tags });
97 }
98 }
99
100 fn enqueue(&self, state: &mut State, time: zx::MonotonicInstant, command: FetchCommand) {
101 let this = self.clone();
102 let mut fetcher = state.fetcher.clone();
103 state.tasks.spawn(async move {
104 fasync::Timer::new(time).await;
105 {
106 let mut state = this.state.lock();
107 let Some(tag_states) = state.services.get_mut(&command.service) else {
108 return;
109 };
110 for tag in command.tags.iter() {
111 tag_states.get_mut(tag).unwrap().state = FetchState::Idle;
112 }
113 }
114 fetcher.send(command);
115 });
116 }
117}
118
/// Whether a delayed fetch is currently queued for a tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchState {
    /// A fetch has been queued for a future time and has not been sent yet.
    Pending,
    /// No fetch is queued; the tag can be scheduled.
    Idle,
}