1use crate::fetcher;
6use fuchsia_sync::Mutex;
7use log::{error, warn};
8use persistence_config::{Config, ServiceName, Tag, TagConfig};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11use {
12 fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_diagnostics_persist as fpersist,
13 fuchsia_async as fasync,
14};
15
/// Prefix that a configured selector string must start with to be accepted;
/// it is removed before the remainder is handed to the selector parser.
const INSPECT_PREFIX: &str = "INSPECT:";
22
23#[derive(Clone)]
26pub(crate) struct Scheduler {
27 scope: fasync::ScopeHandle,
28
29 service_state: Arc<HashMap<ServiceName, HashMap<Tag, TagState>>>,
34
35 fetch_schedule: Arc<Mutex<BTreeMap<zx::MonotonicInstant, ServiceTags>>>,
37}
38
39type ServiceTags = Vec<(ServiceName, Tag)>;
40
41pub(crate) struct TagState {
44 pub selectors: Vec<fdiagnostics::Selector>,
45 pub max_bytes: usize,
46 backoff: zx::MonotonicDuration,
47 state: Mutex<TagFetchState>,
48}
49
50impl TryFrom<&TagConfig> for TagState {
51 type Error = selectors::Error;
52 fn try_from(value: &TagConfig) -> Result<Self, Self::Error> {
53 Ok(Self {
54 selectors: value
55 .selectors
56 .iter()
57 .filter_map(strip_inspect_prefix)
58 .map(selectors::parse_verbose)
59 .collect::<Result<Vec<_>, _>>()?,
60 max_bytes: value.max_bytes,
61 backoff: zx::MonotonicDuration::from_seconds(value.min_seconds_between_fetch),
62 state: Mutex::new(TagFetchState::ReadyAfter(zx::MonotonicInstant::INFINITE_PAST)),
63 })
64 }
65}
66
67fn strip_inspect_prefix(selector: &String) -> Option<&str> {
68 if &selector[..INSPECT_PREFIX.len()] == INSPECT_PREFIX {
69 Some(&selector[INSPECT_PREFIX.len()..])
70 } else {
71 warn!("Selector does not begin with \"INSPECT:\": {selector}");
72 None
73 }
74}
75
76#[derive(thiserror::Error, Debug)]
78pub(crate) enum Error {
79 #[error("unknown service name \"{0}\"")]
80 UnknownService(ServiceName),
81 #[error("unknown tag name \"{tag}\" for service \"{service}\"")]
82 UnknownTag { service: ServiceName, tag: Tag },
83 #[error("invalid tag name \"{0}\" must match [a-z][a-z-]*")]
84 InvalidTag(String),
85 #[error("invalid selector")]
86 InvalidSelector(#[from] selectors::Error),
87}
88
89impl From<Error> for fpersist::PersistResult {
90 fn from(value: Error) -> Self {
91 match value {
92 Error::UnknownService(_) => Self::BadName,
93 Error::UnknownTag { .. } => Self::BadName,
94 Error::InvalidTag(_) => Self::BadName,
95 Error::InvalidSelector(_) => Self::InternalError,
96 }
97 }
98}
99
100impl Scheduler {
101 pub(crate) fn new(scope: fasync::ScopeHandle, config: &Config) -> Result<Self, Error> {
102 let mut service_state = HashMap::with_capacity(config.len());
103 for (service, tags) in config {
104 let mut tag_state = HashMap::with_capacity(tags.len());
105 for (tag, tag_config) in tags {
106 tag_state.insert(tag.clone(), tag_config.try_into()?);
107 }
108 service_state.insert(service.clone(), tag_state);
109 }
110 Ok(Scheduler {
111 scope,
112 service_state: Arc::new(service_state),
113 fetch_schedule: Default::default(),
114 })
115 }
116
117 pub(crate) fn schedule(
121 &self,
122 service: &ServiceName,
123 tags: impl IntoIterator<Item = String>,
124 ) -> Vec<Result<(), Error>> {
125 let now = zx::MonotonicInstant::get();
127 let Some(tag_states) = self.service_state.get(service) else {
128 return tags.into_iter().map(|_| Err(Error::UnknownService(service.clone()))).collect();
129 };
130
131 let (response, now_tags) = {
136 let mut now_tags = Vec::new();
137 let mut schedule = self.fetch_schedule.lock();
138 let response: Vec<Result<(), Error>> = tags
139 .into_iter()
140 .map(|tag_raw| {
141 let tag = Tag::new(tag_raw.clone()).map_err(|_| Error::InvalidTag(tag_raw))?;
142 tag_states
143 .get(&tag)
144 .ok_or_else(|| Error::UnknownTag {
145 service: service.clone(),
146 tag: tag.clone(),
147 })
148 .map(|tag_state| {
149 let mut state = tag_state.state.lock();
150 match *state {
151 TagFetchState::ReadyAfter(wait_until) => {
152 if wait_until <= now {
153 now_tags.push(tag);
155 *state = TagFetchState::ReadyAfter(now + tag_state.backoff);
156 } else {
157 schedule
159 .entry(wait_until)
160 .or_default()
161 .push((service.clone(), tag));
162 *state = TagFetchState::Scheduled;
163 }
164 }
165 TagFetchState::Scheduled => {
166 }
168 }
169 })
170 })
171 .collect();
172 (response, now_tags)
173 };
174
175 if !now_tags.is_empty() {
176 let service_state = self.service_state.clone();
177 let service = service.clone();
178 self.scope.spawn(async move {
179 let pending = [(&service, &now_tags)];
180 if let Err(e) = fetcher::fetch_and_save(&service_state, pending).await {
181 error!("Failed to fetch inspect: {e:?}");
182 }
183 });
184 }
185
186 self.schedule_next_batch();
187
188 response
189 }
190
191 fn schedule_next_batch(&self) {
193 let Some(next_fetch) = self.fetch_schedule.lock().first_entry().map(|e| *e.key()) else {
194 return;
196 };
197
198 let schedule = self.fetch_schedule.clone();
200 let service_state = self.service_state.clone();
201 self.scope.spawn(async move {
202 let wait = next_fetch - zx::MonotonicInstant::get();
203 fasync::Timer::new(wait).await;
204
205 let pending = {
207 let mut pending: HashMap<ServiceName, Vec<Tag>> = HashMap::new();
208 let mut schedule = schedule.lock();
209 let now = zx::MonotonicInstant::get();
210 while let Some(entry) = schedule.first_entry() {
211 if *entry.key() > now {
212 break;
213 }
214 for (service, tag) in entry.remove().into_iter() {
215 let TagState { state, backoff, .. } = service_state
216 .get(&service)
217 .expect("Missing service")
219 .get(&tag)
220 .expect("Missing tag");
222 *state.lock() = TagFetchState::ReadyAfter(now + *backoff);
223 pending.entry(service).or_default().push(tag);
224 }
225 }
226 pending
227 };
228
229 if pending.is_empty() {
230 return;
231 }
232
233 if let Err(e) = fetcher::fetch_and_save(&service_state, &pending).await {
234 error!("Failed to fetch pending tags from Inspect: {e:?}");
235 }
236 });
237 }
238}
239
240enum TagFetchState {
242 ReadyAfter(zx::MonotonicInstant),
244 Scheduled,
246}
247
#[cfg(test)]
mod tests {
    use super::*;

    /// Selectors with the `INSPECT:` prefix are kept with the prefix removed;
    /// selectors without it are dropped.
    #[fuchsia::test]
    fn test_selector_stripping() {
        let raw = ["INSPECT:foo".to_string(), "oops:bar".to_string(), "INSPECT:baz".to_string()];
        let stripped: Vec<&str> = raw.iter().filter_map(strip_inspect_prefix).collect();
        assert_eq!(stripped, ["foo", "baz"]);
    }
}
262}