1use crate::fetcher::{FetchCommand, Fetcher};
6use fuchsia_async::{self as fasync, TaskGroup};
7use fuchsia_sync::Mutex;
8
9use persistence_config::{Config, ServiceName, Tag, TagConfig};
10use std::collections::HashMap;
11use std::sync::Arc;
12
/// Decides which tags are fetched immediately and which are deferred until
/// their backoff has elapsed.
///
/// Cloning a `Scheduler` is cheap: every clone refers to the same underlying
/// [`State`] through the shared `Arc`.
#[derive(Clone)]
pub(crate) struct Scheduler {
    /// Shared, mutex-protected scheduling state. Both `schedule` and the
    /// delayed tasks spawned by `enqueue` take this lock.
    state: Arc<Mutex<State>>,
}
22
/// Mutable state shared by all clones of a [`Scheduler`].
struct State {
    /// Destination for `FetchCommand`s, both immediate and delayed.
    fetcher: Fetcher,
    /// Per-service map from each configured tag to its scheduling state.
    /// Populated once in `Scheduler::new` from the config.
    services: HashMap<ServiceName, HashMap<Tag, TagState>>,
    /// Task group onto which `enqueue` spawns the timer-driven delayed fetches.
    tasks: TaskGroup,
}
28
/// Scheduling state tracked for a single tag of a single service.
struct TagState {
    /// Minimum interval between two fetches of this tag; built from the tag's
    /// `min_seconds_between_fetch` config value.
    backoff: zx::MonotonicDuration,
    /// Whether a delayed fetch of this tag is currently queued.
    state: FetchState,
    /// Time of the most recent fetch or, while a delayed fetch is queued, the
    /// time at which that fetch is due. Starts at `INFINITE_PAST`.
    last_fetched: zx::MonotonicInstant,
}
34
35impl Scheduler {
36 pub(crate) fn new(fetcher: Fetcher, config: &Config) -> Self {
37 let mut services = HashMap::new();
38 for (service, tags) in config {
39 let mut tag_states = HashMap::new();
40 for (tag, tag_config) in tags {
41 let TagConfig { min_seconds_between_fetch, .. } = tag_config;
42 let backoff = zx::MonotonicDuration::from_seconds(*min_seconds_between_fetch);
43 let tag_state = TagState {
44 backoff,
45 state: FetchState::Idle,
46 last_fetched: zx::MonotonicInstant::INFINITE_PAST,
47 };
48 tag_states.insert(tag.clone(), tag_state);
49 }
50 services.insert(service.clone(), tag_states);
51 }
52 let state = State { fetcher, services, tasks: TaskGroup::new() };
53 Scheduler { state: Arc::new(Mutex::new(state)) }
54 }
55
56 pub(crate) fn schedule(&self, service: &ServiceName, tags: Vec<Tag>) {
60 let now = zx::MonotonicInstant::get();
62 let mut state = self.state.lock();
63 let Some(service_info) = state.services.get_mut(service) else {
64 return;
65 };
66
67 let mut now_tags = vec![];
68 let mut later_tags: Vec<(zx::MonotonicInstant, Tag)> = vec![];
69 for tag in tags {
70 let Some(tag_state) = service_info.get_mut(&tag) else {
71 return;
72 };
73 if matches!(tag_state.state, FetchState::Pending) {
74 continue;
75 }
76 if tag_state.last_fetched + tag_state.backoff <= now {
77 now_tags.push(tag);
78 tag_state.last_fetched = now;
79 } else {
80 let next_fetch = tag_state.last_fetched + tag_state.backoff;
81 tag_state.last_fetched = next_fetch;
82 tag_state.state = FetchState::Pending;
83 later_tags.push((next_fetch, tag));
84 }
85 }
86 if !now_tags.is_empty() {
87 state.fetcher.send(FetchCommand { service: service.clone(), tags: now_tags });
88 }
89 later_tags.sort_by(|a, b| a.0.cmp(&b.0));
91 while !later_tags.is_empty() {
92 let first_time = later_tags[0].0;
94 let mut first_tags = vec![];
95 let mut remaining_tags = vec![];
96 for (next_fetch, tag) in later_tags {
97 if next_fetch == first_time {
98 first_tags.push(tag);
99 } else {
100 remaining_tags.push((next_fetch, tag));
101 }
102 }
103 later_tags = remaining_tags;
104 self.enqueue(
105 &mut state,
106 first_time,
107 FetchCommand { service: service.clone(), tags: first_tags },
108 );
109 }
110 }
111
112 fn enqueue(&self, state: &mut State, time: zx::MonotonicInstant, command: FetchCommand) {
113 let this = self.clone();
114 let mut fetcher = state.fetcher.clone();
115 state.tasks.spawn(async move {
116 fasync::Timer::new(time).await;
117 {
118 let mut state = this.state.lock();
119 let Some(tag_states) = state.services.get_mut(&command.service) else {
120 return;
121 };
122 for tag in command.tags.iter() {
123 tag_states.get_mut(tag).unwrap().state = FetchState::Idle;
124 }
125 }
126 fetcher.send(command);
127 });
128 }
129}
130
/// Whether a delayed fetch is currently queued for a tag.
enum FetchState {
    /// A delayed fetch has been queued and its timer has not fired yet.
    /// `schedule` skips tags in this state.
    Pending,
    /// No delayed fetch is queued. This is the initial state, and it is
    /// restored when a queued fetch's timer fires.
    Idle,
}