1use crate::task_metrics::component_stats::ComponentStats;
6use crate::task_metrics::constants::*;
7use crate::task_metrics::measurement::{Measurement, MeasurementsQueue};
8use crate::task_metrics::runtime_stats_source::{
9 ComponentStartedInfo, RuntimeStatsContainer, RuntimeStatsSource,
10};
11use crate::task_metrics::task_info::{create_cpu_histogram, TaskInfo};
12use async_trait::async_trait;
13use errors::ModelError;
14use fidl_fuchsia_component_runner::Task as DiagnosticsTask;
15use fuchsia_async as fasync;
16use fuchsia_inspect::{self as inspect, ArrayProperty, HistogramProperty};
17use futures::channel::{mpsc, oneshot};
18use futures::lock::Mutex;
19use futures::{FutureExt, StreamExt};
20use hooks::{Event, EventPayload, EventType, HasEventType, Hook, HooksRegistration};
21use injectable_time::{BootInstant, TimeSource};
22use log::warn;
23use moniker::{ExtendedMoniker, Moniker};
24use std::collections::{BTreeMap, VecDeque};
25use std::fmt::Debug;
26use std::sync::{Arc, Weak};
27use zx::{self as zx, sys as zx_sys, HandleBased};
28
/// Unwraps an `Option` expression and yields the contained value.
///
/// When the expression evaluates to `None` the *enclosing function* returns immediately, so the
/// macro can only be used inside functions (or async functions) whose return type is `()`.
macro_rules! maybe_return {
    ($e:expr) => {
        if let Some(value) = $e {
            value
        } else {
            return
        }
    };
}
37
/// Size handed to `InspectorConfig::size` when building the lazy `measurements` inspector.
/// NOTE(review): presumably a byte count (i.e. 2 MiB) — confirm against fuchsia_inspect.
const MAX_INSPECT_SIZE: usize = 2 * (1 << 20);
39
/// Name of the inspect node, under `components`, that holds the measurements aggregated from
/// pruned dead tasks.
const AGGREGATE_SAMPLES: &str = "@aggregated";
41
/// Collects runtime (CPU and queue time) measurements for the tasks of every tracked component
/// and exposes them through inspect.
pub struct ComponentTreeStats<T: RuntimeStatsSource + Debug> {
    /// Per-component stats keyed by moniker. Entries are added when a task starts being tracked
    /// and removed once the component's stats report that they are no longer alive.
    tree: Mutex<BTreeMap<ExtendedMoniker, Arc<Mutex<ComponentStats<T>>>>>,

    /// Weak handles to every tracked task, keyed by koid. Used to find the parent task when a
    /// newly started component reports one.
    tasks: Mutex<BTreeMap<zx_sys::zx_koid_t, Weak<Mutex<TaskInfo<T>>>>>,

    /// Inspect node under which the `measurements`, `recent_usage` and `@total` lazy children
    /// are recorded.
    node: inspect::Node,

    /// The `histograms` child of `node`; passed to `create_cpu_histogram` for every tracked task.
    histograms_node: inspect::Node,

    /// The `processing_times_ns` histogram: receives the duration of each `measure` call in
    /// nanoseconds.
    processing_times: inspect::IntExponentialHistogramProperty,

    /// The periodic sampling task spawned by `start_measuring`, if any.
    sampler_task: Mutex<Option<fasync::Task<()>>>,

    /// The aggregate of all component measurements; one entry is appended per `measure` call.
    totals: Mutex<AggregatedStats>,

    /// Task that awaits every task sent through `diagnostics_waiter_task_sender`.
    _wait_diagnostics_drain: fasync::Task<()>,

    /// Sender used by `on_component_started` to hand off the tasks that wait for a component's
    /// diagnostics to become available.
    diagnostics_waiter_task_sender: mpsc::UnboundedSender<fasync::Task<()>>,

    /// Time source handed to every `TaskInfo` and to the dead-task measurements queue.
    time_source: Arc<dyn TimeSource + Send + Sync>,

    /// Measurements taken over from dead tasks removed by `prune_dead_tasks`; written to inspect
    /// under the `@aggregated` node.
    aggregated_dead_task_data: Mutex<MeasurementsQueue>,

