1use crate::task_metrics::component_stats::ComponentStats;
6use crate::task_metrics::constants::*;
7use crate::task_metrics::measurement::{Measurement, MeasurementsQueue};
8use crate::task_metrics::runtime_stats_source::{
9 ComponentStartedInfo, RuntimeStatsContainer, RuntimeStatsSource,
10};
11use crate::task_metrics::task_info::{create_cpu_histogram, TaskInfo};
12use async_trait::async_trait;
13use errors::ModelError;
14use fidl_fuchsia_component_runner::Task as DiagnosticsTask;
15use fuchsia_async as fasync;
16use fuchsia_inspect::{self as inspect, ArrayProperty, HistogramProperty};
17use fuchsia_sync::Mutex;
18use futures::channel::{mpsc, oneshot};
19use futures::{FutureExt, StreamExt};
20use hooks::{Event, EventPayload, EventType, HasEventType, Hook, HooksRegistration};
21use injectable_time::{BootInstant, TimeSource};
22use log::warn;
23use moniker::{ExtendedMoniker, Moniker};
24use std::collections::{BTreeMap, VecDeque};
25use std::fmt::Debug;
26use std::sync::{Arc, Weak};
27use zx::{self as zx, sys as zx_sys, HandleBased};
28
29macro_rules! maybe_return {
30 ($e:expr) => {
31 match $e {
32 None => return,
33 Some(v) => v,
34 }
35 };
36}
37
38const MAX_INSPECT_SIZE : usize = 2 * 1024 * 1024 ;
39
40const AGGREGATE_SAMPLES: &'static str = "@aggregated";
41
42pub struct ComponentTreeStats<T: RuntimeStatsSource + Debug> {
44 tree: Mutex<BTreeMap<ExtendedMoniker, Arc<Mutex<ComponentStats<T>>>>>,
46
47 tasks: Mutex<BTreeMap<zx_sys::zx_koid_t, Weak<Mutex<TaskInfo<T>>>>>,
50
51 node: inspect::Node,
53
54 histograms_node: inspect::Node,
56
57 processing_times: inspect::IntExponentialHistogramProperty,
59
60 sampler_task: Mutex<Option<fasync::Task<()>>>,
62
63 totals: Mutex<AggregatedStats>,
65
66 _wait_diagnostics_drain: fasync::Task<()>,
67
68 diagnostics_waiter_task_sender: mpsc::UnboundedSender<fasync::Task<()>>,
69
70 time_source: Arc<dyn TimeSource + Send + Sync>,
71
72 aggregated_dead_task_data: Mutex<MeasurementsQueue>,
76
77 exited_measurements: Mutex<Measurement>,
79}
80
81impl<T: 'static + RuntimeStatsSource + Debug + Send + Sync> ComponentTreeStats<T> {
82 pub fn new(node: inspect::Node) -> Arc<Self> {
83 Self::new_with_timesource(node, Arc::new(BootInstant::new()))
84 }
85
86 fn new_with_timesource(
87 node: inspect::Node,
88 time_source: Arc<dyn TimeSource + Send + Sync>,
89 ) -> Arc<Self> {
90 let processing_times = node.create_int_exponential_histogram(
91 "processing_times_ns",
92 inspect::ExponentialHistogramParams {
93 floor: 1000,
94 initial_step: 1000,
95 step_multiplier: 2,
96 buckets: 16,
97 },
98 );
99
100 let histograms_node = node.create_child("histograms");
101 let totals = Mutex::new(AggregatedStats::new());
102 let (snd, rcv) = mpsc::unbounded();
103 let this = Arc::new(Self {
104 tree: Mutex::new(BTreeMap::new()),
105 tasks: Mutex::new(BTreeMap::new()),
106 node,
107 histograms_node,
108 processing_times,
109 sampler_task: Mutex::new(None),
110 totals,
111 diagnostics_waiter_task_sender: snd,
112 _wait_diagnostics_drain: fasync::Task::spawn(async move {
113 rcv.for_each_concurrent(None, |rx| async move { rx.await }).await;
114 }),
115 time_source: time_source.clone(),
116 aggregated_dead_task_data: Mutex::new(MeasurementsQueue::new(
117 COMPONENT_CPU_MAX_SAMPLES,
118 time_source,
119 )),
120 exited_measurements: Mutex::new(Measurement::default()),
121 });
122
123 let weak_self = Arc::downgrade(&this);
124
125 let weak_self_for_fut = weak_self.clone();
126 this.node.record_lazy_child("measurements", move || {
127 let weak_self_clone = weak_self_for_fut.clone();
128 async move {
129 if let Some(this) = weak_self_clone.upgrade() {
130 Ok(this.write_measurements_to_inspect())
131 } else {
132 Ok(inspect::Inspector::default())
133 }
134 }
135 .boxed()
136 });
137
138 let weak_self_clone_for_fut = weak_self.clone();
139 this.node.record_lazy_child("recent_usage", move || {
140 let weak_self_clone = weak_self_clone_for_fut.clone();
141 async move {
142 if let Some(this) = weak_self_clone.upgrade() {
143 Ok(this.write_recent_usage_to_inspect())
144 } else {
145 Ok(inspect::Inspector::default())
146 }
147 }
148 .boxed()
149 });
150 let weak_self_for_fut = weak_self;
151 this.node.record_lazy_child("@total", move || {
152 let weak_self_clone = weak_self_for_fut.clone();
153 async move {
154 if let Some(this) = weak_self_clone.upgrade() {
155 Ok(this.write_totals_to_inspect())
156 } else {
157 Ok(inspect::Inspector::default())
158 }
159 }
160 .boxed()
161 });
162
163 this
164 }
165
166 pub fn start_measuring(self: &Arc<Self>) {
169 let weak_self = Arc::downgrade(self);
170 self.measure();
171 *(self.sampler_task.lock()) = Some(fasync::Task::spawn(async move {
172 loop {
173 fasync::Timer::new(CPU_SAMPLE_PERIOD).await;
174 match weak_self.upgrade() {
175 None => break,
176 Some(this) => {
177 this.measure();
178 }
179 }
180 }
181 }));
182 }
183
184 fn track_ready(&self, moniker: ExtendedMoniker, task: T) {
186 let histogram = create_cpu_histogram(&self.histograms_node, &moniker);
187 if let Ok(task_info) = TaskInfo::try_from(task, Some(histogram), self.time_source.clone()) {
188 let koid = task_info.koid();
189 let arc_task_info = Arc::new(Mutex::new(task_info));
190 let mut stats = ComponentStats::new();
191 stats.add_task(arc_task_info.clone());
192 let stats = Arc::new(Mutex::new(stats));
193 self.tree.lock().insert(moniker, stats);
194 self.tasks.lock().insert(koid, Arc::downgrade(&arc_task_info));
195 }
196 }
197
198 fn write_measurements_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
199 let inspector =
200 inspect::Inspector::new(inspect::InspectorConfig::default().size(MAX_INSPECT_SIZE));
201 let components = inspector.root().create_child("components");
202 let (component_count, task_count) = self.write_measurements(&components);
203 self.write_aggregate_measurements(&components);
204 inspector.root().record_uint("component_count", component_count);
205 inspector.root().record_uint("task_count", task_count);
206 inspector.root().record(components);
207
208 let stats_node = inspect::stats::StatsNode::new(&inspector);
209 stats_node.record_data_to(inspector.root());
210
211 inspector
212 }
213
214 fn write_recent_usage_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
215 let inspector = inspect::Inspector::default();
216 self.totals.lock().write_recents_to(inspector.root());
217 inspector
218 }
219
220 fn write_totals_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
221 let inspector = inspect::Inspector::default();
222 self.totals.lock().write_totals_to(inspector.root());
223 inspector
224 }
225
226 fn write_aggregate_measurements(&self, components_node: &inspect::Node) {
227 let locked_aggregate = self.aggregated_dead_task_data.lock();
228 if locked_aggregate.no_true_measurements() {
229 return;
230 }
231
232 let aggregate = components_node.create_child(&*AGGREGATE_SAMPLES);
233 locked_aggregate.record_to_node(&aggregate);
234 components_node.record(aggregate);
235 }
236
237 fn write_measurements(&self, node: &inspect::Node) -> (u64, u64) {
238 let mut task_count = 0;
239 let tree = self.tree.lock();
240 for (moniker, stats) in tree.iter() {
241 let stats_guard = stats.lock();
242 let key = match moniker {
243 ExtendedMoniker::ComponentManager => moniker.to_string(),
244 ExtendedMoniker::ComponentInstance(m) => {
245 if *m == Moniker::root() {
246 "<root>".to_string()
247 } else {
248 m.to_string()
249 }
250 }
251 };
252 let child = node.create_child(key);
253 task_count += stats_guard.record_to_node(&child);
254 node.record(child);
255 }
256 (tree.len() as u64, task_count)
257 }
258
259 pub fn measure(self: &Arc<Self>) {
263 let start = zx::BootInstant::get();
264
265 let stats = self
267 .tree
268 .lock()
269 .iter()
270 .map(|(k, v)| (k.clone(), Arc::downgrade(&v)))
271 .collect::<Vec<_>>();
272 let mut locked_exited_measurements = self.exited_measurements.lock();
273 let mut aggregated = Measurement::clone_with_time(&*locked_exited_measurements, start);
274 let mut stats_to_remove = vec![];
275 let mut koids_to_remove = vec![];
276 for (moniker, weak_stats) in stats.into_iter() {
277 if let Some(stats) = weak_stats.upgrade() {
278 let mut stat_guard = stats.lock();
279 aggregated += &stat_guard.measure();
281 aggregated += &stat_guard.measure_tracked_dead_tasks();
282 let (mut stale_koids, exited_cpu_of_deleted) = stat_guard.clean_stale();
283 aggregated += &exited_cpu_of_deleted;
284 *locked_exited_measurements += &exited_cpu_of_deleted;
285 koids_to_remove.append(&mut stale_koids);
286 if !stat_guard.is_alive() {
287 stats_to_remove.push(moniker);
288 }
289 }
290 }
291
292 let mut stats = self.tree.lock();
294 for moniker in stats_to_remove {
295 if let Some(stat) = stats.get(&moniker) {
298 if !stat.lock().is_alive() {
299 stats.remove(&moniker);
300 }
301 }
302 }
303
304 let mut tasks = self.tasks.lock();
305 for koid in koids_to_remove {
306 tasks.remove(&koid);
307 }
308
309 self.totals.lock().insert(aggregated);
310 self.processing_times.insert((zx::BootInstant::get() - start).into_nanos());
311 }
312
313 fn prune_dead_tasks(self: &Arc<Self>, max_dead_tasks: usize) {
314 let mut all_dead_tasks = BTreeMap::new();
315 for (moniker, component) in self.tree.lock().iter() {
316 let dead_tasks = component.lock().gather_dead_tasks();
317 for (timestamp, task) in dead_tasks {
318 all_dead_tasks.insert(timestamp, (task, moniker.clone()));
319 }
320 }
321
322 if all_dead_tasks.len() <= max_dead_tasks {
323 return;
324 }
325
326 let total = all_dead_tasks.len();
327 let to_remove = all_dead_tasks.iter().take(total - (max_dead_tasks / 2));
328
329 let mut koids_to_remove = vec![];
330 let mut aggregate_data = self.aggregated_dead_task_data.lock();
331 for (_, (unlocked_task, _)) in to_remove {
332 let mut task = unlocked_task.lock();
333 if let Ok(measurements) = task.take_measurements_queue() {
334 koids_to_remove.push(task.koid());
335 *aggregate_data += measurements;
336 }
337 }
338
339 let mut stats_to_remove = vec![];
340 for (moniker, stats) in self.tree.lock().iter() {
341 let mut stat_guard = stats.lock();
342 stat_guard.remove_by_koids(&koids_to_remove);
343 if !stat_guard.is_alive() {
344 stats_to_remove.push(moniker.clone());
345 }
346 }
347
348 let mut stats = self.tree.lock();
349 for moniker in stats_to_remove {
350 if let Some(stat) = stats.get(&moniker) {
353 if !stat.lock().is_alive() {
354 stats.remove(&moniker);
355 }
356 }
357 }
358
359 let mut tasks = self.tasks.lock();
360 for koid in koids_to_remove {
361 tasks.remove(&koid);
362 }
363 }
364
365 fn on_component_started<P, C>(self: &Arc<Self>, moniker: &Moniker, runtime: &P)
366 where
367 P: ComponentStartedInfo<C, T>,
368 C: RuntimeStatsContainer<T> + Send + Sync + 'static,
369 {
370 if let Some(receiver) = runtime.get_receiver() {
371 let task = fasync::Task::spawn(Self::diagnostics_waiter_task(
372 Arc::downgrade(&self),
373 moniker.clone().into(),
374 receiver,
375 runtime.start_time(),
376 ));
377 let _ = self.diagnostics_waiter_task_sender.unbounded_send(task);
378 }
379 }
380
381 async fn diagnostics_waiter_task<C>(
382 weak_self: Weak<Self>,
383 moniker: ExtendedMoniker,
384 receiver: oneshot::Receiver<C>,
385 start_time: zx::BootInstant,
386 ) where
387 C: RuntimeStatsContainer<T> + Send + Sync + 'static,
388 {
389 let mut source = maybe_return!(receiver.await.ok());
390 let this = maybe_return!(weak_self.upgrade());
391 let mut tree_lock = this.tree.lock();
392 let stats = tree_lock
393 .entry(moniker.clone())
394 .or_insert_with(|| Arc::new(Mutex::new(ComponentStats::new())));
395 let histogram = create_cpu_histogram(&this.histograms_node, &moniker);
396 let mut task_info =
397 maybe_return!(source.take_component_task().and_then(|task| TaskInfo::try_from(
398 task,
399 Some(histogram),
400 this.time_source.clone()
401 )
402 .ok()));
403
404 let parent_koid = source
405 .take_parent_task()
406 .and_then(|task| TaskInfo::try_from(task, None, this.time_source.clone()).ok())
407 .map(|task| task.koid());
408 let koid = task_info.koid();
409
410 task_info.record_measurement_with_start_time(start_time);
416 task_info.measure_if_no_parent();
417
418 let mut task_guard = this.tasks.lock();
419
420 let task_info = match parent_koid {
421 None => {
422 Arc::new(Mutex::new(task_info))
425 }
426 Some(parent_koid) => {
427 task_info.has_parent_task = true;
428 let task_info = Arc::new(Mutex::new(task_info));
429 if let Some(parent) = task_guard.get(&parent_koid).and_then(|p| p.upgrade()) {
430 let mut parent_guard = parent.lock();
431 parent_guard.add_child(Arc::downgrade(&task_info));
432 }
433 task_info
434 }
435 };
436 task_guard.insert(koid, Arc::downgrade(&task_info));
437 stats.lock().add_task(task_info);
438 drop(task_guard);
439 drop(tree_lock);
440 this.prune_dead_tasks(MAX_DEAD_TASKS);
441 }
442}
443
444impl ComponentTreeStats<DiagnosticsTask> {
445 pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
446 vec![HooksRegistration::new(
447 "ComponentTreeStats",
448 vec![EventType::Started],
449 Arc::downgrade(self) as Weak<dyn Hook>,
450 )]
451 }
452
453 pub fn track_component_manager_stats(&self) {
455 match fuchsia_runtime::job_default().duplicate_handle(zx::Rights::SAME_RIGHTS) {
456 Ok(job) => {
457 self.track_ready(ExtendedMoniker::ComponentManager, DiagnosticsTask::Job(job));
458 }
459 Err(err) => warn!(
460 "Failed to duplicate component manager job. Not tracking its own stats: {:?}",
461 err
462 ),
463 }
464 }
465}
466
467#[async_trait]
468impl Hook for ComponentTreeStats<DiagnosticsTask> {
469 async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
470 let target_moniker = event
471 .target_moniker
472 .unwrap_instance_moniker_or(ModelError::UnexpectedComponentManagerMoniker)?;
473 match event.event_type() {
474 EventType::Started => {
475 if let EventPayload::Started { runtime, .. } = &event.payload {
476 self.on_component_started(target_moniker, runtime);
477 }
478 }
479 _ => {}
480 }
481 Ok(())
482 }
483}
484
485struct AggregatedStats {
486 measurements: VecDeque<Measurement>,
488}
489
490impl AggregatedStats {
491 fn new() -> Self {
492 Self { measurements: VecDeque::with_capacity(COMPONENT_CPU_MAX_SAMPLES) }
493 }
494
495 fn insert(&mut self, measurement: Measurement) {
496 while self.measurements.len() >= COMPONENT_CPU_MAX_SAMPLES {
497 self.measurements.pop_front();
498 }
499 self.measurements.push_back(measurement);
500 }
501
502 fn write_totals_to(&self, node: &inspect::Node) {
503 let count = self.measurements.len();
504 let timestamps = node.create_int_array(TIMESTAMPS, count);
505 let cpu_times = node.create_int_array(CPU_TIMES, count);
506 let queue_times = node.create_int_array(QUEUE_TIMES, count);
507 for (i, measurement) in self.measurements.iter().enumerate() {
508 timestamps.set(i, measurement.timestamp().into_nanos());
509 cpu_times.set(i, measurement.cpu_time().into_nanos());
510 queue_times.set(i, measurement.queue_time().into_nanos());
511 }
512 node.record(timestamps);
513 node.record(cpu_times);
514 node.record(queue_times);
515 }
516
517 fn write_recents_to(&self, node: &inspect::Node) {
518 if self.measurements.is_empty() {
519 return;
520 }
521 if self.measurements.len() >= 2 {
522 let measurement = self.measurements.get(self.measurements.len() - 2).unwrap();
523 node.record_int("previous_cpu_time", measurement.cpu_time().into_nanos());
524 node.record_int("previous_queue_time", measurement.queue_time().into_nanos());
525 node.record_int("previous_timestamp", measurement.timestamp().into_nanos());
526 }
527 let measurement = self.measurements.get(self.measurements.len() - 1).unwrap();
528 node.record_int("recent_cpu_time", measurement.cpu_time().into_nanos());
529 node.record_int("recent_queue_time", measurement.queue_time().into_nanos());
530 node.record_int("recent_timestamp", measurement.timestamp().into_nanos());
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use super::*;
537 use crate::task_metrics::testing::{FakeDiagnosticsContainer, FakeRuntime, FakeTask};
538 use diagnostics_assertions::{assert_data_tree, AnyProperty};
539 use diagnostics_hierarchy::DiagnosticsHierarchy;
540 use fuchsia_inspect::DiagnosticsHierarchyGetter;
541
542 use injectable_time::{FakeTime, IncrementingFakeTime};
543
544 #[fuchsia::test]
545 async fn total_tracks_cpu_after_termination() {
546 let inspector = inspect::Inspector::default();
547 let clock = Arc::new(FakeTime::new());
548 let stats = ComponentTreeStats::new_with_timesource(
549 inspector.root().create_child("stats"),
550 clock.clone(),
551 );
552
553 let mut previous_task_count = 0;
554 for i in 0..10 {
555 clock.add_ticks(1);
556 let component_task = FakeTask::new(
557 i as u64,
558 create_measurements_vec_for_fake_task(COMPONENT_CPU_MAX_SAMPLES as i64 * 3, 2, 4),
559 );
560
561 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
562 let fake_runtime =
563 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
564 stats.on_component_started(&moniker, &fake_runtime);
565
566 loop {
567 let current = stats.tree.lock().len();
568 if current != previous_task_count {
569 previous_task_count = current;
570 break;
571 }
572 fasync::Timer::new(fasync::MonotonicInstant::after(
573 zx::MonotonicDuration::from_millis(100i64),
574 ))
575 .await;
576 }
577 }
578
579 assert_eq!(stats.tasks.lock().len(), 10);
580 assert_eq!(stats.tree.lock().len(), 10);
581
582 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES - 2 {
583 stats.measure();
584 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
585 }
586
587 {
590 let totals = stats.totals.lock();
591 let recent_measurement = totals
592 .measurements
593 .get(totals.measurements.len() - 1)
594 .expect("there's at least one measurement");
595 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
596 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
597
598 let previous_measurement = totals
599 .measurements
600 .get(totals.measurements.len() - 2)
601 .expect("there's a previous measurement");
602 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
603 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320,);
604 }
605
606 for i in 0..10 {
608 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
609 for task in
610 stats.tree.lock().get(&moniker.into()).unwrap().lock().tasks_mut().iter_mut()
611 {
612 task.lock().force_terminate().await;
613 clock.add_ticks(1);
616 }
617 }
618
619 {
621 let totals = stats.totals.lock();
622 let recent_measurement = totals
623 .measurements
624 .get(totals.measurements.len() - 1)
625 .expect("there's at least one measurement");
626 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
627 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
628
629 let previous_measurement = totals
630 .measurements
631 .get(totals.measurements.len() - 2)
632 .expect("there's a previous measurement");
633 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
634 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320);
635 }
636
637 stats.measure();
639 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
640
641 {
642 let totals = stats.totals.lock();
643 let recent_measurement = totals
644 .measurements
645 .get(totals.measurements.len() - 1)
646 .expect("there's at least one measurement");
647 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
648 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
649
650 let previous_measurement = totals
651 .measurements
652 .get(totals.measurements.len() - 2)
653 .expect("there's a previous measurement");
654 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1180);
655 assert_eq!(previous_measurement.queue_time().into_nanos(), 2360);
656 }
657
658 stats.measure();
659 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
660
661 {
662 let totals = stats.totals.lock();
663 let recent_measurement = totals
664 .measurements
665 .get(totals.measurements.len() - 1)
666 .expect("there's at least one measurement");
667 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
668 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
669
670 let previous_measurement = totals
671 .measurements
672 .get(totals.measurements.len() - 2)
673 .expect("there's a previous measurement");
674 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
675 assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
676 }
677
678 for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
680 stats.measure();
681 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
682 }
683
684 assert_eq!(stats.tasks.lock().len(), 0);
686 assert_eq!(stats.tree.lock().len(), 0);
687
688 {
690 let totals = stats.totals.lock();
691 let recent_measurement = totals
692 .measurements
693 .get(totals.measurements.len() - 1)
694 .expect("there's at least one measurement");
695 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
696 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
697
698 let previous_measurement = totals
699 .measurements
700 .get(totals.measurements.len() - 2)
701 .expect("there's a previous measurement");
702 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
703 assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
704 }
705 }
706
707 #[fuchsia::test]
708 async fn components_are_deleted_when_all_tasks_are_gone() {
709 let inspector = inspect::Inspector::default();
710 let clock = Arc::new(FakeTime::new());
711 let stats = ComponentTreeStats::new_with_timesource(
712 inspector.root().create_child("stats"),
713 clock.clone(),
714 );
715 let moniker: Moniker = ["a"].try_into().unwrap();
716 let moniker: ExtendedMoniker = moniker.into();
717 stats.track_ready(moniker.clone(), FakeTask::default());
718 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES {
719 stats.measure();
720 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
721 }
722 assert_eq!(stats.tree.lock().len(), 1);
723 assert_eq!(stats.tasks.lock().len(), 1);
724 assert_eq!(
725 stats.tree.lock().get(&moniker).unwrap().lock().total_measurements(),
726 COMPONENT_CPU_MAX_SAMPLES
727 );
728
729 for task in stats.tree.lock().get(&moniker).unwrap().lock().tasks_mut().iter_mut() {
731 task.lock().force_terminate().await;
732 clock.add_ticks(1);
733 }
734
735 for i in 0..COMPONENT_CPU_MAX_SAMPLES {
737 stats.measure();
738 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
739 assert_eq!(
740 stats.tree.lock().get(&moniker).unwrap().lock().total_measurements(),
741 COMPONENT_CPU_MAX_SAMPLES - i,
742 );
743 }
744 stats.measure();
745 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
746 assert!(stats.tree.lock().get(&moniker).is_none());
747 assert_eq!(stats.tree.lock().len(), 0);
748 assert_eq!(stats.tasks.lock().len(), 0);
749 }
750
751 fn create_measurements_vec_for_fake_task(
752 num_measurements: i64,
753 init_cpu: i64,
754 init_queue: i64,
755 ) -> Vec<zx::TaskRuntimeInfo> {
756 let mut v = vec![];
757 for i in 0..num_measurements {
758 v.push(zx::TaskRuntimeInfo {
759 cpu_time: i * init_cpu,
760 queue_time: i * init_queue,
761 ..zx::TaskRuntimeInfo::default()
762 });
763 }
764
765 v
766 }
767
768 #[fuchsia::test]
769 async fn dead_tasks_are_pruned() {
770 let clock = Arc::new(FakeTime::new());
771 let inspector = inspect::Inspector::default();
772 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
773 inspector.root().create_child("stats"),
774 clock.clone(),
775 ));
776
777 let mut previous_task_count = 0;
778 for i in 0..(MAX_DEAD_TASKS * 2) {
779 clock.add_ticks(1);
780 let component_task =
781 FakeTask::new(i as u64, create_measurements_vec_for_fake_task(300, 2, 4));
782
783 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
784 let fake_runtime =
785 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
786 stats.on_component_started(&moniker, &fake_runtime);
787
788 loop {
789 let current = stats.tree.lock().len();
790 if current != previous_task_count {
791 previous_task_count = current;
792 break;
793 }
794 fasync::Timer::new(fasync::MonotonicInstant::after(
795 zx::MonotonicDuration::from_millis(100i64),
796 ))
797 .await;
798 }
799
800 for task in
801 stats.tree.lock().get(&moniker.into()).unwrap().lock().tasks_mut().iter_mut()
802 {
803 task.lock().force_terminate().await;
804 clock.add_ticks(1);
805 }
806 }
807
808 let task_count = stats.tasks.lock().len();
809 let moniker_count = stats.tree.lock().len();
810 assert_eq!(task_count, 88);
811 assert_eq!(moniker_count, 88);
812 }
813
814 #[fuchsia::test]
815 async fn aggregated_data_available_inspect() {
816 let max_dead_tasks = 4;
817 let clock = Arc::new(FakeTime::new());
818 let inspector = inspect::Inspector::default();
819 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
820 inspector.root().create_child("stats"),
821 clock.clone(),
822 ));
823
824 let mut moniker_list = vec![];
825 for i in 0..(max_dead_tasks * 2) {
826 clock.add_ticks(1);
827 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
828 moniker_list.push(moniker.clone());
829 let component_task =
830 FakeTask::new(i as u64, create_measurements_vec_for_fake_task(5, 1, 1));
831 stats.track_ready(moniker.into(), component_task);
832 }
833
834 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
835 stats.measure();
836 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
837 stats.measure();
838 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
839 stats.measure();
840
841 assert_data_tree!(inspector, root: {
842 stats: contains {
843 measurements: contains {
844 components: {
845 "moniker-0": contains {},
846 "moniker-1": contains {},
847 "moniker-2": contains {},
848 "moniker-3": contains {},
849 "moniker-4": contains {},
850 "moniker-5": contains {},
851 "moniker-6": contains {},
852 "moniker-7": contains {},
853 }
854 }
855 }
856 });
857
858 for moniker in moniker_list {
859 for task in stats
860 .tree
861 .lock()
862 .get(&moniker.clone().into())
863 .unwrap()
864 .lock()
865 .tasks_mut()
866 .iter_mut()
867 {
868 task.lock().force_terminate().await;
869 clock.add_ticks(1);
872 }
873 }
874
875 stats.prune_dead_tasks(max_dead_tasks);
876
877 let hierarchy = inspector.get_diagnostics_hierarchy().await;
878 assert_data_tree!(inspector, root: {
879 stats: contains {
880 measurements: contains {
881 components: {
882 "@aggregated": {
883 "timestamps": AnyProperty,
884 "cpu_times": vec![0i64, 6i64, 12i64],
885 "queue_times": vec![0i64, 6i64, 12i64],
886 },
887 "moniker-6": contains {},
888 "moniker-7": contains {},
889 }
890 }
891 }
892 });
893 let (timestamps, _, _) = get_data(&hierarchy, "@aggregated", None);
894 assert_eq!(timestamps.len(), 3);
895 assert!(timestamps[1] > timestamps[0]);
896 assert!(timestamps[2] > timestamps[1]);
897 }
898
899 #[fuchsia::test]
900 async fn total_holds_sum_of_stats() {
901 let inspector = inspect::Inspector::default();
902 let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
903 stats.measure();
904 stats.track_ready(
905 ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
906 FakeTask::new(
907 1,
908 vec![
909 zx::TaskRuntimeInfo {
910 cpu_time: 2,
911 queue_time: 4,
912 ..zx::TaskRuntimeInfo::default()
913 },
914 zx::TaskRuntimeInfo {
915 cpu_time: 6,
916 queue_time: 8,
917 ..zx::TaskRuntimeInfo::default()
918 },
919 ],
920 ),
921 );
922 stats.track_ready(
923 ExtendedMoniker::ComponentInstance(["b"].try_into().unwrap()),
924 FakeTask::new(
925 2,
926 vec![
927 zx::TaskRuntimeInfo {
928 cpu_time: 1,
929 queue_time: 3,
930 ..zx::TaskRuntimeInfo::default()
931 },
932 zx::TaskRuntimeInfo {
933 cpu_time: 5,
934 queue_time: 7,
935 ..zx::TaskRuntimeInfo::default()
936 },
937 ],
938 ),
939 );
940
941 stats.measure();
942 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
943 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
944 assert_eq!(timestamps.len(), 2);
945 assert_eq!(cpu_times, vec![0, 2 + 1]);
946 assert_eq!(queue_times, vec![0, 4 + 3]);
947
948 stats.measure();
949 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
950 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
951 assert_eq!(timestamps.len(), 3);
952 assert_eq!(cpu_times, vec![0, 2 + 1, 6 + 5]);
953 assert_eq!(queue_times, vec![0, 4 + 3, 8 + 7]);
954 }
955
956 #[fuchsia::test]
957 async fn recent_usage() {
958 let inspector = inspect::Inspector::default();
960 let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
961 stats.measure();
962
963 stats.track_ready(
964 ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
965 FakeTask::new(
966 1,
967 vec![
968 zx::TaskRuntimeInfo {
969 cpu_time: 2,
970 queue_time: 4,
971 ..zx::TaskRuntimeInfo::default()
972 },
973 zx::TaskRuntimeInfo {
974 cpu_time: 6,
975 queue_time: 8,
976 ..zx::TaskRuntimeInfo::default()
977 },
978 ],
979 ),
980 );
981 stats.track_ready(
982 ExtendedMoniker::ComponentInstance(["b"].try_into().unwrap()),
983 FakeTask::new(
984 2,
985 vec![
986 zx::TaskRuntimeInfo {
987 cpu_time: 1,
988 queue_time: 3,
989 ..zx::TaskRuntimeInfo::default()
990 },
991 zx::TaskRuntimeInfo {
992 cpu_time: 5,
993 queue_time: 7,
994 ..zx::TaskRuntimeInfo::default()
995 },
996 ],
997 ),
998 );
999
1000 stats.measure();
1001 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
1002
1003 assert_data_tree!(&hierarchy, root: contains {
1006 stats: contains {
1007 recent_usage: {
1008 previous_cpu_time: 0i64,
1009 previous_queue_time: 0i64,
1010 previous_timestamp: AnyProperty,
1011 recent_cpu_time: 2 + 1i64,
1012 recent_queue_time: 4 + 3i64,
1013 recent_timestamp: AnyProperty,
1014 }
1015 }
1016 });
1017
1018 let initial_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
1020 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
1021 assert_eq!(timestamps.len(), 2);
1022 assert_eq!(timestamps[1], initial_timestamp);
1023 assert_eq!(cpu_times, vec![0, 2 + 1]);
1024 assert_eq!(queue_times, vec![0, 4 + 3]);
1025
1026 stats.measure();
1028 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
1029
1030 assert_data_tree!(&hierarchy, root: contains {
1032 stats: contains {
1033 recent_usage: {
1034 previous_cpu_time: 2 + 1i64,
1035 previous_queue_time: 4 + 3i64,
1036 previous_timestamp: initial_timestamp,
1037 recent_cpu_time: 6 + 5i64,
1038 recent_queue_time: 8 + 7i64,
1039 recent_timestamp: AnyProperty,
1040 }
1041 }
1042 });
1043
1044 let recent_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
1046 assert!(recent_timestamp > initial_timestamp);
1047 }
1048
1049 #[fuchsia::test]
1050 async fn component_stats_are_available_in_inspect() {
1051 let inspector = inspect::Inspector::default();
1052 let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
1053 stats.track_ready(
1054 ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
1055 FakeTask::new(
1056 1,
1057 vec![
1058 zx::TaskRuntimeInfo {
1059 cpu_time: 2,
1060 queue_time: 4,
1061 ..zx::TaskRuntimeInfo::default()
1062 },
1063 zx::TaskRuntimeInfo {
1064 cpu_time: 6,
1065 queue_time: 8,
1066 ..zx::TaskRuntimeInfo::default()
1067 },
1068 ],
1069 ),
1070 );
1071
1072 stats.measure();
1073
1074 let hierarchy = inspector.get_diagnostics_hierarchy().await;
1075 assert_data_tree!(hierarchy, root: {
1076 stats: contains {
1077 measurements: contains {
1078 components: {
1079 "a": {
1080 "1": {
1081 timestamps: AnyProperty,
1082 cpu_times: vec![2i64],
1083 queue_times: vec![4i64],
1084 }
1085 }
1086 }
1087 }
1088 }
1089 });
1090 let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
1091 assert_eq!(timestamps.len(), 1);
1092
1093 stats.measure();
1095
1096 let hierarchy = inspector.get_diagnostics_hierarchy().await;
1097 assert_data_tree!(hierarchy, root: {
1098 stats: contains {
1099 measurements: contains {
1100 components: {
1101 "a": {
1102 "1": {
1103 timestamps: AnyProperty,
1104 cpu_times: vec![2i64, 6],
1105 queue_times: vec![4i64, 8],
1106 }
1107 }
1108 }
1109 }
1110 }
1111 });
1112 let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
1113 assert_eq!(timestamps.len(), 2);
1114 assert!(timestamps[1] > timestamps[0]);
1115 }
1116
1117 #[fuchsia::test]
1118 async fn on_started_handles_parent_task() {
1119 let inspector = inspect::Inspector::default();
1120 let clock = Arc::new(FakeTime::new());
1121 clock.add_ticks(20);
1124 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
1125 inspector.root().create_child("stats"),
1126 clock.clone(),
1127 ));
1128 let parent_task = FakeTask::new(
1129 1,
1130 vec![
1131 zx::TaskRuntimeInfo {
1132 cpu_time: 20,
1133 queue_time: 40,
1134 ..zx::TaskRuntimeInfo::default()
1135 },
1136 zx::TaskRuntimeInfo {
1137 cpu_time: 60,
1138 queue_time: 80,
1139 ..zx::TaskRuntimeInfo::default()
1140 },
1141 ],
1142 );
1143 let component_task = FakeTask::new(
1144 2,
1145 vec![
1146 zx::TaskRuntimeInfo {
1147 cpu_time: 2,
1148 queue_time: 4,
1149 ..zx::TaskRuntimeInfo::default()
1150 },
1151 zx::TaskRuntimeInfo {
1152 cpu_time: 6,
1153 queue_time: 8,
1154 ..zx::TaskRuntimeInfo::default()
1155 },
1156 ],
1157 );
1158
1159 let fake_runtime = FakeRuntime::new_with_start_times(
1160 FakeDiagnosticsContainer::new(parent_task.clone(), None),
1161 IncrementingFakeTime::new(3, std::time::Duration::from_nanos(5)),
1162 );
1163 stats.on_component_started(&Moniker::try_from(["parent"]).unwrap(), &fake_runtime);
1164
1165 let fake_runtime = FakeRuntime::new_with_start_times(
1166 FakeDiagnosticsContainer::new(component_task, Some(parent_task)),
1167 IncrementingFakeTime::new(8, std::time::Duration::from_nanos(5)),
1168 );
1169 stats.on_component_started(&Moniker::try_from(["child"]).unwrap(), &fake_runtime);
1170
1171 loop {
1174 if stats.tree.lock().len() == 2 {
1175 break;
1176 }
1177 fasync::Timer::new(fasync::MonotonicInstant::after(
1178 zx::MonotonicDuration::from_millis(100i64),
1179 ))
1180 .await;
1181 }
1182
1183 assert_data_tree!(inspector, root: {
1184 stats: contains {
1185 measurements: contains {
1186 components: {
1187 "parent": {
1188 "1": {
1189 "timestamps": AnyProperty,
1190 "cpu_times": vec![0i64, 20],
1191 "queue_times": vec![0i64, 40],
1192 },
1193 },
1194 "child": {
1195 "2": {
1196 "timestamps": AnyProperty,
1197 "cpu_times": vec![0i64, 2],
1198 "queue_times": vec![0i64, 4],
1199 }
1200 }
1201 }
1202 }
1203 }
1204 });
1205 }
1206
1207 #[fuchsia::test]
1208 async fn child_tasks_garbage_collection() {
1209 let inspector = inspect::Inspector::default();
1210 let clock = Arc::new(FakeTime::new());
1211 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
1212 inspector.root().create_child("stats"),
1213 clock.clone(),
1214 ));
1215 let parent_task = FakeTask::new(
1216 1,
1217 vec![
1218 zx::TaskRuntimeInfo {
1219 cpu_time: 20,
1220 queue_time: 40,
1221 ..zx::TaskRuntimeInfo::default()
1222 },
1223 zx::TaskRuntimeInfo {
1224 cpu_time: 60,
1225 queue_time: 80,
1226 ..zx::TaskRuntimeInfo::default()
1227 },
1228 ],
1229 );
1230 let component_task = FakeTask::new(
1231 2,
1232 vec![zx::TaskRuntimeInfo {
1233 cpu_time: 2,
1234 queue_time: 4,
1235 ..zx::TaskRuntimeInfo::default()
1236 }],
1237 );
1238 let fake_parent_runtime =
1239 FakeRuntime::new(FakeDiagnosticsContainer::new(parent_task.clone(), None));
1240 stats.on_component_started(&Moniker::try_from(["parent"]).unwrap(), &fake_parent_runtime);
1241
1242 let child_moniker = Moniker::try_from(["child"]).unwrap();
1243 let fake_runtime =
1244 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, Some(parent_task)));
1245 stats.on_component_started(&child_moniker, &fake_runtime);
1246
1247 loop {
1250 if stats.tree.lock().len() == 2 {
1251 break;
1252 }
1253 fasync::Timer::new(fasync::MonotonicInstant::after(
1254 zx::MonotonicDuration::from_millis(100i64),
1255 ))
1256 .await;
1257 }
1258
1259 assert_eq!(stats.tree.lock().len(), 2);
1260 assert_eq!(stats.tasks.lock().len(), 2);
1261
1262 let extended_moniker = child_moniker.into();
1263 for task in stats.tree.lock().get(&extended_moniker).unwrap().lock().tasks_mut() {
1265 task.lock().force_terminate().await;
1266 clock.add_ticks(1);
1267 }
1268
1269 stats.measure();
1271 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
1272
1273 for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
1275 stats.measure();
1276 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
1277 }
1278
1279 stats.measure();
1281
1282 assert!(stats.tree.lock().get(&extended_moniker).is_none());
1284 assert_eq!(stats.tree.lock().len(), 1);
1285 assert_eq!(stats.tasks.lock().len(), 1);
1286 }
1287
1288 fn get_recent_property(hierarchy: &DiagnosticsHierarchy, name: &str) -> i64 {
1289 hierarchy.get_property_by_path(&vec!["stats", "recent_usage", name]).unwrap().int().unwrap()
1290 }
1291
1292 fn get_data(
1293 hierarchy: &DiagnosticsHierarchy,
1294 moniker: &str,
1295 task: Option<&str>,
1296 ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
1297 let mut path = vec!["stats", "measurements", "components", moniker];
1298 if let Some(task) = task {
1299 path.push(task);
1300 }
1301 get_data_at(&hierarchy, &path)
1302 }
1303
1304 fn get_data_at(
1305 hierarchy: &DiagnosticsHierarchy,
1306 path: &[&str],
1307 ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
1308 let node = hierarchy.get_child_by_path(&path).expect("found stats node");
1309 let cpu_times = node
1310 .get_property("cpu_times")
1311 .expect("found cpu")
1312 .int_array()
1313 .expect("cpu are ints")
1314 .raw_values();
1315 let queue_times = node
1316 .get_property("queue_times")
1317 .expect("found queue")
1318 .int_array()
1319 .expect("queue are ints")
1320 .raw_values();
1321 let timestamps = node
1322 .get_property("timestamps")
1323 .expect("found timestamps")
1324 .int_array()
1325 .expect("timestamps are ints")
1326 .raw_values();
1327 (timestamps.into_owned(), cpu_times.into_owned(), queue_times.into_owned())
1328 }
1329}