diagnostics/task_metrics/
measurement.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::task_metrics::constants::*;
6use core::cmp::Reverse;
7use fuchsia_inspect::{self as inspect, ArrayProperty};
8
9use injectable_time::TimeSource;
10use std::cmp::{max, Eq, Ord, PartialEq, PartialOrd};
11use std::collections::BinaryHeap;
12use std::ops::{AddAssign, SubAssign};
13use std::sync::Arc;
14
15#[derive(Debug, Clone, Default, PartialOrd, Eq, Ord, PartialEq)]
16pub struct Measurement {
17    timestamp: zx::BootInstant,
18    cpu_time: zx::MonotonicDuration,
19    queue_time: zx::MonotonicDuration,
20}
21
22impl Measurement {
23    pub fn empty(timestamp: zx::BootInstant) -> Self {
24        Self {
25            timestamp,
26            cpu_time: zx::MonotonicDuration::from_nanos(0),
27            queue_time: zx::MonotonicDuration::from_nanos(0),
28        }
29    }
30
31    pub fn clone_with_time(m: &Self, timestamp: zx::BootInstant) -> Self {
32        Self { timestamp, cpu_time: *m.cpu_time(), queue_time: *m.queue_time() }
33    }
34
35    /// The measured cpu time.
36    pub fn cpu_time(&self) -> &zx::MonotonicDuration {
37        &self.cpu_time
38    }
39
40    /// The measured queue time.
41    pub fn queue_time(&self) -> &zx::MonotonicDuration {
42        &self.queue_time
43    }
44
45    /// Time when the measurement was taken.
46    pub fn timestamp(&self) -> &zx::BootInstant {
47        &self.timestamp
48    }
49
50    fn can_merge(&self, other: &Self) -> bool {
51        u128::from(self.timestamp().into_nanos().abs_diff(other.timestamp().into_nanos()))
52            <= MEASUREMENT_EPSILON.as_nanos()
53    }
54}
55
56impl AddAssign<&Measurement> for Measurement {
57    fn add_assign(&mut self, other: &Measurement) {
58        self.cpu_time += other.cpu_time;
59        self.queue_time += other.queue_time;
60    }
61}
62
63impl SubAssign<&Measurement> for Measurement {
64    fn sub_assign(&mut self, other: &Measurement) {
65        self.cpu_time -= other.cpu_time;
66        self.queue_time -= other.queue_time;
67    }
68}
69
70impl From<zx::TaskRuntimeInfo> for Measurement {
71    fn from(info: zx::TaskRuntimeInfo) -> Self {
72        Measurement::from_runtime_info(info, zx::BootInstant::get())
73    }
74}
75
76impl Measurement {
77    pub(crate) fn from_runtime_info(info: zx::TaskRuntimeInfo, timestamp: zx::BootInstant) -> Self {
78        Self {
79            timestamp,
80            cpu_time: zx::MonotonicDuration::from_nanos(info.cpu_time),
81            queue_time: zx::MonotonicDuration::from_nanos(info.queue_time),
82        }
83    }
84}
85
86#[derive(Debug)]
87enum MostRecentMeasurement {
88    Init,
89    Measurement(Measurement),
90    PostInvalidationMeasurement,
91}
92
93impl MostRecentMeasurement {
94    fn update(&mut self, incoming: Option<Measurement>) {
95        let this = std::mem::replace(self, Self::Init);
96        *self = match (this, incoming) {
97            (Self::Init, Some(m)) => Self::Measurement(m),
98            (_, None) => Self::PostInvalidationMeasurement,
99            (Self::Measurement(m1), Some(m2)) => Self::Measurement(max(m1, m2)),
100            (Self::PostInvalidationMeasurement, _) => Self::PostInvalidationMeasurement,
101        }
102    }
103
104    fn combine(&mut self, incoming: Self) {
105        let this = std::mem::replace(self, Self::Init);
106        *self = match (this, incoming) {
107            (Self::Init, other)
108            | (Self::PostInvalidationMeasurement, other)
109            | (other, Self::PostInvalidationMeasurement)
110            | (other, Self::Init) => other,
111            (Self::Measurement(m1), Self::Measurement(m2)) => Self::Measurement(max(m1, m2)),
112        }
113    }
114}
115
116/// MeasurementsQueue is a priority queue with a maximum size. It guarantees that there will be
117/// at most `max_measurements` true measurements and post invalidation measurements.
118///
119/// A "true" measurement is an instance of `Measurement`. A post invalidation measurement is a
120/// counter incremented by `MeasurementsQueue::post_invalidation_insertion`, tracking how many
121/// measurements would have been taken if the owning task wasn't invalid. The goal is to keep
122/// a record of measurements for `max_measurements` minutes.
123///
124/// The queue is prioritized by `Measurement`'s `Ord` impl such that the oldest
125/// measurements are dropped first when `max_period` is exceeded. No two measurements should have
126/// the same timestamp.
127#[derive(Debug)]
128pub struct MeasurementsQueue {
129    values: BinaryHeap<Reverse<Measurement>>,
130    // outer option refers to initialization
131    most_recent_measurement: MostRecentMeasurement,
132    ts: Arc<dyn TimeSource + Send + Sync>,
133    max_period: zx::BootDuration,
134    max_measurements: usize,
135}
136
137/// Merge two queues together.
138///
139/// `AddAssign` sets `self.post_invalidation_measurements` to the minimum
140/// of the values of the two queues.
141impl AddAssign<Self> for MeasurementsQueue {
142    fn add_assign(&mut self, other: Self) {
143        // collect the measurements into an owning vector, arbitrarily ordered
144        let mut rhs_values = other.values.into_vec();
145        let mut new_heap = BinaryHeap::new();
146
147        while let Some(Reverse(mut lhs)) = self.values.pop() {
148            rhs_values = rhs_values
149                .into_iter()
150                .filter_map(|Reverse(rhs)| {
151                    if lhs.can_merge(&rhs) {
152                        lhs += &rhs;
153                        None
154                    } else {
155                        Some(Reverse(rhs))
156                    }
157                })
158                .collect();
159
160            new_heap.push(Reverse(lhs));
161        }
162
163        for leftover in rhs_values {
164            new_heap.push(leftover);
165        }
166
167        self.values = new_heap;
168        self.most_recent_measurement.combine(other.most_recent_measurement);
169        self.clean_stale();
170    }
171}
172
173impl MeasurementsQueue {
174    pub fn new(max_measurements: usize, ts: Arc<dyn TimeSource + Send + Sync>) -> Self {
175        Self {
176            values: BinaryHeap::new(),
177            most_recent_measurement: MostRecentMeasurement::Init,
178            ts,
179            max_period: (CPU_SAMPLE_PERIOD * max_measurements as u32).into(),
180            max_measurements,
181        }
182    }
183
184    /// Insert a new measurement into the priority queue.
185    /// Measurements must have distinct timestamps.
186    pub fn insert(&mut self, measurement: Measurement) {
187        self.insert_internal(Some(measurement));
188    }
189
190    /// Insert a false measurement, typically after the invalidation of a task.
191    pub fn insert_post_invalidation(&mut self) {
192        self.insert_internal(None);
193    }
194
195    fn insert_internal(&mut self, measurement_wrapper: Option<Measurement>) {
196        self.most_recent_measurement.update(measurement_wrapper.clone());
197
198        if let Some(measurement) = measurement_wrapper {
199            self.values.push(Reverse(measurement));
200        }
201
202        self.clean_stale();
203    }
204
205    fn clean_stale(&mut self) {
206        let now = zx::BootInstant::from_nanos(self.ts.now());
207        while let Some(Reverse(oldest)) = self.values.peek() {
208            if (*oldest.timestamp() > now - self.max_period)
209                && self.values.len() <= self.max_measurements
210            {
211                return;
212            }
213
214            self.values.pop();
215        }
216    }
217
218    #[cfg(test)]
219    pub fn true_measurement_count(&self) -> usize {
220        self.values.len()
221    }
222
223    /// Sorted from newest to oldest:
224    /// Index: Timestamp
225    /// 0: t + N
226    /// 1: t+ (N-1)
227    /// 2: t+ (N-2)
228    /// ...
229    /// N: t
230    pub fn iter_sorted(&self) -> impl DoubleEndedIterator<Item = Measurement> {
231        self.values.clone().into_sorted_vec().into_iter().map(|Reverse(v)| v).into_iter()
232    }
233
234    /// Checks whether or not there are any true measurements. This says nothing
235    /// about the number of post invalidation measurements.
236    pub fn no_true_measurements(&self) -> bool {
237        self.values.is_empty()
238    }
239
240    /// Access the youngest true Measurement in the queue.
241    /// Returns `None` if there are `post_invalidation_measurements`, or if there
242    /// are no true measurements.
243    pub fn most_recent_measurement(&self) -> Option<&'_ Measurement> {
244        match self.most_recent_measurement {
245            MostRecentMeasurement::Init | MostRecentMeasurement::PostInvalidationMeasurement => {
246                None
247            }
248            MostRecentMeasurement::Measurement(ref v) => Some(v),
249        }
250    }
251
252    pub fn record_to_node(&self, node: &inspect::Node) {
253        // gather measurements ordered oldest -> newest
254        let count = self.values.len();
255        let timestamps = node.create_int_array(TIMESTAMPS, count);
256        let cpu_times = node.create_int_array(CPU_TIMES, count);
257        let queue_times = node.create_int_array(QUEUE_TIMES, count);
258        for (i, measurement) in self.iter_sorted().rev().enumerate() {
259            timestamps.set(i, measurement.timestamp.into_nanos());
260            cpu_times.set(i, measurement.cpu_time.into_nanos());
261            queue_times.set(i, measurement.queue_time.into_nanos());
262        }
263        node.record(timestamps);
264        node.record(cpu_times);
265        node.record(queue_times);
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use injectable_time::FakeTime;
    use std::time::Duration;
    use zx::{BootInstant, MonotonicDuration};

    fn insert_default(q: &mut MeasurementsQueue, clock: &FakeTime) {
        q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
    }

    fn insert_measurement(q: &mut MeasurementsQueue, clock: &FakeTime, value: Measurement) {
        q.insert(value);
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
    }

    #[fuchsia::test]
    fn insert_to_measurements_queue() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));
        q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);

        assert_eq!(1, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES * 2 {
            q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        }

        assert_eq!(COMPONENT_CPU_MAX_SAMPLES, q.true_measurement_count());
    }

    #[fuchsia::test]
    fn test_back() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));

        insert_default(&mut q, &clock);
        insert_default(&mut q, &clock);
        insert_default(&mut q, &clock);
        insert_default(&mut q, &clock);

        let now = clock.now();
        insert_default(&mut q, &clock);

        assert_eq!(now, q.most_recent_measurement().unwrap().timestamp().into_nanos());

        q.insert_post_invalidation();

        assert!(q.most_recent_measurement().is_none());
    }

    #[fuchsia::test]
    fn post_invalidation_pushes_true_measurements_out() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));

        assert!(q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(0, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
            insert_default(&mut q, &clock);
        }

        assert!(!q.no_true_measurements());
        assert!(q.most_recent_measurement().is_some());
        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
            q.insert_post_invalidation();
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        }

        assert!(!q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
            q.insert_post_invalidation();
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        }

        assert!(q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(0, q.true_measurement_count());
    }

    #[fuchsia::test]
    fn add_assign() {
        // the two queues are shifted apart by two seconds
        let clock1 = FakeTime::new();
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(2).as_nanos() as i64);

        let mut q1 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock2.clone()));

        let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
        let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
        m1.cpu_time = Duration::from_secs(1).into();
        m2.cpu_time = Duration::from_secs(3).into();
        insert_measurement(&mut q1, &clock1, m1);
        insert_measurement(&mut q2, &clock2, m2);

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
            let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
            let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
            m1.cpu_time = Duration::from_secs(1).into();
            m2.cpu_time = Duration::from_secs(3).into();
            insert_measurement(&mut q1, &clock1, m1);
            insert_measurement(&mut q2, &clock2, m2);
        }

        q1 += q2;

        let expected: MonotonicDuration = Duration::from_secs(4).into();
        for m in q1.iter_sorted() {
            assert_eq!(&expected, m.cpu_time());
        }
    }

    #[fuchsia::test]
    fn add_assign_missing_matches() {
        let clock1 = FakeTime::new();
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);

        let max_values = 5;

        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));

        let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
        let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
        m1.cpu_time = Duration::from_secs(1).into();
        m2.cpu_time = Duration::from_secs(3).into();
        insert_measurement(&mut q1, &clock1, m1);
        insert_measurement(&mut q2, &clock2, m2);

        for _ in 1..max_values {
            let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
            let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
            m1.cpu_time = Duration::from_secs(1).into();
            m2.cpu_time = Duration::from_secs(3).into();
            insert_measurement(&mut q1, &clock1, m1);
            insert_measurement(&mut q2, &clock2, m2);
        }

        // t   q1  q2
        // -------------
        // 0   1
        // -------------
        // 60  1
        // -------------
        // 120 1
        // 125     3
        // -------------
        // 180 1
        // 185     3
        // -------------
        // 240 1
        // 245     3
        // -------------
        // 300
        // 305     3
        // -------------
        // 360
        // 365     3
        // -------------
        // 420
        // 425

        // the merged clock needs to have the largest time; in this case,
        // it's known that queue_clock2 is "more recent"
        clock1.set_ticks(clock2.now());
        q1 += q2;

        let sorted = q1.values.into_sorted_vec();
        let actual = sorted.iter().map(|Reverse(m)| m).collect::<Vec<_>>();

        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
        assert_eq!(&d(3), actual[0].cpu_time());
        assert_eq!(&d(3), actual[1].cpu_time());
        assert_eq!(&d(4), actual[2].cpu_time());
        assert_eq!(&d(4), actual[3].cpu_time());
        assert_eq!(max_values - 1, actual.len());
    }

    #[fuchsia::test]
    fn add_assign_post_invalidation() {
        let clock1 = FakeTime::new();
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);

        let max_values = 5;

        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));

        for _ in 0..max_values {
            let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
            let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
            m1.cpu_time = Duration::from_secs(1).into();
            m2.cpu_time = Duration::from_secs(3).into();
            insert_measurement(&mut q1, &clock1, m1);
            insert_measurement(&mut q2, &clock2, m2);
        }

        q1.insert_post_invalidation();
        q2.insert_post_invalidation();

        clock1.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        q1.insert_post_invalidation();

        clock1.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        q1.insert_post_invalidation();

        // t   q1  q2
        // -------------
        // 60  1
        // -------------
        // 120 1
        // -------------
        // 180 1
        // 185     3
        // -------------
        // 240 1
        // 245     3
        // -------------
        // 300 1
        // 305     3
        // -------------
        // 360 p
        // 365     3
        // -------------
        // 420 p
        // 425     3
        // -------------
        // 480 p
        // 485     p
        // -------------

        q1 += q2;

        let sorted = q1.values.into_sorted_vec();
        let actual = sorted.into_iter().map(|Reverse(m)| m).collect::<Vec<_>>();

        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
        assert_eq!(&d(3), actual[0].cpu_time());
        assert_eq!(&d(3), actual[1].cpu_time());
        assert_eq!(&d(4), actual[2].cpu_time());
        // Previously this re-checked `actual[2]`, leaving the oldest merged entry unverified.
        assert_eq!(&d(4), actual[3].cpu_time());
        assert_eq!(4, actual.len());
    }

    #[fuchsia::test]
    fn size_limited_to_max_no_matter_duration() {
        let max_values = 20;
        let mut q = MeasurementsQueue::new(max_values, Arc::new(FakeTime::new()));

        for _ in 0..(max_values + 100) {
            q.insert(Measurement::empty(BootInstant::get()));
        }

        assert_eq!(max_values, q.true_measurement_count());
    }
}