diagnostics/task_metrics/
measurement.rs1use crate::task_metrics::constants::*;
6use fuchsia_inspect::{self as inspect, ArrayProperty};
7
8use injectable_time::TimeSource;
9use std::cmp::{Eq, Ord, PartialEq, PartialOrd, max};
10use std::collections::VecDeque;
11use std::ops::{AddAssign, SubAssign};
12use std::sync::Arc;
13
14#[derive(Debug, Clone, Default, PartialOrd, Eq, Ord, PartialEq)]
15pub struct Measurement {
16 timestamp: zx::BootInstant,
17 cpu_time: zx::MonotonicDuration,
18 queue_time: zx::MonotonicDuration,
19}
20
21impl Measurement {
22 pub fn empty(timestamp: zx::BootInstant) -> Self {
23 Self {
24 timestamp,
25 cpu_time: zx::MonotonicDuration::from_nanos(0),
26 queue_time: zx::MonotonicDuration::from_nanos(0),
27 }
28 }
29
30 pub fn clone_with_time(m: &Self, timestamp: zx::BootInstant) -> Self {
31 Self { timestamp, cpu_time: *m.cpu_time(), queue_time: *m.queue_time() }
32 }
33
34 pub fn cpu_time(&self) -> &zx::MonotonicDuration {
36 &self.cpu_time
37 }
38
39 pub fn queue_time(&self) -> &zx::MonotonicDuration {
41 &self.queue_time
42 }
43
44 pub fn timestamp(&self) -> &zx::BootInstant {
46 &self.timestamp
47 }
48
49 fn can_merge(&self, other: &Self) -> bool {
50 u128::from(self.timestamp().into_nanos().abs_diff(other.timestamp().into_nanos()))
51 <= MEASUREMENT_EPSILON.as_nanos()
52 }
53}
54
55impl AddAssign<&Measurement> for Measurement {
56 fn add_assign(&mut self, other: &Measurement) {
57 self.cpu_time += other.cpu_time;
58 self.queue_time += other.queue_time;
59 }
60}
61
62impl SubAssign<&Measurement> for Measurement {
63 fn sub_assign(&mut self, other: &Measurement) {
64 self.cpu_time -= other.cpu_time;
65 self.queue_time -= other.queue_time;
66 }
67}
68
69impl From<zx::TaskRuntimeInfo> for Measurement {
70 fn from(info: zx::TaskRuntimeInfo) -> Self {
71 Measurement::from_runtime_info(info, zx::BootInstant::get())
72 }
73}
74
75impl Measurement {
76 pub(crate) fn from_runtime_info(info: zx::TaskRuntimeInfo, timestamp: zx::BootInstant) -> Self {
77 Self {
78 timestamp,
79 cpu_time: zx::MonotonicDuration::from_nanos(info.cpu_time),
80 queue_time: zx::MonotonicDuration::from_nanos(info.queue_time),
81 }
82 }
83}
84
85#[derive(Debug)]
86enum MostRecentMeasurement {
87 Init,
88 Measurement(Measurement),
89 PostInvalidationMeasurement,
90}
91
92impl MostRecentMeasurement {
93 fn update(&mut self, incoming: Option<Measurement>) {
94 let this = std::mem::replace(self, Self::Init);
95 *self = match (this, incoming) {
96 (Self::Init, Some(m)) => Self::Measurement(m),
97 (_, None) => Self::PostInvalidationMeasurement,
98 (Self::Measurement(m1), Some(m2)) => Self::Measurement(max(m1, m2)),
99 (Self::PostInvalidationMeasurement, _) => Self::PostInvalidationMeasurement,
100 }
101 }
102
103 fn combine(&mut self, incoming: Self) {
104 let this = std::mem::replace(self, Self::Init);
105 *self = match (this, incoming) {
106 (Self::Measurement(m1), Self::Measurement(m2)) => Self::Measurement(max(m1, m2)),
107 (Self::Measurement(m), _) | (_, Self::Measurement(m)) => Self::Measurement(m),
108 (Self::PostInvalidationMeasurement, _) | (_, Self::PostInvalidationMeasurement) => {
109 Self::PostInvalidationMeasurement
110 }
111 _ => Self::Init,
112 }
113 }
114}
115
116impl AddAssign<Self> for MeasurementsQueue {
121 fn add_assign(&mut self, mut other: Self) {
122 let mut merged = VecDeque::with_capacity(self.max_measurements);
124
125 while let (Some(lhs), Some(rhs)) = (self.values.front(), other.values.front()) {
128 if lhs.can_merge(rhs) {
129 let mut l = self.values.pop_front().unwrap();
131 let r = other.values.pop_front().unwrap();
132 l += &r;
133 merged.push_back(l);
134 } else if lhs.timestamp < rhs.timestamp {
135 merged.push_back(self.values.pop_front().unwrap());
137 } else {
138 merged.push_back(other.values.pop_front().unwrap());
140 }
141 }
142
143 merged.extend(self.values.drain(..));
146 merged.extend(other.values.drain(..));
147
148 self.values = merged;
149 self.most_recent_measurement.combine(other.most_recent_measurement);
150 self.clean_stale();
151 }
152}
153
154#[derive(Debug)]
168pub struct MeasurementsQueue {
169 values: VecDeque<Measurement>,
170 most_recent_measurement: MostRecentMeasurement,
171 ts: Arc<dyn TimeSource + Send + Sync>,
172 max_period: zx::BootDuration,
173 max_measurements: usize,
174}
175
176impl MeasurementsQueue {
177 pub fn new(max_measurements: usize, ts: Arc<dyn TimeSource + Send + Sync>) -> Self {
178 Self {
179 values: VecDeque::new(),
180 most_recent_measurement: MostRecentMeasurement::Init,
181 ts,
182 max_period: (CPU_SAMPLE_PERIOD * max_measurements as u32).into(),
183 max_measurements,
184 }
185 }
186
187 pub fn insert(&mut self, measurement: Measurement) {
188 self.insert_internal(Some(measurement));
189 }
190
191 pub fn insert_post_invalidation(&mut self) {
193 self.insert_internal(None);
194 }
195
196 pub fn no_true_measurements(&self) -> bool {
199 self.values.is_empty()
200 }
201
202 pub fn most_recent_measurement(&self) -> Option<&'_ Measurement> {
206 match self.most_recent_measurement {
207 MostRecentMeasurement::Init | MostRecentMeasurement::PostInvalidationMeasurement => {
208 None
209 }
210 MostRecentMeasurement::Measurement(ref v) => Some(v),
211 }
212 }
213
214 #[cfg(test)]
215 pub fn true_measurement_count(&self) -> usize {
216 self.values.len()
217 }
218
219 fn insert_internal(&mut self, measurement_wrapper: Option<Measurement>) {
220 self.most_recent_measurement.update(measurement_wrapper.clone());
221
222 if let Some(measurement) = measurement_wrapper {
223 self.values.push_back(measurement);
224 }
225
226 self.clean_stale();
227 }
228
229 fn clean_stale(&mut self) {
230 let now = zx::BootInstant::from_nanos(self.ts.now());
231
232 while let Some(oldest) = self.values.front() {
233 if (*oldest.timestamp() > now - self.max_period)
234 && self.values.len() <= self.max_measurements
235 {
236 return;
237 }
238 self.values.pop_front();
239 }
240 }
241
242 pub fn iter_sorted(&self) -> impl DoubleEndedIterator<Item = &Measurement> {
243 self.values.iter().rev()
244 }
245
246 pub fn record_to_node(&self, node: &inspect::Node) {
247 let count = self.values.len();
248 let timestamps = node.create_int_array(TIMESTAMPS, count);
249 let cpu_times = node.create_int_array(CPU_TIMES, count);
250 let queue_times = node.create_int_array(QUEUE_TIMES, count);
251
252 for (i, measurement) in self.iter_sorted().rev().enumerate() {
253 timestamps.set(i, measurement.timestamp.into_nanos());
254 cpu_times.set(i, measurement.cpu_time.into_nanos());
255 queue_times.set(i, measurement.queue_time.into_nanos());
256 }
257 node.record(timestamps);
258 node.record(cpu_times);
259 node.record(queue_times);
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use injectable_time::FakeTime;
    use std::time::Duration;
    use zx::{BootInstant, MonotonicDuration};

    /// Advances `clock` by one `CPU_SAMPLE_PERIOD`.
    fn tick(clock: &FakeTime) {
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
    }

    /// Builds a measurement stamped with `clock`'s current time and `cpu_secs` of CPU time.
    fn cpu_measurement(clock: &FakeTime, cpu_secs: u64) -> Measurement {
        let mut m = Measurement::empty(BootInstant::from_nanos(clock.now()));
        m.cpu_time = Duration::from_secs(cpu_secs).into();
        m
    }

    /// Inserts an empty measurement stamped "now", then advances the clock by one period.
    fn insert_default(q: &mut MeasurementsQueue, clock: &FakeTime) {
        q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
        tick(clock);
    }

    /// Inserts `value`, then advances the clock by one period.
    fn insert_measurement(q: &mut MeasurementsQueue, clock: &FakeTime, value: Measurement) {
        q.insert(value);
        tick(clock);
    }

    /// The queue never holds more than its configured maximum.
    #[fuchsia::test]
    fn insert_to_measurements_queue() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));
        insert_default(&mut q, &clock);

        assert_eq!(1, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES * 2 {
            insert_default(&mut q, &clock);
        }

        assert_eq!(COMPONENT_CPU_MAX_SAMPLES, q.true_measurement_count());
    }

    /// `most_recent_measurement` follows the last true insertion and is cleared by a
    /// post-invalidation insertion.
    #[fuchsia::test]
    fn test_back() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));

        for _ in 0..4 {
            insert_default(&mut q, &clock);
        }

        let now = clock.now();
        insert_default(&mut q, &clock);

        assert_eq!(now, q.most_recent_measurement().unwrap().timestamp().into_nanos());

        q.insert_post_invalidation();

        assert!(q.most_recent_measurement().is_none());
    }

    /// Post-invalidation insertions add nothing, but as time advances they age the true
    /// measurements out of the queue.
    #[fuchsia::test]
    fn post_invalidation_pushes_true_measurements_out() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));

        assert!(q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(0, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
            insert_default(&mut q, &clock);
        }

        assert!(!q.no_true_measurements());
        assert!(q.most_recent_measurement().is_some());
        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
            q.insert_post_invalidation();
            tick(&clock);
        }

        assert!(!q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
            q.insert_post_invalidation();
            tick(&clock);
        }

        assert!(q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(0, q.true_measurement_count());
    }

    /// Queues whose clocks are two seconds apart merge entry by entry.
    #[fuchsia::test]
    fn add_assign() {
        let clock1 = FakeTime::new();
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(2).as_nanos() as i64);

        let mut q1 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock2.clone()));

        // One more insertion than the queue can hold, on each side.
        for _ in 0..=COMPONENT_CPU_MAX_SAMPLES {
            let m1 = cpu_measurement(&clock1, 1);
            let m2 = cpu_measurement(&clock2, 3);
            insert_measurement(&mut q1, &clock1, m1);
            insert_measurement(&mut q2, &clock2, m2);
        }

        q1 += q2;

        let expected: MonotonicDuration = Duration::from_secs(4).into();
        for m in q1.iter_sorted() {
            assert_eq!(&expected, m.cpu_time());
        }
    }

    /// When the queues only partially overlap in time, unmatched entries are kept as they
    /// are and matched entries are summed.
    #[fuchsia::test]
    fn add_assign_missing_matches() {
        let clock1 = FakeTime::new();
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);

        let max_values = 5;

        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));

        for _ in 0..max_values {
            let m1 = cpu_measurement(&clock1, 1);
            let m2 = cpu_measurement(&clock2, 3);
            insert_measurement(&mut q1, &clock1, m1);
            insert_measurement(&mut q2, &clock2, m2);
        }

        // Pruning after the merge uses q1's clock, so bring it level with q2's.
        clock1.set_ticks(clock2.now());
        q1 += q2;

        let actual: Vec<&Measurement> = q1.iter_sorted().collect();

        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
        assert_eq!(&d(3), actual[0].cpu_time());
        assert_eq!(&d(3), actual[1].cpu_time());
        assert_eq!(&d(4), actual[2].cpu_time());
        assert_eq!(&d(4), actual[3].cpu_time());
        assert_eq!(max_values - 1, actual.len());
    }

    /// Merging still works after both queues have received post-invalidation insertions.
    #[fuchsia::test]
    fn add_assign_post_invalidation() {
        let clock1 = FakeTime::new();
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);

        let max_values = 5;

        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));

        for _ in 0..max_values {
            let m1 = cpu_measurement(&clock1, 1);
            let m2 = cpu_measurement(&clock2, 3);
            insert_measurement(&mut q1, &clock1, m1);
            insert_measurement(&mut q2, &clock2, m2);
        }

        q1.insert_post_invalidation();
        q2.insert_post_invalidation();

        tick(&clock1);
        q1.insert_post_invalidation();

        tick(&clock1);
        q1.insert_post_invalidation();

        q1 += q2;

        let actual: Vec<&Measurement> = q1.iter_sorted().collect();

        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
        assert_eq!(&d(3), actual[0].cpu_time());
        assert_eq!(&d(3), actual[1].cpu_time());
        assert_eq!(&d(4), actual[2].cpu_time());
        // Check the fourth (oldest) entry; index 2 is already covered by the line above.
        assert_eq!(&d(4), actual[3].cpu_time());
        assert_eq!(4, actual.len());
    }

    /// The count limit applies even when no entry is old enough to be pruned by age: the
    /// fake clock stays at its initial value while timestamps come from the real boot clock.
    #[fuchsia::test]
    fn size_limited_to_max_no_matter_duration() {
        let max_values = 20;
        let mut q = MeasurementsQueue::new(max_values, Arc::new(FakeTime::new()));

        for _ in 0..(max_values + 100) {
            q.insert(Measurement::empty(BootInstant::get()));
        }

        assert_eq!(max_values, q.true_measurement_count());
    }
}