diagnostics/task_metrics/
measurement.rs1use crate::task_metrics::constants::*;
6use core::cmp::Reverse;
7use fuchsia_inspect::{self as inspect, ArrayProperty};
8
9use injectable_time::TimeSource;
10use std::cmp::{max, Eq, Ord, PartialEq, PartialOrd};
11use std::collections::BinaryHeap;
12use std::ops::{AddAssign, SubAssign};
13use std::sync::Arc;
14
15#[derive(Debug, Clone, Default, PartialOrd, Eq, Ord, PartialEq)]
16pub struct Measurement {
17 timestamp: zx::BootInstant,
18 cpu_time: zx::MonotonicDuration,
19 queue_time: zx::MonotonicDuration,
20}
21
22impl Measurement {
23 pub fn empty(timestamp: zx::BootInstant) -> Self {
24 Self {
25 timestamp,
26 cpu_time: zx::MonotonicDuration::from_nanos(0),
27 queue_time: zx::MonotonicDuration::from_nanos(0),
28 }
29 }
30
31 pub fn clone_with_time(m: &Self, timestamp: zx::BootInstant) -> Self {
32 Self { timestamp, cpu_time: *m.cpu_time(), queue_time: *m.queue_time() }
33 }
34
35 pub fn cpu_time(&self) -> &zx::MonotonicDuration {
37 &self.cpu_time
38 }
39
40 pub fn queue_time(&self) -> &zx::MonotonicDuration {
42 &self.queue_time
43 }
44
45 pub fn timestamp(&self) -> &zx::BootInstant {
47 &self.timestamp
48 }
49
50 fn can_merge(&self, other: &Self) -> bool {
51 u128::from(self.timestamp().into_nanos().abs_diff(other.timestamp().into_nanos()))
52 <= MEASUREMENT_EPSILON.as_nanos()
53 }
54}
55
56impl AddAssign<&Measurement> for Measurement {
57 fn add_assign(&mut self, other: &Measurement) {
58 self.cpu_time += other.cpu_time;
59 self.queue_time += other.queue_time;
60 }
61}
62
63impl SubAssign<&Measurement> for Measurement {
64 fn sub_assign(&mut self, other: &Measurement) {
65 self.cpu_time -= other.cpu_time;
66 self.queue_time -= other.queue_time;
67 }
68}
69
70impl From<zx::TaskRuntimeInfo> for Measurement {
71 fn from(info: zx::TaskRuntimeInfo) -> Self {
72 Measurement::from_runtime_info(info, zx::BootInstant::get())
73 }
74}
75
76impl Measurement {
77 pub(crate) fn from_runtime_info(info: zx::TaskRuntimeInfo, timestamp: zx::BootInstant) -> Self {
78 Self {
79 timestamp,
80 cpu_time: zx::MonotonicDuration::from_nanos(info.cpu_time),
81 queue_time: zx::MonotonicDuration::from_nanos(info.queue_time),
82 }
83 }
84}
85
86#[derive(Debug)]
87enum MostRecentMeasurement {
88 Init,
89 Measurement(Measurement),
90 PostInvalidationMeasurement,
91}
92
93impl MostRecentMeasurement {
94 fn update(&mut self, incoming: Option<Measurement>) {
95 let this = std::mem::replace(self, Self::Init);
96 *self = match (this, incoming) {
97 (Self::Init, Some(m)) => Self::Measurement(m),
98 (_, None) => Self::PostInvalidationMeasurement,
99 (Self::Measurement(m1), Some(m2)) => Self::Measurement(max(m1, m2)),
100 (Self::PostInvalidationMeasurement, _) => Self::PostInvalidationMeasurement,
101 }
102 }
103
104 fn combine(&mut self, incoming: Self) {
105 let this = std::mem::replace(self, Self::Init);
106 *self = match (this, incoming) {
107 (Self::Init, other)
108 | (Self::PostInvalidationMeasurement, other)
109 | (other, Self::PostInvalidationMeasurement)
110 | (other, Self::Init) => other,
111 (Self::Measurement(m1), Self::Measurement(m2)) => Self::Measurement(max(m1, m2)),
112 }
113 }
114}
115
116#[derive(Debug)]
128pub struct MeasurementsQueue {
129 values: BinaryHeap<Reverse<Measurement>>,
130 most_recent_measurement: MostRecentMeasurement,
132 ts: Arc<dyn TimeSource + Send + Sync>,
133 max_period: zx::BootDuration,
134 max_measurements: usize,
135}
136
137impl AddAssign<Self> for MeasurementsQueue {
142 fn add_assign(&mut self, other: Self) {
143 let mut rhs_values = other.values.into_vec();
145 let mut new_heap = BinaryHeap::new();
146
147 while let Some(Reverse(mut lhs)) = self.values.pop() {
148 rhs_values = rhs_values
149 .into_iter()
150 .filter_map(|Reverse(rhs)| {
151 if lhs.can_merge(&rhs) {
152 lhs += &rhs;
153 None
154 } else {
155 Some(Reverse(rhs))
156 }
157 })
158 .collect();
159
160 new_heap.push(Reverse(lhs));
161 }
162
163 for leftover in rhs_values {
164 new_heap.push(leftover);
165 }
166
167 self.values = new_heap;
168 self.most_recent_measurement.combine(other.most_recent_measurement);
169 self.clean_stale();
170 }
171}
172
173impl MeasurementsQueue {
174 pub fn new(max_measurements: usize, ts: Arc<dyn TimeSource + Send + Sync>) -> Self {
175 Self {
176 values: BinaryHeap::new(),
177 most_recent_measurement: MostRecentMeasurement::Init,
178 ts,
179 max_period: (CPU_SAMPLE_PERIOD * max_measurements as u32).into(),
180 max_measurements,
181 }
182 }
183
184 pub fn insert(&mut self, measurement: Measurement) {
187 self.insert_internal(Some(measurement));
188 }
189
190 pub fn insert_post_invalidation(&mut self) {
192 self.insert_internal(None);
193 }
194
195 fn insert_internal(&mut self, measurement_wrapper: Option<Measurement>) {
196 self.most_recent_measurement.update(measurement_wrapper.clone());
197
198 if let Some(measurement) = measurement_wrapper {
199 self.values.push(Reverse(measurement));
200 }
201
202 self.clean_stale();
203 }
204
205 fn clean_stale(&mut self) {
206 let now = zx::BootInstant::from_nanos(self.ts.now());
207 while let Some(Reverse(oldest)) = self.values.peek() {
208 if (*oldest.timestamp() > now - self.max_period)
209 && self.values.len() <= self.max_measurements
210 {
211 return;
212 }
213
214 self.values.pop();
215 }
216 }
217
218 #[cfg(test)]
219 pub fn true_measurement_count(&self) -> usize {
220 self.values.len()
221 }
222
223 pub fn iter_sorted(&self) -> impl DoubleEndedIterator<Item = Measurement> {
231 self.values.clone().into_sorted_vec().into_iter().map(|Reverse(v)| v).into_iter()
232 }
233
234 pub fn no_true_measurements(&self) -> bool {
237 self.values.is_empty()
238 }
239
240 pub fn most_recent_measurement(&self) -> Option<&'_ Measurement> {
244 match self.most_recent_measurement {
245 MostRecentMeasurement::Init | MostRecentMeasurement::PostInvalidationMeasurement => {
246 None
247 }
248 MostRecentMeasurement::Measurement(ref v) => Some(v),
249 }
250 }
251
252 pub fn record_to_node(&self, node: &inspect::Node) {
253 let count = self.values.len();
255 let timestamps = node.create_int_array(TIMESTAMPS, count);
256 let cpu_times = node.create_int_array(CPU_TIMES, count);
257 let queue_times = node.create_int_array(QUEUE_TIMES, count);
258 for (i, measurement) in self.iter_sorted().rev().enumerate() {
259 timestamps.set(i, measurement.timestamp.into_nanos());
260 cpu_times.set(i, measurement.cpu_time.into_nanos());
261 queue_times.set(i, measurement.queue_time.into_nanos());
262 }
263 node.record(timestamps);
264 node.record(cpu_times);
265 node.record(queue_times);
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use injectable_time::FakeTime;
    use std::time::Duration;
    use zx::{BootInstant, MonotonicDuration};

    /// One `CPU_SAMPLE_PERIOD` expressed in fake-clock ticks (the queue reads ticks as
    /// nanoseconds).
    fn sample_period_ticks() -> i64 {
        CPU_SAMPLE_PERIOD.as_nanos() as i64
    }

    /// Builds a measurement stamped with `clock`'s current time and `secs` seconds of CPU time.
    fn measurement_with_cpu(clock: &FakeTime, secs: u64) -> Measurement {
        let mut m = Measurement::empty(BootInstant::from_nanos(clock.now()));
        m.cpu_time = Duration::from_secs(secs).into();
        m
    }

    /// Inserts an empty measurement stamped with `clock`'s current time, then advances the
    /// clock by one sample period.
    fn insert_default(q: &mut MeasurementsQueue, clock: &FakeTime) {
        q.insert(Measurement::empty(BootInstant::from_nanos(clock.now())));
        clock.add_ticks(sample_period_ticks());
    }

    /// Inserts `value`, then advances the clock by one sample period.
    fn insert_measurement(q: &mut MeasurementsQueue, clock: &FakeTime, value: Measurement) {
        q.insert(value);
        clock.add_ticks(sample_period_ticks());
    }

    #[fuchsia::test]
    fn insert_to_measurements_queue() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));
        insert_default(&mut q, &clock);

        assert_eq!(1, q.true_measurement_count());

        // Inserting far more than the limit must leave exactly the limit stored.
        for _ in 0..COMPONENT_CPU_MAX_SAMPLES * 2 {
            insert_default(&mut q, &clock);
        }

        assert_eq!(COMPONENT_CPU_MAX_SAMPLES, q.true_measurement_count());
    }

    #[fuchsia::test]
    fn test_back() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));

        insert_default(&mut q, &clock);
        insert_default(&mut q, &clock);
        insert_default(&mut q, &clock);
        insert_default(&mut q, &clock);

        // Capture the time the final measurement will be stamped with.
        let now = clock.now();
        insert_default(&mut q, &clock);

        assert_eq!(now, q.most_recent_measurement().unwrap().timestamp().into_nanos());

        q.insert_post_invalidation();

        assert!(q.most_recent_measurement().is_none());
    }

    #[fuchsia::test]
    fn post_invalidation_pushes_true_measurements_out() {
        let clock = FakeTime::new();
        let mut q = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock.clone()));

        assert!(q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(0, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
            insert_default(&mut q, &clock);
        }

        assert!(!q.no_true_measurements());
        assert!(q.most_recent_measurement().is_some());
        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());

        // Post-invalidation updates hide the most recent measurement immediately, but the
        // stored measurements stay until they age out.
        for _ in 0..COMPONENT_CPU_MAX_SAMPLES / 2 {
            q.insert_post_invalidation();
            clock.add_ticks(sample_period_ticks());
        }

        assert!(!q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(COMPONENT_CPU_MAX_SAMPLES / 2, q.true_measurement_count());

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
            q.insert_post_invalidation();
            clock.add_ticks(sample_period_ticks());
        }

        assert!(q.no_true_measurements());
        assert!(q.most_recent_measurement().is_none());
        assert_eq!(0, q.true_measurement_count());
    }

    #[fuchsia::test]
    fn add_assign() {
        let clock1 = FakeTime::new();
        // The second queue's clock runs two seconds ahead of the first one.
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(2).as_nanos() as i64);

        let mut q1 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(COMPONENT_CPU_MAX_SAMPLES, Arc::new(clock2.clone()));

        // One more insertion than the limit into each queue.
        for _ in 0..=COMPONENT_CPU_MAX_SAMPLES {
            insert_measurement(&mut q1, &clock1, measurement_with_cpu(&clock1, 1));
            insert_measurement(&mut q2, &clock2, measurement_with_cpu(&clock2, 3));
        }

        q1 += q2;

        // Guard against the loop below passing vacuously on an empty queue.
        assert!(!q1.no_true_measurements());

        let expected: MonotonicDuration = Duration::from_secs(4).into();
        for m in q1.iter_sorted() {
            assert_eq!(&expected, m.cpu_time());
        }
    }

    #[fuchsia::test]
    fn add_assign_missing_matches() {
        let clock1 = FakeTime::new();
        // The second queue's clock runs 125 seconds ahead of the first one.
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);

        let max_values = 5;

        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));

        for _ in 0..max_values {
            insert_measurement(&mut q1, &clock1, measurement_with_cpu(&clock1, 1));
            insert_measurement(&mut q2, &clock2, measurement_with_cpu(&clock2, 3));
        }

        // Pruning after the merge uses q1's clock, so bring it up to q2's time first.
        clock1.set_ticks(clock2.now());
        q1 += q2;

        // Latest timestamp first: two unmatched q2 samples, then two merged samples.
        let actual: Vec<Measurement> = q1.iter_sorted().collect();
        assert_eq!(max_values - 1, actual.len());

        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
        assert_eq!(&d(3), actual[0].cpu_time());
        assert_eq!(&d(3), actual[1].cpu_time());
        assert_eq!(&d(4), actual[2].cpu_time());
        assert_eq!(&d(4), actual[3].cpu_time());
    }

    #[fuchsia::test]
    fn add_assign_post_invalidation() {
        let clock1 = FakeTime::new();
        // The second queue's clock runs 125 seconds ahead of the first one.
        let clock2 = FakeTime::new();
        clock2.set_ticks(Duration::from_secs(125).as_nanos() as i64);

        let max_values = 5;

        let mut q1 = MeasurementsQueue::new(max_values, Arc::new(clock1.clone()));
        let mut q2 = MeasurementsQueue::new(max_values, Arc::new(clock2.clone()));

        for _ in 0..max_values {
            insert_measurement(&mut q1, &clock1, measurement_with_cpu(&clock1, 1));
            insert_measurement(&mut q2, &clock2, measurement_with_cpu(&clock2, 3));
        }

        q1.insert_post_invalidation();
        q2.insert_post_invalidation();

        clock1.add_ticks(sample_period_ticks());
        q1.insert_post_invalidation();

        clock1.add_ticks(sample_period_ticks());
        q1.insert_post_invalidation();

        q1 += q2;

        // Both queues ended on a post-invalidation update, so the merged queue reports none.
        assert!(q1.most_recent_measurement().is_none());

        // Latest timestamp first: two unmatched q2 samples, then two merged samples.
        let actual: Vec<Measurement> = q1.iter_sorted().collect();
        assert_eq!(4, actual.len());

        let d = |secs| -> MonotonicDuration { Duration::from_secs(secs).into() };
        assert_eq!(&d(3), actual[0].cpu_time());
        assert_eq!(&d(3), actual[1].cpu_time());
        assert_eq!(&d(4), actual[2].cpu_time());
        assert_eq!(&d(4), actual[3].cpu_time());
    }

    #[fuchsia::test]
    fn size_limited_to_max_no_matter_duration() {
        let max_values = 20;
        // The fake clock never advances, so only the count limit can evict entries here.
        let mut q = MeasurementsQueue::new(max_values, Arc::new(FakeTime::new()));

        for _ in 0..(max_values + 100) {
            q.insert(Measurement::empty(BootInstant::get()));
        }

        assert_eq!(max_values, q.true_measurement_count());
    }
}