Skip to main content

diagnostics/task_metrics/
measurement.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::task_metrics::constants::*;
6use fuchsia_inspect::{self as inspect, ArrayProperty};
7
8use injectable_time::TimeSource;
9use std::cmp::{Eq, Ord, PartialEq, PartialOrd, max};
10use std::collections::VecDeque;
11use std::ops::{AddAssign, SubAssign};
12use std::sync::Arc;
13
14#[derive(Debug, Clone, Default, PartialOrd, Eq, Ord, PartialEq)]
15pub struct Measurement {
16    timestamp: zx::BootInstant,
17    cpu_time: zx::MonotonicDuration,
18    queue_time: zx::MonotonicDuration,
19}
20
21impl Measurement {
22    pub fn empty(timestamp: zx::BootInstant) -> Self {
23        Self {
24            timestamp,
25            cpu_time: zx::MonotonicDuration::from_nanos(0),
26            queue_time: zx::MonotonicDuration::from_nanos(0),
27        }
28    }
29
30    pub fn clone_with_time(m: &Self, timestamp: zx::BootInstant) -> Self {
31        Self { timestamp, cpu_time: *m.cpu_time(), queue_time: *m.queue_time() }
32    }
33
34    /// The measured cpu time.
35    pub fn cpu_time(&self) -> &zx::MonotonicDuration {
36        &self.cpu_time
37    }
38
39    /// The measured queue time.
40    pub fn queue_time(&self) -> &zx::MonotonicDuration {
41        &self.queue_time
42    }
43
44    /// Time when the measurement was taken.
45    pub fn timestamp(&self) -> &zx::BootInstant {
46        &self.timestamp
47    }
48
49    fn can_merge(&self, other: &Self) -> bool {
50        u128::from(self.timestamp().into_nanos().abs_diff(other.timestamp().into_nanos()))
51            <= MEASUREMENT_EPSILON.as_nanos()
52    }
53}
54
55impl AddAssign<&Measurement> for Measurement {
56    fn add_assign(&mut self, other: &Measurement) {
57        self.cpu_time += other.cpu_time;
58        self.queue_time += other.queue_time;
59    }
60}
61
62impl SubAssign<&Measurement> for Measurement {
63    fn sub_assign(&mut self, other: &Measurement) {
64        self.cpu_time -= other.cpu_time;
65        self.queue_time -= other.queue_time;
66    }
67}
68
69impl From<zx::TaskRuntimeInfo> for Measurement {
70    fn from(info: zx::TaskRuntimeInfo) -> Self {
71        Measurement::from_runtime_info(info, zx::BootInstant::get())
72    }
73}
74
75impl Measurement {
76    pub(crate) fn from_runtime_info(info: zx::TaskRuntimeInfo, timestamp: zx::BootInstant) -> Self {
77        Self {
78            timestamp,
79            cpu_time: zx::MonotonicDuration::from_nanos(info.cpu_time),
80            queue_time: zx::MonotonicDuration::from_nanos(info.queue_time),
81        }
82    }
83}
84
85#[derive(Debug)]
86enum MostRecentMeasurement {
87    Init,
88    Measurement(Measurement),
89    PostInvalidationMeasurement,
90}
91
92impl MostRecentMeasurement {
93    fn update(&mut self, incoming: Option<Measurement>) {
94        let this = std::mem::replace(self, Self::Init);
95        *self = match (this, incoming) {
96            (Self::Init, Some(m)) => Self::Measurement(m),
97            (_, None) => Self::PostInvalidationMeasurement,
98            (Self::Measurement(m1), Some(m2)) => Self::Measurement(max(m1, m2)),
99            (Self::PostInvalidationMeasurement, _) => Self::PostInvalidationMeasurement,
100        }
101    }
102
103    fn combine(&mut self, incoming: Self) {
104        let this = std::mem::replace(self, Self::Init);
105        *self = match (this, incoming) {
106            (Self::Measurement(m1), Self::Measurement(m2)) => Self::Measurement(max(m1, m2)),
107            (Self::Measurement(m), _) | (_, Self::Measurement(m)) => Self::Measurement(m),
108            (Self::PostInvalidationMeasurement, _) | (_, Self::PostInvalidationMeasurement) => {
109                Self::PostInvalidationMeasurement
110            }
111            _ => Self::Init,
112        }
113    }
114}
115
116/// Merge two queues together.
117///
118/// `AddAssign` sets `self.post_invalidation_measurements` to the minimum
119/// of the values of the two queues.
120impl AddAssign<Self> for MeasurementsQueue {
121    fn add_assign(&mut self, mut other: Self) {
122        // Pre-allocate our new merged queue
123        let mut merged = VecDeque::with_capacity(self.max_measurements);
124
125        // Because both deques are sorted oldest-to-newest, we just peek
126        // at the front of both and pull whichever is older!
127        while let (Some(lhs), Some(rhs)) = (self.values.front(), other.values.front()) {
128            if lhs.can_merge(rhs) {
129                // They match! Pop both, merge them, and push to the new queue.
130                let mut l = self.values.pop_front().unwrap();
131                let r = other.values.pop_front().unwrap();
132                l += &r;
133                merged.push_back(l);
134            } else if lhs.timestamp < rhs.timestamp {
135                // Left is older, pop it first
136                merged.push_back(self.values.pop_front().unwrap());
137            } else {
138                // Right is older, pop it first
139                merged.push_back(other.values.pop_front().unwrap());
140            }
141        }
142
143        // Once one of the queues is empty, just append everything left in the other.
144        // (Only one of these extends will actually do anything)
145        merged.extend(self.values.drain(..));
146        merged.extend(other.values.drain(..));
147
148        self.values = merged;
149        self.most_recent_measurement.combine(other.most_recent_measurement);
150        self.clean_stale();
151    }
152}
153
154/// `MeasurementsQueue` is a chronological ring buffer (sliding window) that maintains
155/// a limited history of CPU measurements. It guarantees that it will hold at most
156/// `max_measurements` entries.
157///
158/// The queue tracks two types of data:
159/// 1. "True" measurements: Concrete instances of `Measurement` taken while the task is valid.
160/// 2. Post-invalidation measurements: Placeholders inserted after a task becomes invalid.
161///    These represent missed samples to ensure the temporal history remains accurate.
162///
163/// Measurements are stored in strictly chronological order. As new measurements are pushed
164/// to the back of the queue, the oldest measurements are automatically popped from the front
165/// once they fall outside the `max_period` window or exceed the `max_measurements` capacity.
166/// No two measurements should have the same timestamp.
167#[derive(Debug)]
168pub struct MeasurementsQueue {
169    values: VecDeque<Measurement>,
170    most_recent_measurement: MostRecentMeasurement,
171    ts: Arc<dyn TimeSource + Send + Sync>,
172    max_period: zx::BootDuration,
173    max_measurements: usize,
174}
175
176impl MeasurementsQueue {
177    pub fn new(max_measurements: usize, ts: Arc<dyn TimeSource + Send + Sync>) -> Self {
178        Self {
179            values: VecDeque::new(),
180            most_recent_measurement: MostRecentMeasurement::Init,
181            ts,
182            max_period: (CPU_SAMPLE_PERIOD * max_measurements as u32).into(),
183            max_measurements,
184        }
185    }
186
187    pub fn insert(&mut self, measurement: Measurement) {
188        self.insert_internal(Some(measurement));
189    }
190
191    /// Insert a false measurement, typically after the invalidation of a task.
192    pub fn insert_post_invalidation(&mut self) {
193        self.insert_internal(None);
194    }
195
196    /// Checks whether or not there are any true measurements. This says nothing
197    /// about the number of post invalidation measurements.
198    pub fn no_true_measurements(&self) -> bool {
199        self.values.is_empty()
200    }
201
202    /// Access the youngest true Measurement in the queue.
203    /// Returns `None` if there are `post_invalidation_measurements`, or if there
204    /// are no true measurements.
205    pub fn most_recent_measurement(&self) -> Option<&'_ Measurement> {
206        match self.most_recent_measurement {
207            MostRecentMeasurement::Init | MostRecentMeasurement::PostInvalidationMeasurement => {
208                None
209            }
210            MostRecentMeasurement::Measurement(ref v) => Some(v),
211        }
212    }
213
214    #[cfg(test)]
215    pub fn true_measurement_count(&self) -> usize {
216        self.values.len()
217    }
218
219    fn insert_internal(&mut self, measurement_wrapper: Option<Measurement>) {
220        self.most_recent_measurement.update(measurement_wrapper.clone());
221
222        if let Some(measurement) = measurement_wrapper {
223            self.values.push_back(measurement);
224        }
225
226        self.clean_stale();
227    }
228
229    fn clean_stale(&mut self) {
230        let now = zx::BootInstant::from_nanos(self.ts.now());
231
232        while let Some(oldest) = self.values.front() {
233            if (*oldest.timestamp() > now - self.max_period)
234                && self.values.len() <= self.max_measurements
235            {
236                return;
237            }
238            self.values.pop_front();
239        }
240    }
241
242    pub fn iter_sorted(&self) -> impl DoubleEndedIterator<Item = &Measurement> {
243        self.values.iter().rev()
244    }
245
246    pub fn record_to_node(&self, node: &inspect::Node) {
247        let count = self.values.len();
248        let timestamps = node.create_int_array(TIMESTAMPS, count);
249        let cpu_times = node.create_int_array(CPU_TIMES, count);
250        let queue_times = node.create_int_array(QUEUE_TIMES, count);
251
252        for (i, measurement) in self.iter_sorted().rev().enumerate() {
253            timestamps.set(i, measurement.timestamp.into_nanos());
254            cpu_times.set(i, measurement.cpu_time.into_nanos());
255            queue_times.set(i, measurement.queue_time.into_nanos());
256        }
257        node.record(timestamps);
258        node.record(cpu_times);
259        node.record(queue_times);
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use injectable_time::FakeTime;
267    use std::time::Duration;
268    use zx::{BootInstant, MonotonicDuration};
269
270    fn insert_default(q: &mut MeasurementsQueue, clock: &FakeTime) {
271        q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
272        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
273    }
274
275    fn insert_measurement(q: &mut MeasurementsQueue, clock: &FakeTime, value: Measurement) {
276        q.insert(value);
277        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
278    }
279
280    #[fuchsia::test]
281    fn insert_to_measurements_queue() {
282        let clock = FakeTime::new();
283        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));
284        q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
285        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
286
287        assert_eq!(1, q.true_measurement_count());
288
289        for _ in 0..COMPONENT_CPU_MAX_SAMPLES * 2 {
290            q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
291            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
292        }
293
294        assert_eq!(COMPONENT_CPU_MAX_SAMPLES, q.true_measurement_count());
295    }
296
297    #[fuchsia::test]
298    fn test_back() {
299        let clock = FakeTime::new();
300        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));
301
302        insert_default(&mut q, &clock);
303        insert_default(&mut q, &clock);
304        insert_default(&mut q, &clock);
305        insert_default(&mut q, &clock);
306
307        let now = clock.now();
308        insert_default(&mut q, &clock);
309
310        assert_eq!(now, q.most_recent_measurement().unwrap().timestamp().into_nanos());
311
312        q.insert_post_invalidation();
313
314        assert!(q.most_recent_measurement().is_none());
315    }
316
317    #[fuchsia::test]
318    fn post_invalidation_pushes_true_measurements_out() {
319        let clock = FakeTime::new();
320        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));
321
322        assert!(q.no_true_measurements());
323        assert!(q.most_recent_measurement().is_none());
324        assert_eq!(0, q.true_measurement_count());
325
326        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
327            insert_default(&mut q, &clock);
328        }
329
330        assert!(!q.no_true_measurements());
331        assert!(q.most_recent_measurement().is_some());
332        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());
333
334        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
335            q.insert_post_invalidation();
336            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
337        }
338
339        assert!(!q.no_true_measurements());
340        assert!(q.most_recent_measurement().is_none());
341        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());
342
343        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
344            q.insert_post_invalidation();
345            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
346        }
347
348        assert!(q.no_true_measurements());
349        assert!(q.most_recent_measurement().is_none());
350        assert_eq!(0, q.true_measurement_count());
351    }
352
353    #[fuchsia::test]
354    fn add_assign() {
355        // the two queues are shifted apart by two seconds
356        let clock1 = FakeTime::new();
357        let clock2 = FakeTime::new();
358        clock2.set_ticks(Duration::from_secs(2).as_nanos() as i64);
359
360        let mut q1 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock1.clone()));
361        let mut q2 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock2.clone()));
362
363        let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
364        let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
365        m1.cpu_time = Duration::from_secs(1).into();
366        m2.cpu_time = Duration::from_secs(3).into();
367        insert_measurement(&mut q1, &clock1, m1);
368        insert_measurement(&mut q2, &clock2, m2);
369
370        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
371            let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
372            let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
373            m1.cpu_time = Duration::from_secs(1).into();
374            m2.cpu_time = Duration::from_secs(3).into();
375            insert_measurement(&mut q1, &clock1, m1);
376            insert_measurement(&mut q2, &clock2, m2);
377        }
378
379        q1 += q2;
380
381        let expected: MonotonicDuration = Duration::from_secs(4).into();
382        for m in q1.iter_sorted() {
383            assert_eq!(&expected, m.cpu_time());
384        }
385    }
386
387    #[fuchsia::test]
388    fn add_assign_missing_matches() {
389        let clock1 = FakeTime::new();
390        let clock2 = FakeTime::new();
391        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);
392
393        let max_values = 5;
394
395        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
396        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));
397
398        let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
399        let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
400        m1.cpu_time = Duration::from_secs(1).into();
401        m2.cpu_time = Duration::from_secs(3).into();
402        insert_measurement(&mut q1, &clock1, m1);
403        insert_measurement(&mut q2, &clock2, m2);
404
405        for _ in 1..max_values {
406            let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
407            let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
408            m1.cpu_time = Duration::from_secs(1).into();
409            m2.cpu_time = Duration::from_secs(3).into();
410            insert_measurement(&mut q1, &clock1, m1);
411            insert_measurement(&mut q2, &clock2, m2);
412        }
413
414        // t   q1  q2
415        // -------------
416        // 0   1
417        // -------------
418        // 60  1
419        // -------------
420        // 120 1
421        // 125     3
422        // -------------
423        // 180 1
424        // 185     3
425        // -------------
426        // 240 1
427        // 245     3
428        // -------------
429        // 300
430        // 305     3
431        // -------------
432        // 360
433        // 365     3
434        // -------------
435        // 420
436        // 425
437
438        // the merged clock needs to have the largest time; in this case,
439        // it's known that queue_clock2 is "more recent"
440        clock1.set_ticks(clock2.now());
441        q1 += q2;
442
443        let actual: Vec<&Measurement> = q1.iter_sorted().collect();
444
445        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
446        assert_eq!(&d(3), actual[0].cpu_time());
447        assert_eq!(&d(3), actual[1].cpu_time());
448        assert_eq!(&d(4), actual[2].cpu_time());
449        assert_eq!(&d(4), actual[3].cpu_time());
450        assert_eq!(max_values - 1, actual.len());
451    }
452
453    #[fuchsia::test]
454    fn add_assign_post_invalidation() {
455        let clock1 = FakeTime::new();
456        let clock2 = FakeTime::new();
457        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);
458
459        let max_values = 5;
460
461        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
462        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));
463
464        for _ in 0..max_values {
465            let mut m1 = Measurement::empty(BootInstant::from_nanos(clock1.now()));
466            let mut m2 = Measurement::empty(BootInstant::from_nanos(clock2.now()));
467            m1.cpu_time = Duration::from_secs(1).into();
468            m2.cpu_time = Duration::from_secs(3).into();
469            insert_measurement(&mut q1, &clock1, m1);
470            insert_measurement(&mut q2, &clock2, m2);
471        }
472
473        q1.insert_post_invalidation();
474        q2.insert_post_invalidation();
475
476        clock1.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
477        q1.insert_post_invalidation();
478
479        clock1.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
480        q1.insert_post_invalidation();
481
482        // t   q1  q2
483        // -------------
484        // 60  1
485        // -------------
486        // 120 1
487        // -------------
488        // 180 1
489        // 185     3
490        // -------------
491        // 240 1
492        // 245     3
493        // -------------
494        // 300 1
495        // 305     3
496        // -------------
497        // 360 p
498        // 365     3
499        // -------------
500        // 420 p
501        // 425     3
502        // -------------
503        // 480 p
504        // 485     p
505        // -------------
506
507        q1 += q2;
508
509        let actual: Vec<&Measurement> = q1.iter_sorted().collect();
510
511        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
512        assert_eq!(&d(3), actual[0].cpu_time());
513        assert_eq!(&d(3), actual[1].cpu_time());
514        assert_eq!(&d(4), actual[2].cpu_time());
515        assert_eq!(&d(4), actual[2].cpu_time());
516        assert_eq!(4, actual.len());
517    }
518
519    #[fuchsia::test]
520    fn size_limited_to_max_no_matter_duration() {
521        let max_values = 20;
522        let mut q = MeasurementsQueue::new(max_values, Arc::new(FakeTime::new()));
523
524        for _ in 0..(max_values + 100) {
525            q.insert(Measurement::empty(BootInstant::get()));
526        }
527
528        assert_eq!(max_values, q.true_measurement_count());
529    }
530}