    /// Running sum of the data reported for tasks cleaned up as stale during `measure`; every
    /// new aggregate measurement starts from this value.
    exited_measurements: Mutex<Measurement>,
}
80
81impl<T: 'static + RuntimeStatsSource + Debug + Send + Sync> ComponentTreeStats<T> {
82 pub async fn new(node: inspect::Node) -> Arc<Self> {
83 Self::new_with_timesource(node, Arc::new(BootInstant::new())).await
84 }
85
86 async fn new_with_timesource(
87 node: inspect::Node,
88 time_source: Arc<dyn TimeSource + Send + Sync>,
89 ) -> Arc<Self> {
90 let processing_times = node.create_int_exponential_histogram(
91 "processing_times_ns",
92 inspect::ExponentialHistogramParams {
93 floor: 1000,
94 initial_step: 1000,
95 step_multiplier: 2,
96 buckets: 16,
97 },
98 );
99
100 let histograms_node = node.create_child("histograms");
101 let totals = Mutex::new(AggregatedStats::new());
102 let (snd, rcv) = mpsc::unbounded();
103 let this = Arc::new(Self {
104 tree: Mutex::new(BTreeMap::new()),
105 tasks: Mutex::new(BTreeMap::new()),
106 node,
107 histograms_node,
108 processing_times,
109 sampler_task: Mutex::new(None),
110 totals,
111 diagnostics_waiter_task_sender: snd,
112 _wait_diagnostics_drain: fasync::Task::spawn(async move {
113 rcv.for_each_concurrent(None, |rx| async move { rx.await }).await;
114 }),
115 time_source: time_source.clone(),
116 aggregated_dead_task_data: Mutex::new(MeasurementsQueue::new(
117 COMPONENT_CPU_MAX_SAMPLES,
118 time_source,
119 )),
120 exited_measurements: Mutex::new(Measurement::default()),
121 });
122
123 let weak_self = Arc::downgrade(&this);
124
125 let weak_self_for_fut = weak_self.clone();
126 this.node.record_lazy_child("measurements", move || {
127 let weak_self_clone = weak_self_for_fut.clone();
128 async move {
129 if let Some(this) = weak_self_clone.upgrade() {
130 Ok(this.write_measurements_to_inspect().await)
131 } else {
132 Ok(inspect::Inspector::default())
133 }
134 }
135 .boxed()
136 });
137
138 let weak_self_clone_for_fut = weak_self.clone();
139 this.node.record_lazy_child("recent_usage", move || {
140 let weak_self_clone = weak_self_clone_for_fut.clone();
141 async move {
142 if let Some(this) = weak_self_clone.upgrade() {
143 Ok(this.write_recent_usage_to_inspect().await)
144 } else {
145 Ok(inspect::Inspector::default())
146 }
147 }
148 .boxed()
149 });
150 let weak_self_for_fut = weak_self;
151 this.node.record_lazy_child("@total", move || {
152 let weak_self_clone = weak_self_for_fut.clone();
153 async move {
154 if let Some(this) = weak_self_clone.upgrade() {
155 Ok(this.write_totals_to_inspect().await)
156 } else {
157 Ok(inspect::Inspector::default())
158 }
159 }
160 .boxed()
161 });
162
163 this
164 }
165
166 pub async fn start_measuring(self: &Arc<Self>) {
169 let weak_self = Arc::downgrade(self);
170 self.measure().await;
171 *(self.sampler_task.lock().await) = Some(fasync::Task::spawn(async move {
172 loop {
173 fasync::Timer::new(CPU_SAMPLE_PERIOD).await;
174 match weak_self.upgrade() {
175 None => break,
176 Some(this) => {
177 this.measure().await;
178 }
179 }
180 }
181 }));
182 }
183
184 async fn track_ready(&self, moniker: ExtendedMoniker, task: T) {
186 let histogram = create_cpu_histogram(&self.histograms_node, &moniker);
187 if let Ok(task_info) = TaskInfo::try_from(task, Some(histogram), self.time_source.clone()) {
188 let koid = task_info.koid();
189 let arc_task_info = Arc::new(Mutex::new(task_info));
190 let mut stats = ComponentStats::new();
191 stats.add_task(arc_task_info.clone()).await;
192 let stats = Arc::new(Mutex::new(stats));
193 self.tree.lock().await.insert(moniker.clone(), stats);
194 self.tasks.lock().await.insert(koid, Arc::downgrade(&arc_task_info));
195 }
196 }
197
198 async fn write_measurements_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
199 let inspector =
200 inspect::Inspector::new(inspect::InspectorConfig::default().size(MAX_INSPECT_SIZE));
201 let components = inspector.root().create_child("components");
202 let (component_count, task_count) = self.write_measurements(&components).await;
203 self.write_aggregate_measurements(&components).await;
204 inspector.root().record_uint("component_count", component_count);
205 inspector.root().record_uint("task_count", task_count);
206 inspector.root().record(components);
207
208 let stats_node = inspect::stats::StatsNode::new(&inspector);
209 stats_node.record_data_to(inspector.root());
210
211 inspector
212 }
213
214 async fn write_recent_usage_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
215 let inspector = inspect::Inspector::default();
216 self.totals.lock().await.write_recents_to(inspector.root());
217 inspector
218 }
219
220 async fn write_totals_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
221 let inspector = inspect::Inspector::default();
222 self.totals.lock().await.write_totals_to(inspector.root());
223 inspector
224 }
225
226 async fn write_aggregate_measurements(&self, components_node: &inspect::Node) {
227 let locked_aggregate = self.aggregated_dead_task_data.lock().await;
228 if locked_aggregate.no_true_measurements() {
229 return;
230 }
231
232 let aggregate = components_node.create_child(&*AGGREGATE_SAMPLES);
233 locked_aggregate.record_to_node(&aggregate);
234 components_node.record(aggregate);
235 }
236
237 async fn write_measurements(&self, node: &inspect::Node) -> (u64, u64) {
238 let mut task_count = 0;
239 let tree = self.tree.lock().await;
240 for (moniker, stats) in tree.iter() {
241 let stats_guard = stats.lock().await;
242 let key = match moniker {
243 ExtendedMoniker::ComponentManager => moniker.to_string(),
244 ExtendedMoniker::ComponentInstance(m) => {
245 if *m == Moniker::root() {
246 "<root>".to_string()
247 } else {
248 m.to_string()
249 }
250 }
251 };
252 let child = node.create_child(key);
253 task_count += stats_guard.record_to_node(&child).await;
254 node.record(child);
255 }
256 (tree.len() as u64, task_count)
257 }
258
259 pub async fn measure(self: &Arc<Self>) {
263 let start = zx::BootInstant::get();
264
265 let stats = self
267 .tree
268 .lock()
269 .await
270 .iter()
271 .map(|(k, v)| (k.clone(), Arc::downgrade(&v)))
272 .collect::<Vec<_>>();
273 let mut locked_exited_measurements = self.exited_measurements.lock().await;
274 let mut aggregated = Measurement::clone_with_time(&*locked_exited_measurements, start);
275 let mut stats_to_remove = vec![];
276 let mut koids_to_remove = vec![];
277 for (moniker, weak_stats) in stats.into_iter() {
278 if let Some(stats) = weak_stats.upgrade() {
279 let mut stat_guard = stats.lock().await;
280 aggregated += &stat_guard.measure().await;
282 aggregated += &stat_guard.measure_tracked_dead_tasks().await;
283 let (mut stale_koids, exited_cpu_of_deleted) = stat_guard.clean_stale().await;
284 aggregated += &exited_cpu_of_deleted;
285 *locked_exited_measurements += &exited_cpu_of_deleted;
286 koids_to_remove.append(&mut stale_koids);
287 if !stat_guard.is_alive().await {
288 stats_to_remove.push(moniker);
289 }
290 }
291 }
292
293 let mut stats = self.tree.lock().await;
295 for moniker in stats_to_remove {
296 if let Some(stat) = stats.get(&moniker) {
299 if !stat.lock().await.is_alive().await {
300 stats.remove(&moniker);
301 }
302 }
303 }
304
305 let mut tasks = self.tasks.lock().await;
306 for koid in koids_to_remove {
307 tasks.remove(&koid);
308 }
309
310 self.totals.lock().await.insert(aggregated);
311 self.processing_times.insert((zx::BootInstant::get() - start).into_nanos());
312 }
313
314 async fn prune_dead_tasks(self: &Arc<Self>, max_dead_tasks: usize) {
315 let mut all_dead_tasks = BTreeMap::new();
316 for (moniker, component) in self.tree.lock().await.iter() {
317 let dead_tasks = component.lock().await.gather_dead_tasks().await;
318 for (timestamp, task) in dead_tasks {
319 all_dead_tasks.insert(timestamp, (task, moniker.clone()));
320 }
321 }
322
323 if all_dead_tasks.len() <= max_dead_tasks {
324 return;
325 }
326
327 let total = all_dead_tasks.len();
328 let to_remove = all_dead_tasks.iter().take(total - (max_dead_tasks / 2));
329
330 let mut koids_to_remove = vec![];
331 let mut aggregate_data = self.aggregated_dead_task_data.lock().await;
332 for (_, (unlocked_task, _)) in to_remove {
333 let mut task = unlocked_task.lock().await;
334 if let Ok(measurements) = task.take_measurements_queue().await {
335 koids_to_remove.push(task.koid());
336 *aggregate_data += measurements;
337 }
338 }
339
340 let mut stats_to_remove = vec![];
341 for (moniker, stats) in self.tree.lock().await.iter() {
342 let mut stat_guard = stats.lock().await;
343 stat_guard.remove_by_koids(&koids_to_remove).await;
344 if !stat_guard.is_alive().await {
345 stats_to_remove.push(moniker.clone());
346 }
347 }
348
349 let mut stats = self.tree.lock().await;
350 for moniker in stats_to_remove {
351 if let Some(stat) = stats.get(&moniker) {
354 if !stat.lock().await.is_alive().await {
355 stats.remove(&moniker);
356 }
357 }
358 }
359
360 let mut tasks = self.tasks.lock().await;
361 for koid in koids_to_remove {
362 tasks.remove(&koid);
363 }
364 }
365
366 async fn on_component_started<P, C>(self: &Arc<Self>, moniker: &Moniker, runtime: &P)
367 where
368 P: ComponentStartedInfo<C, T>,
369 C: RuntimeStatsContainer<T> + Send + Sync + 'static,
370 {
371 if let Some(receiver) = runtime.get_receiver().await {
372 let task = fasync::Task::spawn(Self::diagnostics_waiter_task(
373 Arc::downgrade(&self),
374 moniker.clone().into(),
375 receiver,
376 runtime.start_time(),
377 ));
378 let _ = self.diagnostics_waiter_task_sender.unbounded_send(task);
379 }
380 }
381
382 async fn diagnostics_waiter_task<C>(
383 weak_self: Weak<Self>,
384 moniker: ExtendedMoniker,
385 receiver: oneshot::Receiver<C>,
386 start_time: zx::BootInstant,
387 ) where
388 C: RuntimeStatsContainer<T> + Send + Sync + 'static,
389 {
390 let mut source = maybe_return!(receiver.await.ok());
391 let this = maybe_return!(weak_self.upgrade());
392 let mut tree_lock = this.tree.lock().await;
393 let stats = tree_lock
394 .entry(moniker.clone())
395 .or_insert_with(|| Arc::new(Mutex::new(ComponentStats::new())));
396 let histogram = create_cpu_histogram(&this.histograms_node, &moniker);
397 let mut task_info =
398 maybe_return!(source.take_component_task().and_then(|task| TaskInfo::try_from(
399 task,
400 Some(histogram),
401 this.time_source.clone()
402 )
403 .ok()));
404
405 let parent_koid = source
406 .take_parent_task()
407 .and_then(|task| TaskInfo::try_from(task, None, this.time_source.clone()).ok())
408 .map(|task| task.koid());
409 let koid = task_info.koid();
410
411 task_info.record_measurement_with_start_time(start_time);
417 task_info.measure_if_no_parent().await;
418
419 let mut task_guard = this.tasks.lock().await;
420
421 let task_info = match parent_koid {
422 None => {
423 Arc::new(Mutex::new(task_info))
426 }
427 Some(parent_koid) => {
428 task_info.has_parent_task = true;
429 let task_info = Arc::new(Mutex::new(task_info));
430 if let Some(parent) = task_guard.get(&parent_koid).and_then(|p| p.upgrade()) {
431 let mut parent_guard = parent.lock().await;
432 parent_guard.add_child(Arc::downgrade(&task_info));
433 }
434 task_info
435 }
436 };
437 task_guard.insert(koid, Arc::downgrade(&task_info));
438 stats.lock().await.add_task(task_info).await;
439 drop(task_guard);
440 drop(tree_lock);
441 this.prune_dead_tasks(MAX_DEAD_TASKS).await;
442 }
443}
444
445impl ComponentTreeStats<DiagnosticsTask> {
446 pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
447 vec![HooksRegistration::new(
448 "ComponentTreeStats",
449 vec![EventType::Started],
450 Arc::downgrade(self) as Weak<dyn Hook>,
451 )]
452 }
453
454 pub async fn track_component_manager_stats(&self) {
456 match fuchsia_runtime::job_default().duplicate_handle(zx::Rights::SAME_RIGHTS) {
457 Ok(job) => {
458 self.track_ready(ExtendedMoniker::ComponentManager, DiagnosticsTask::Job(job))
459 .await;
460 }
461 Err(err) => warn!(
462 "Failed to duplicate component manager job. Not tracking its own stats: {:?}",
463 err
464 ),
465 }
466 }
467}
468
469#[async_trait]
470impl Hook for ComponentTreeStats<DiagnosticsTask> {
471 async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
472 let target_moniker = event
473 .target_moniker
474 .unwrap_instance_moniker_or(ModelError::UnexpectedComponentManagerMoniker)?;
475 match event.event_type() {
476 EventType::Started => {
477 if let EventPayload::Started { runtime, .. } = &event.payload {
478 self.on_component_started(target_moniker, runtime).await;
479 }
480 }
481 _ => {}
482 }
483 Ok(())
484 }
485}
486
/// Bounded history of aggregate measurements, oldest first.
struct AggregatedStats {
    /// The retained measurements; `insert` evicts from the front so that at most
    /// `COMPONENT_CPU_MAX_SAMPLES` entries are kept.
    measurements: VecDeque<Measurement>,
}
491
492impl AggregatedStats {
493 fn new() -> Self {
494 Self { measurements: VecDeque::with_capacity(COMPONENT_CPU_MAX_SAMPLES) }
495 }
496
497 fn insert(&mut self, measurement: Measurement) {
498 while self.measurements.len() >= COMPONENT_CPU_MAX_SAMPLES {
499 self.measurements.pop_front();
500 }
501 self.measurements.push_back(measurement);
502 }
503
504 fn write_totals_to(&self, node: &inspect::Node) {
505 let count = self.measurements.len();
506 let timestamps = node.create_int_array(TIMESTAMPS, count);
507 let cpu_times = node.create_int_array(CPU_TIMES, count);
508 let queue_times = node.create_int_array(QUEUE_TIMES, count);
509 for (i, measurement) in self.measurements.iter().enumerate() {
510 timestamps.set(i, measurement.timestamp().into_nanos());
511 cpu_times.set(i, measurement.cpu_time().into_nanos());
512 queue_times.set(i, measurement.queue_time().into_nanos());
513 }
514 node.record(timestamps);
515 node.record(cpu_times);
516 node.record(queue_times);
517 }
518
519 fn write_recents_to(&self, node: &inspect::Node) {
520 if self.measurements.is_empty() {
521 return;
522 }
523 if self.measurements.len() >= 2 {
524 let measurement = self.measurements.get(self.measurements.len() - 2).unwrap();
525 node.record_int("previous_cpu_time", measurement.cpu_time().into_nanos());
526 node.record_int("previous_queue_time", measurement.queue_time().into_nanos());
527 node.record_int("previous_timestamp", measurement.timestamp().into_nanos());
528 }
529 let measurement = self.measurements.get(self.measurements.len() - 1).unwrap();
530 node.record_int("recent_cpu_time", measurement.cpu_time().into_nanos());
531 node.record_int("recent_queue_time", measurement.queue_time().into_nanos());
532 node.record_int("recent_timestamp", measurement.timestamp().into_nanos());
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539 use crate::task_metrics::testing::{FakeDiagnosticsContainer, FakeRuntime, FakeTask};
540 use diagnostics_assertions::{assert_data_tree, AnyProperty};
541 use diagnostics_hierarchy::DiagnosticsHierarchy;
542 use fuchsia_inspect::DiagnosticsHierarchyGetter;
543
544 use injectable_time::{FakeTime, IncrementingFakeTime};
545
546 #[fuchsia::test]
547 async fn total_tracks_cpu_after_termination() {
548 let inspector = inspect::Inspector::default();
549 let clock = Arc::new(FakeTime::new());
550 let stats = ComponentTreeStats::new_with_timesource(
551 inspector.root().create_child("stats"),
552 clock.clone(),
553 )
554 .await;
555
556 let mut previous_task_count = 0;
557 for i in 0..10 {
558 clock.add_ticks(1);
559 let component_task = FakeTask::new(
560 i as u64,
561 create_measurements_vec_for_fake_task(COMPONENT_CPU_MAX_SAMPLES as i64 * 3, 2, 4),
562 );
563
564 let moniker = Moniker::try_from(vec![format!("moniker-{}", i).as_ref()]).unwrap();
565 let fake_runtime =
566 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
567 stats.on_component_started(&moniker, &fake_runtime).await;
568
569 loop {
570 let current = stats.tree.lock().await.len();
571 if current != previous_task_count {
572 previous_task_count = current;
573 break;
574 }
575 fasync::Timer::new(fasync::MonotonicInstant::after(
576 zx::MonotonicDuration::from_millis(100i64),
577 ))
578 .await;
579 }
580 }
581
582 assert_eq!(stats.tasks.lock().await.len(), 10);
583 assert_eq!(stats.tree.lock().await.len(), 10);
584
585 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES - 2 {
586 stats.measure().await;
587 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
588 }
589
590 {
593 let totals = stats.totals.lock().await;
594 let recent_measurement = totals
595 .measurements
596 .get(totals.measurements.len() - 1)
597 .expect("there's at least one measurement");
598 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
599 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
600
601 let previous_measurement = totals
602 .measurements
603 .get(totals.measurements.len() - 2)
604 .expect("there's a previous measurement");
605 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
606 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320,);
607 }
608
609 for i in 0..10 {
611 let moniker = Moniker::try_from(vec![format!("moniker-{}", i).as_ref()]).unwrap();
612 for task in stats
613 .tree
614 .lock()
615 .await
616 .get(&moniker.into())
617 .unwrap()
618 .lock()
619 .await
620 .tasks_mut()
621 .iter_mut()
622 {
623 task.lock().await.force_terminate().await;
624 clock.add_ticks(1);
627 }
628 }
629
630 {
632 let totals = stats.totals.lock().await;
633 let recent_measurement = totals
634 .measurements
635 .get(totals.measurements.len() - 1)
636 .expect("there's at least one measurement");
637 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
638 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
639
640 let previous_measurement = totals
641 .measurements
642 .get(totals.measurements.len() - 2)
643 .expect("there's a previous measurement");
644 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
645 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320);
646 }
647
648 stats.measure().await;
650 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
651
652 {
653 let totals = stats.totals.lock().await;
654 let recent_measurement = totals
655 .measurements
656 .get(totals.measurements.len() - 1)
657 .expect("there's at least one measurement");
658 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
659 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
660
661 let previous_measurement = totals
662 .measurements
663 .get(totals.measurements.len() - 2)
664 .expect("there's a previous measurement");
665 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1180);
666 assert_eq!(previous_measurement.queue_time().into_nanos(), 2360);
667 }
668
669 stats.measure().await;
670 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
671
672 {
673 let totals = stats.totals.lock().await;
674 let recent_measurement = totals
675 .measurements
676 .get(totals.measurements.len() - 1)
677 .expect("there's at least one measurement");
678 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
679 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
680
681 let previous_measurement = totals
682 .measurements
683 .get(totals.measurements.len() - 2)
684 .expect("there's a previous measurement");
685 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
686 assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
687 }
688
689 for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
691 stats.measure().await;
692 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
693 }
694
695 assert_eq!(stats.tasks.lock().await.len(), 0);
697 assert_eq!(stats.tree.lock().await.len(), 0);
698
699 {
701 let totals = stats.totals.lock().await;
702 let recent_measurement = totals
703 .measurements
704 .get(totals.measurements.len() - 1)
705 .expect("there's at least one measurement");
706 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
707 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
708
709 let previous_measurement = totals
710 .measurements
711 .get(totals.measurements.len() - 2)
712 .expect("there's a previous measurement");
713 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
714 assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
715 }
716 }
717
718 #[fuchsia::test]
719 async fn components_are_deleted_when_all_tasks_are_gone() {
720 let inspector = inspect::Inspector::default();
721 let clock = Arc::new(FakeTime::new());
722 let stats = ComponentTreeStats::new_with_timesource(
723 inspector.root().create_child("stats"),
724 clock.clone(),
725 )
726 .await;
727 let moniker: Moniker = vec!["a"].try_into().unwrap();
728 let moniker: ExtendedMoniker = moniker.into();
729 stats.track_ready(moniker.clone(), FakeTask::default()).await;
730 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES {
731 stats.measure().await;
732 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
733 }
734 assert_eq!(stats.tree.lock().await.len(), 1);
735 assert_eq!(stats.tasks.lock().await.len(), 1);
736 assert_eq!(
737 stats.tree.lock().await.get(&moniker).unwrap().lock().await.total_measurements().await,
738 COMPONENT_CPU_MAX_SAMPLES
739 );
740
741 for task in
743 stats.tree.lock().await.get(&moniker).unwrap().lock().await.tasks_mut().iter_mut()
744 {
745 task.lock().await.force_terminate().await;
746 clock.add_ticks(1);
747 }
748
749 for i in 0..COMPONENT_CPU_MAX_SAMPLES {
751 stats.measure().await;
752 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
753 assert_eq!(
754 stats
755 .tree
756 .lock()
757 .await
758 .get(&moniker)
759 .unwrap()
760 .lock()
761 .await
762 .total_measurements()
763 .await,
764 COMPONENT_CPU_MAX_SAMPLES - i,
765 );
766 }
767 stats.measure().await;
768 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
769 assert!(stats.tree.lock().await.get(&moniker).is_none());
770 assert_eq!(stats.tree.lock().await.len(), 0);
771 assert_eq!(stats.tasks.lock().await.len(), 0);
772 }
773
774 fn create_measurements_vec_for_fake_task(
775 num_measurements: i64,
776 init_cpu: i64,
777 init_queue: i64,
778 ) -> Vec<zx::TaskRuntimeInfo> {
779 let mut v = vec![];
780 for i in 0..num_measurements {
781 v.push(zx::TaskRuntimeInfo {
782 cpu_time: i * init_cpu,
783 queue_time: i * init_queue,
784 ..zx::TaskRuntimeInfo::default()
785 });
786 }
787
788 v
789 }
790
791 #[fuchsia::test]
792 async fn dead_tasks_are_pruned() {
793 let clock = Arc::new(FakeTime::new());
794 let inspector = inspect::Inspector::default();
795 let stats = Arc::new(
796 ComponentTreeStats::new_with_timesource(
797 inspector.root().create_child("stats"),
798 clock.clone(),
799 )
800 .await,
801 );
802
803 let mut previous_task_count = 0;
804 for i in 0..(MAX_DEAD_TASKS * 2) {
805 clock.add_ticks(1);
806 let component_task =
807 FakeTask::new(i as u64, create_measurements_vec_for_fake_task(300, 2, 4));
808
809 let moniker = Moniker::try_from(vec![format!("moniker-{}", i).as_ref()]).unwrap();
810 let fake_runtime =
811 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
812 stats.on_component_started(&moniker, &fake_runtime).await;
813
814 loop {
815 let current = stats.tree.lock().await.len();
816 if current != previous_task_count {
817 previous_task_count = current;
818 break;
819 }
820 fasync::Timer::new(fasync::MonotonicInstant::after(
821 zx::MonotonicDuration::from_millis(100i64),
822 ))
823 .await;
824 }
825
826 for task in stats
827 .tree
828 .lock()
829 .await
830 .get(&moniker.into())
831 .unwrap()
832 .lock()
833 .await
834 .tasks_mut()
835 .iter_mut()
836 {
837 task.lock().await.force_terminate().await;
838 clock.add_ticks(1);
839 }
840 }
841
842 let task_count = stats.tasks.lock().await.len();
843 let moniker_count = stats.tree.lock().await.len();
844 assert_eq!(task_count, 88);
845 assert_eq!(moniker_count, 88);
846 }
847
848 #[fuchsia::test]
849 async fn aggregated_data_available_inspect() {
850 let max_dead_tasks = 4;
851 let clock = Arc::new(FakeTime::new());
852 let inspector = inspect::Inspector::default();
853 let stats = Arc::new(
854 ComponentTreeStats::new_with_timesource(
855 inspector.root().create_child("stats"),
856 clock.clone(),
857 )
858 .await,
859 );
860
861 let mut moniker_list = vec![];
862 for i in 0..(max_dead_tasks * 2) {
863 clock.add_ticks(1);
864 let moniker = Moniker::try_from(vec![format!("moniker-{}", i).as_ref()]).unwrap();
865 moniker_list.push(moniker.clone());
866 let component_task =
867 FakeTask::new(i as u64, create_measurements_vec_for_fake_task(5, 1, 1));
868 stats.track_ready(moniker.into(), component_task).await;
869 }
870
871 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
872 stats.measure().await;
873 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
874 stats.measure().await;
875 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
876 stats.measure().await;
877
878 assert_data_tree!(inspector, root: {
879 stats: contains {
880 measurements: contains {
881 components: {
882 "moniker-0": contains {},
883 "moniker-1": contains {},
884 "moniker-2": contains {},
885 "moniker-3": contains {},
886 "moniker-4": contains {},
887 "moniker-5": contains {},
888 "moniker-6": contains {},
889 "moniker-7": contains {},
890 }
891 }
892 }
893 });
894
895 for moniker in moniker_list {
896 for task in stats
897 .tree
898 .lock()
899 .await
900 .get(&moniker.clone().into())
901 .unwrap()
902 .lock()
903 .await
904 .tasks_mut()
905 .iter_mut()
906 {
907 task.lock().await.force_terminate().await;
908 clock.add_ticks(1);
911 }
912 }
913
914 stats.prune_dead_tasks(max_dead_tasks).await;
915
916 let hierarchy = inspector.get_diagnostics_hierarchy();
917 assert_data_tree!(inspector, root: {
918 stats: contains {
919 measurements: contains {
920 components: {
921 "@aggregated": {
922 "timestamps": AnyProperty,
923 "cpu_times": vec![0i64, 6i64, 12i64],
924 "queue_times": vec![0i64, 6i64, 12i64],
925 },
926 "moniker-6": contains {},
927 "moniker-7": contains {},
928 }
929 }
930 }
931 });
932 let (timestamps, _, _) = get_data(&hierarchy, "@aggregated", None);
933 assert_eq!(timestamps.len(), 3);
934 assert!(timestamps[1] > timestamps[0]);
935 assert!(timestamps[2] > timestamps[1]);
936 }
937
938 #[fuchsia::test]
939 async fn total_holds_sum_of_stats() {
940 let inspector = inspect::Inspector::default();
941 let stats = ComponentTreeStats::new(inspector.root().create_child("stats")).await;
942 stats.measure().await;
943 stats
944 .track_ready(
945 ExtendedMoniker::ComponentInstance(vec!["a"].try_into().unwrap()),
946 FakeTask::new(
947 1,
948 vec![
949 zx::TaskRuntimeInfo {
950 cpu_time: 2,
951 queue_time: 4,
952 ..zx::TaskRuntimeInfo::default()
953 },
954 zx::TaskRuntimeInfo {
955 cpu_time: 6,
956 queue_time: 8,
957 ..zx::TaskRuntimeInfo::default()
958 },
959 ],
960 ),
961 )
962 .await;
963 stats
964 .track_ready(
965 ExtendedMoniker::ComponentInstance(vec!["b"].try_into().unwrap()),
966 FakeTask::new(
967 2,
968 vec![
969 zx::TaskRuntimeInfo {
970 cpu_time: 1,
971 queue_time: 3,
972 ..zx::TaskRuntimeInfo::default()
973 },
974 zx::TaskRuntimeInfo {
975 cpu_time: 5,
976 queue_time: 7,
977 ..zx::TaskRuntimeInfo::default()
978 },
979 ],
980 ),
981 )
982 .await;
983
984 stats.measure().await;
985 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
986 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
987 assert_eq!(timestamps.len(), 2);
988 assert_eq!(cpu_times, vec![0, 2 + 1]);
989 assert_eq!(queue_times, vec![0, 4 + 3]);
990
991 stats.measure().await;
992 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
993 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
994 assert_eq!(timestamps.len(), 3);
995 assert_eq!(cpu_times, vec![0, 2 + 1, 6 + 5]);
996 assert_eq!(queue_times, vec![0, 4 + 3, 8 + 7]);
997 }
998
999 #[fuchsia::test]
1000 async fn recent_usage() {
1001 let inspector = inspect::Inspector::default();
1003 let stats = ComponentTreeStats::new(inspector.root().create_child("stats")).await;
1004 stats.measure().await;
1005
1006 stats
1007 .track_ready(
1008 ExtendedMoniker::ComponentInstance(vec!["a"].try_into().unwrap()),
1009 FakeTask::new(
1010 1,
1011 vec![
1012 zx::TaskRuntimeInfo {
1013 cpu_time: 2,
1014 queue_time: 4,
1015 ..zx::TaskRuntimeInfo::default()
1016 },
1017 zx::TaskRuntimeInfo {
1018 cpu_time: 6,
1019 queue_time: 8,
1020 ..zx::TaskRuntimeInfo::default()
1021 },
1022 ],
1023 ),
1024 )
1025 .await;
1026 stats
1027 .track_ready(
1028 ExtendedMoniker::ComponentInstance(vec!["b"].try_into().unwrap()),
1029 FakeTask::new(
1030 2,
1031 vec![
1032 zx::TaskRuntimeInfo {
1033 cpu_time: 1,
1034 queue_time: 3,
1035 ..zx::TaskRuntimeInfo::default()
1036 },
1037 zx::TaskRuntimeInfo {
1038 cpu_time: 5,
1039 queue_time: 7,
1040 ..zx::TaskRuntimeInfo::default()
1041 },
1042 ],
1043 ),
1044 )
1045 .await;
1046
1047 stats.measure().await;
1048 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
1049
1050 assert_data_tree!(&hierarchy, root: contains {
1053 stats: contains {
1054 recent_usage: {
1055 previous_cpu_time: 0i64,
1056 previous_queue_time: 0i64,
1057 previous_timestamp: AnyProperty,
1058 recent_cpu_time: 2 + 1i64,
1059 recent_queue_time: 4 + 3i64,
1060 recent_timestamp: AnyProperty,
1061 }
1062 }
1063 });
1064
1065 let initial_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
1067 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
1068 assert_eq!(timestamps.len(), 2);
1069 assert_eq!(timestamps[1], initial_timestamp);
1070 assert_eq!(cpu_times, vec![0, 2 + 1]);
1071 assert_eq!(queue_times, vec![0, 4 + 3]);
1072
1073 stats.measure().await;
1075 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
1076
1077 assert_data_tree!(&hierarchy, root: contains {
1079 stats: contains {
1080 recent_usage: {
1081 previous_cpu_time: 2 + 1i64,
1082 previous_queue_time: 4 + 3i64,
1083 previous_timestamp: initial_timestamp,
1084 recent_cpu_time: 6 + 5i64,
1085 recent_queue_time: 8 + 7i64,
1086 recent_timestamp: AnyProperty,
1087 }
1088 }
1089 });
1090
1091 let recent_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
1093 assert!(recent_timestamp > initial_timestamp);
1094 }
1095
1096 #[fuchsia::test]
1097 async fn component_stats_are_available_in_inspect() {
1098 let inspector = inspect::Inspector::default();
1099 let stats = ComponentTreeStats::new(inspector.root().create_child("stats")).await;
1100 stats
1101 .track_ready(
1102 ExtendedMoniker::ComponentInstance(vec!["a"].try_into().unwrap()),
1103 FakeTask::new(
1104 1,
1105 vec![
1106 zx::TaskRuntimeInfo {
1107 cpu_time: 2,
1108 queue_time: 4,
1109 ..zx::TaskRuntimeInfo::default()
1110 },
1111 zx::TaskRuntimeInfo {
1112 cpu_time: 6,
1113 queue_time: 8,
1114 ..zx::TaskRuntimeInfo::default()
1115 },
1116 ],
1117 ),
1118 )
1119 .await;
1120
1121 stats.measure().await;
1122
1123 let hierarchy = inspector.get_diagnostics_hierarchy();
1124 assert_data_tree!(hierarchy, root: {
1125 stats: contains {
1126 measurements: contains {
1127 components: {
1128 "a": {
1129 "1": {
1130 timestamps: AnyProperty,
1131 cpu_times: vec![2i64],
1132 queue_times: vec![4i64],
1133 }
1134 }
1135 }
1136 }
1137 }
1138 });
1139 let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
1140 assert_eq!(timestamps.len(), 1);
1141
1142 stats.measure().await;
1144
1145 let hierarchy = inspector.get_diagnostics_hierarchy();
1146 assert_data_tree!(hierarchy, root: {
1147 stats: contains {
1148 measurements: contains {
1149 components: {
1150 "a": {
1151 "1": {
1152 timestamps: AnyProperty,
1153 cpu_times: vec![2i64, 6],
1154 queue_times: vec![4i64, 8],
1155 }
1156 }
1157 }
1158 }
1159 }
1160 });
1161 let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
1162 assert_eq!(timestamps.len(), 2);
1163 assert!(timestamps[1] > timestamps[0]);
1164 }
1165
1166 #[fuchsia::test]
1167 async fn on_started_handles_parent_task() {
1168 let inspector = inspect::Inspector::default();
1169 let clock = Arc::new(FakeTime::new());
1170 clock.add_ticks(20);
1173 let stats = Arc::new(
1174 ComponentTreeStats::new_with_timesource(
1175 inspector.root().create_child("stats"),
1176 clock.clone(),
1177 )
1178 .await,
1179 );
1180 let parent_task = FakeTask::new(
1181 1,
1182 vec![
1183 zx::TaskRuntimeInfo {
1184 cpu_time: 20,
1185 queue_time: 40,
1186 ..zx::TaskRuntimeInfo::default()
1187 },
1188 zx::TaskRuntimeInfo {
1189 cpu_time: 60,
1190 queue_time: 80,
1191 ..zx::TaskRuntimeInfo::default()
1192 },
1193 ],
1194 );
1195 let component_task = FakeTask::new(
1196 2,
1197 vec![
1198 zx::TaskRuntimeInfo {
1199 cpu_time: 2,
1200 queue_time: 4,
1201 ..zx::TaskRuntimeInfo::default()
1202 },
1203 zx::TaskRuntimeInfo {
1204 cpu_time: 6,
1205 queue_time: 8,
1206 ..zx::TaskRuntimeInfo::default()
1207 },
1208 ],
1209 );
1210
1211 let fake_runtime = FakeRuntime::new_with_start_times(
1212 FakeDiagnosticsContainer::new(parent_task.clone(), None),
1213 IncrementingFakeTime::new(3, std::time::Duration::from_nanos(5)),
1214 );
1215 stats
1216 .on_component_started(&Moniker::try_from(vec!["parent"]).unwrap(), &fake_runtime)
1217 .await;
1218
1219 let fake_runtime = FakeRuntime::new_with_start_times(
1220 FakeDiagnosticsContainer::new(component_task, Some(parent_task)),
1221 IncrementingFakeTime::new(8, std::time::Duration::from_nanos(5)),
1222 );
1223 stats.on_component_started(&Moniker::try_from(vec!["child"]).unwrap(), &fake_runtime).await;
1224
1225 loop {
1228 if stats.tree.lock().await.len() == 2 {
1229 break;
1230 }
1231 fasync::Timer::new(fasync::MonotonicInstant::after(
1232 zx::MonotonicDuration::from_millis(100i64),
1233 ))
1234 .await;
1235 }
1236
1237 assert_data_tree!(inspector, root: {
1238 stats: contains {
1239 measurements: contains {
1240 components: {
1241 "parent": {
1242 "1": {
1243 "timestamps": AnyProperty,
1244 "cpu_times": vec![0i64, 20],
1245 "queue_times": vec![0i64, 40],
1246 },
1247 },
1248 "child": {
1249 "2": {
1250 "timestamps": AnyProperty,
1251 "cpu_times": vec![0i64, 2],
1252 "queue_times": vec![0i64, 4],
1253 }
1254 }
1255 }
1256 }
1257 }
1258 });
1259 }
1260
1261 #[fuchsia::test]
1262 async fn child_tasks_garbage_collection() {
1263 let inspector = inspect::Inspector::default();
1264 let clock = Arc::new(FakeTime::new());
1265 let stats = Arc::new(
1266 ComponentTreeStats::new_with_timesource(
1267 inspector.root().create_child("stats"),
1268 clock.clone(),
1269 )
1270 .await,
1271 );
1272 let parent_task = FakeTask::new(
1273 1,
1274 vec![
1275 zx::TaskRuntimeInfo {
1276 cpu_time: 20,
1277 queue_time: 40,
1278 ..zx::TaskRuntimeInfo::default()
1279 },
1280 zx::TaskRuntimeInfo {
1281 cpu_time: 60,
1282 queue_time: 80,
1283 ..zx::TaskRuntimeInfo::default()
1284 },
1285 ],
1286 );
1287 let component_task = FakeTask::new(
1288 2,
1289 vec![zx::TaskRuntimeInfo {
1290 cpu_time: 2,
1291 queue_time: 4,
1292 ..zx::TaskRuntimeInfo::default()
1293 }],
1294 );
1295 let fake_parent_runtime =
1296 FakeRuntime::new(FakeDiagnosticsContainer::new(parent_task.clone(), None));
1297 stats
1298 .on_component_started(&Moniker::try_from(vec!["parent"]).unwrap(), &fake_parent_runtime)
1299 .await;
1300
1301 let child_moniker = Moniker::try_from(vec!["child"]).unwrap();
1302 let fake_runtime =
1303 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, Some(parent_task)));
1304 stats.on_component_started(&child_moniker, &fake_runtime).await;
1305
1306 loop {
1309 if stats.tree.lock().await.len() == 2 {
1310 break;
1311 }
1312 fasync::Timer::new(fasync::MonotonicInstant::after(
1313 zx::MonotonicDuration::from_millis(100i64),
1314 ))
1315 .await;
1316 }
1317
1318 assert_eq!(stats.tree.lock().await.len(), 2);
1319 assert_eq!(stats.tasks.lock().await.len(), 2);
1320
1321 let extended_moniker = child_moniker.into();
1322 for task in stats.tree.lock().await.get(&extended_moniker).unwrap().lock().await.tasks_mut()
1324 {
1325 task.lock().await.force_terminate().await;
1326 clock.add_ticks(1);
1327 }
1328
1329 stats.measure().await;
1331 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
1332
1333 for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
1335 stats.measure().await;
1336 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
1337 }
1338
1339 stats.measure().await;
1341
1342 assert!(stats.tree.lock().await.get(&extended_moniker).is_none());
1344 assert_eq!(stats.tree.lock().await.len(), 1);
1345 assert_eq!(stats.tasks.lock().await.len(), 1);
1346 }
1347
1348 fn get_recent_property(hierarchy: &DiagnosticsHierarchy, name: &str) -> i64 {
1349 hierarchy.get_property_by_path(&vec!["stats", "recent_usage", name]).unwrap().int().unwrap()
1350 }
1351
1352 fn get_data(
1353 hierarchy: &DiagnosticsHierarchy,
1354 moniker: &str,
1355 task: Option<&str>,
1356 ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
1357 let mut path = vec!["stats", "measurements", "components", moniker];
1358 if let Some(task) = task {
1359 path.push(task);
1360 }
1361 get_data_at(&hierarchy, &path)
1362 }
1363
1364 fn get_data_at(
1365 hierarchy: &DiagnosticsHierarchy,
1366 path: &[&str],
1367 ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
1368 let node = hierarchy.get_child_by_path(&path).expect("found stats node");
1369 let cpu_times = node
1370 .get_property("cpu_times")
1371 .expect("found cpu")
1372 .int_array()
1373 .expect("cpu are ints")
1374 .raw_values();
1375 let queue_times = node
1376 .get_property("queue_times")
1377 .expect("found queue")
1378 .int_array()
1379 .expect("queue are ints")
1380 .raw_values();
1381 let timestamps = node
1382 .get_property("timestamps")
1383 .expect("found timestamps")
1384 .int_array()
1385 .expect("timestamps are ints")
1386 .raw_values();
1387 (timestamps.into_owned(), cpu_times.into_owned(), queue_times.into_owned())
1388 }
1389}