#![warn(clippy::unwrap_used)]
#![cfg_attr(test, allow(clippy::unwrap_used))]

use anyhow::{anyhow, Error, Result};
use attribution_processing::digest::{BucketDefinition, Digest};
use attribution_processing::AttributionDataProvider;
use fpressure::WatcherRequest;
use fuchsia_async::{MonotonicDuration, MonotonicInstant, Task, WakeupTime};
use fuchsia_inspect::{ArrayProperty, Inspector, Node};
use fuchsia_inspect_contrib::nodes::BoundedListNode;
use futures::{select, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use inspect_runtime::PublishedInspectController;
use log::debug;
use memory_monitor2_config::Config;
use stalls::StallProvider;
use std::sync::Arc;
use {fidl_fuchsia_kernel as fkernel, fidl_fuchsia_memorypressure as fpressure};

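/// Holds the service's long-lived handles: dropping this struct unpublishes
/// the inspect tree and cancels the periodic digest task.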
pub struct ServiceTask {
    _inspect_controller: PublishedInspectController,
    _periodic_digest: Task<Result<(), anyhow::Error>>,
}

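/// Publishes the inspect tree and starts the periodic digest task. The
/// returned [ServiceTask] must be kept alive for the service to keep running.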
pub fn start_service(
    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
    kernel_stats_proxy: fkernel::StatsProxy,
    stall_provider: Arc<impl StallProvider>,
    memory_monitor2_config: Config,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Arc<[BucketDefinition]>,
) -> Result<ServiceTask> {
    debug!("Start serving inspect tree.");

    let inspector = fuchsia_inspect::component::inspector();

    let inspect_controller =
        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
            .ok_or_else(|| anyhow!("Failed to publish the `fuchsia.inspect.Tree` server"))?;

    build_inspect_tree(
        kernel_stats_proxy.clone(),
        stall_provider.clone(),
        inspector,
        &memory_monitor2_config,
    );
    let digest_service = digest_service(
        memory_monitor2_config,
        attribution_data_service,
        stall_provider,
        kernel_stats_proxy,
        memorypressure_proxy,
        bucket_definitions,
        inspector.root().create_child("logger"),
    )?;
    Ok(ServiceTask { _inspect_controller: inspect_controller, _periodic_digest: digest_service })
}

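/// Records the `config` node plus lazy `kmem_stats`, `kmem_stats_compression`,
/// and `stalls` children on the inspect root. Each lazy child queries the
/// kernel or the stall provider at read time.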
fn build_inspect_tree(
    kernel_stats_proxy: fkernel::StatsProxy,
    stall_provider: Arc<impl StallProvider>,
    inspector: &Inspector,
    config: &Config,
) {
    inspector.root().record_child("config", |node| config.record_inspect(node));

    {
        let kernel_stats_proxy = kernel_stats_proxy.clone();
        inspector.root().record_lazy_child("kmem_stats", move || {
            let kernel_stats_proxy = kernel_stats_proxy.clone();
            async move {
                let inspector = Inspector::default();
                let root = inspector.root();
                let mem_stats = kernel_stats_proxy.get_memory_stats().await?;
                mem_stats.total_bytes.map(|v| root.record_uint("total_bytes", v));
                mem_stats.free_bytes.map(|v| root.record_uint("free_bytes", v));
                mem_stats.free_loaned_bytes.map(|v| root.record_uint("free_loaned_bytes", v));
                mem_stats.wired_bytes.map(|v| root.record_uint("wired_bytes", v));
                mem_stats.total_heap_bytes.map(|v| root.record_uint("total_heap_bytes", v));
                mem_stats.free_heap_bytes.map(|v| root.record_uint("free_heap_bytes", v));
                mem_stats.vmo_bytes.map(|v| root.record_uint("vmo_bytes", v));
                mem_stats.mmu_overhead_bytes.map(|v| root.record_uint("mmu_overhead_bytes", v));
                mem_stats.ipc_bytes.map(|v| root.record_uint("ipc_bytes", v));
                mem_stats.cache_bytes.map(|v| root.record_uint("cache_bytes", v));
                mem_stats.slab_bytes.map(|v| root.record_uint("slab_bytes", v));
                mem_stats.zram_bytes.map(|v| root.record_uint("zram_bytes", v));
                mem_stats.other_bytes.map(|v| root.record_uint("other_bytes", v));
                mem_stats
                    .vmo_reclaim_total_bytes
                    .map(|v| root.record_uint("vmo_reclaim_total_bytes", v));
                mem_stats
                    .vmo_reclaim_newest_bytes
                    .map(|v| root.record_uint("vmo_reclaim_newest_bytes", v));
                mem_stats
                    .vmo_reclaim_oldest_bytes
                    .map(|v| root.record_uint("vmo_reclaim_oldest_bytes", v));
                mem_stats
                    .vmo_reclaim_disabled_bytes
                    .map(|v| root.record_uint("vmo_reclaim_disabled_bytes", v));
                mem_stats
                    .vmo_discardable_locked_bytes
                    .map(|v| root.record_uint("vmo_discardable_locked_bytes", v));
                mem_stats
                    .vmo_discardable_unlocked_bytes
                    .map(|v| root.record_uint("vmo_discardable_unlocked_bytes", v));
                Ok(inspector)
            }
            .boxed()
        })
    }

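    // Mirror the kernel's memory compression statistics as a lazy child.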
    {
        inspector.root().record_lazy_child("kmem_stats_compression", move || {
            let kernel_stats_proxy = kernel_stats_proxy.clone();
            async move {
                let inspector = Inspector::default();
                let cmp_stats = kernel_stats_proxy.get_memory_stats_compression().await?;
                cmp_stats
                    .uncompressed_storage_bytes
                    .map(|v| inspector.root().record_uint("uncompressed_storage_bytes", v));
                cmp_stats
                    .compressed_storage_bytes
                    .map(|v| inspector.root().record_uint("compressed_storage_bytes", v));
                cmp_stats
                    .compressed_fragmentation_bytes
                    .map(|v| inspector.root().record_uint("compressed_fragmentation_bytes", v));
                cmp_stats
                    .compression_time
                    .map(|v| inspector.root().record_int("compression_time", v));
                cmp_stats
                    .decompression_time
                    .map(|v| inspector.root().record_int("decompression_time", v));
                cmp_stats
                    .total_page_compression_attempts
                    .map(|v| inspector.root().record_uint("total_page_compression_attempts", v));
                cmp_stats
                    .failed_page_compression_attempts
                    .map(|v| inspector.root().record_uint("failed_page_compression_attempts", v));
                cmp_stats
                    .total_page_decompressions
                    .map(|v| inspector.root().record_uint("total_page_decompressions", v));
                cmp_stats
                    .compressed_page_evictions
                    .map(|v| inspector.root().record_uint("compressed_page_evictions", v));
                cmp_stats
                    .eager_page_compressions
                    .map(|v| inspector.root().record_uint("eager_page_compressions", v));
                cmp_stats
                    .memory_pressure_page_compressions
                    .map(|v| inspector.root().record_uint("memory_pressure_page_compressions", v));
                cmp_stats
                    .critical_memory_page_compressions
                    .map(|v| inspector.root().record_uint("critical_memory_page_compressions", v));
                cmp_stats
                    .pages_decompressed_unit_ns
                    .map(|v| inspector.root().record_uint("pages_decompressed_unit_ns", v));
                cmp_stats.pages_decompressed_within_log_time.map(|v| {
                    let array = inspector
                        .root()
                        .create_uint_array("pages_decompressed_within_log_time", 8);
                    for (i, &value) in v.iter().enumerate() {
                        array.set(i, value);
                    }
                    inspector.root().record(array);
                });
                Ok(inspector)
            }
            .boxed()
        });
    }

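    // Expose cumulative memory stall durations in milliseconds.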
    {
        inspector.root().record_lazy_child("stalls", move || {
            let stall_provider = stall_provider.clone();
            async move {
                let stall_info = stall_provider.get_stall_info()?;
                let inspector = Inspector::default();
                inspector.root().record_uint(
                    "some_ms",
                    stall_info.some.as_millis().try_into().unwrap_or(u64::MAX),
                );
                inspector.root().record_uint(
                    "full_ms",
                    stall_info.full.as_millis().try_into().unwrap_or(u64::MAX),
                );
                Ok(inspector)
            }
            .boxed()
        });
    }
}

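/// Computes the next capture deadline: now plus the configured delay for the
/// given memory pressure level.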
fn pressure_to_deadline(level: fpressure::Level, config: &Config) -> MonotonicInstant {
    MonotonicInstant::now()
        + MonotonicDuration::from_seconds(match level {
            fpressure::Level::Normal => config.normal_capture_delay_s,
            fpressure::Level::Warning => config.warning_capture_delay_s,
            fpressure::Level::Critical => config.critical_capture_delay_s,
        } as i64)
}

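/// Spawns a task that periodically captures a memory digest into
/// `digest_node`. The capture cadence follows the current memory pressure
/// level; when `capture_on_pressure_change` is set, a level change also
/// triggers an immediate capture, otherwise it can only shorten the pending
/// deadline.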
fn digest_service(
    memory_monitor2_config: Config,
    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
    stall_provider: Arc<impl StallProvider>,
    kernel_stats_proxy: fkernel::StatsProxy,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Arc<[BucketDefinition]>,
    digest_node: Node,
) -> Result<Task<Result<(), Error>>> {
    let (watcher, watcher_stream) =
        fidl::endpoints::create_request_stream::<fpressure::WatcherMarker>();
    memorypressure_proxy.register_watcher(watcher)?;

    Ok(fuchsia_async::Task::spawn(async move {
        let mut buckets_list_node =
            BoundedListNode::new(digest_node.create_child("measurements"), 100);
        let buckets_names = std::cell::OnceCell::new();
        let pressure_stream = watcher_stream.map_err(anyhow::Error::from);

        let (request, mut pressure_stream) = pressure_stream.into_future().await;
        let mut current_level = {
            let WatcherRequest::OnLevelChanged { level, responder } = request.ok_or_else(|| {
                anyhow::Error::msg(
                    "Unexpectedly exhausted pressure stream before receiving baseline pressure level",
                )
            })??;
            responder.send()?;
            level
        };
        let mut deadline = pressure_to_deadline(current_level, &memory_monitor2_config);
        let mut timer = Box::pin(deadline.into_timer());
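        // Wait for either a pressure change or the capture timer, then record
        // one digest measurement per loop iteration.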
        loop {
            let () = select! {
                pressure = pressure_stream.next() =>
                    match pressure.ok_or_else(|| anyhow::Error::msg("Unexpectedly exhausted pressure stream"))?? {
                        WatcherRequest::OnLevelChanged { level, responder } => {
                            responder.send()?;
                            if level == current_level { continue; }
                            current_level = level;
                            let new_deadline = pressure_to_deadline(level, &memory_monitor2_config);
                            if memory_monitor2_config.capture_on_pressure_change {
                                deadline = new_deadline;
                                timer = Box::pin(deadline.into_timer());
                            } else {
                                if deadline > new_deadline {
                                    deadline = new_deadline;
                                    timer = Box::pin(deadline.into_timer());
                                }
                                continue;
                            }
                        },
                    },
                _ = timer => {
                    deadline = pressure_to_deadline(current_level, &memory_monitor2_config);
                    timer = Box::pin(deadline.into_timer());
                },
            };

            let timestamp = zx::BootInstant::get();
            let (kmem_stats, kmem_stats_compression) = try_join!(
                kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
                kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
            )?;

            let Digest { buckets } = Digest::compute(
                &*attribution_data_service,
                &kmem_stats,
                &kmem_stats_compression,
                &*bucket_definitions,
            )?;

            let _ = buckets_names.get_or_init(|| {
                let buckets_names = digest_node.create_string_array("buckets", buckets.len());
                for (i, attribution_processing::digest::Bucket { name, .. }) in
                    buckets.iter().enumerate()
                {
                    buckets_names.set(i, name);
                }
                buckets_names
            });

            let stall_values = stall_provider.get_stall_info()?;

            buckets_list_node.add_entry(|n| {
                n.record_int("timestamp", timestamp.into_nanos());
                let ia = n.create_uint_array("bucket_sizes", buckets.len());
                for (i, b) in buckets.iter().enumerate() {
                    ia.set(i, b.size as u64);
                }
                n.record(ia);
                n.record_child("stalls", |child| {
                    child.record_uint(
                        "some_ms",
                        stall_values.some.as_millis().try_into().unwrap_or(u64::MAX),
                    );
                    child.record_uint(
                        "full_ms",
                        stall_values.full.as_millis().try_into().unwrap_or(u64::MAX),
                    );
                });
            });
        }
    }))
}

#[cfg(test)]
mod tests {
    use super::*;
    use attribution_processing::testing::FakeAttributionDataProvider;
    use attribution_processing::{
        Attribution, AttributionData, Principal, PrincipalDescription, PrincipalIdentifier,
        PrincipalType, Resource, ResourceReference, ZXName,
    };
    use diagnostics_assertions::{assert_data_tree, NonZeroIntProperty};
    use fuchsia_async::TestExecutor;
    use futures::task::Poll;
    use futures::TryStreamExt;
    use stalls::MemoryStallMetrics;
    use std::time::Duration;
    use {
        fidl_fuchsia_memory_attribution_plugin as fplugin,
        fidl_fuchsia_memorypressure as fpressure, fuchsia_async as fasync,
    };

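    /// Fake attribution data: one principal owning a single 1024-byte VMO.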
    fn get_attribution_data_provider() -> Arc<impl AttributionDataProvider + 'static> {
        let attribution_data = AttributionData {
            principals_vec: vec![Principal {
                identifier: PrincipalIdentifier(1),
                description: PrincipalDescription::Component("principal".to_owned()),
                principal_type: PrincipalType::Runnable,
                parent: None,
            }],
            resources_vec: vec![Resource {
                koid: 10,
                name_index: 0,
                resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
                    parent: None,
                    private_committed_bytes: Some(1024),
                    private_populated_bytes: Some(2048),
                    scaled_committed_bytes: Some(1024),
                    scaled_populated_bytes: Some(2048),
                    total_committed_bytes: Some(1024),
                    total_populated_bytes: Some(2048),
                    ..Default::default()
                }),
            }],
            resource_names: vec![ZXName::from_string_lossy("resource")],
            attributions: vec![Attribution {
                source: PrincipalIdentifier(1),
                subject: PrincipalIdentifier(1),
                resources: vec![ResourceReference::KernelObject(10)],
            }],
        };
        Arc::new(FakeAttributionDataProvider { attribution_data })
    }

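    /// Serves `fuchsia.kernel.Stats` with fixed, distinct values so tests can
    /// assert on exact numbers in the inspect output.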
    async fn serve_kernel_stats(
        mut request_stream: fkernel::StatsRequestStream,
    ) -> Result<(), fidl::Error> {
        while let Some(request) = request_stream.try_next().await? {
            match request {
                fkernel::StatsRequest::GetMemoryStats { responder } => {
                    responder
                        .send(&fkernel::MemoryStats {
                            total_bytes: Some(1),
                            free_bytes: Some(2),
                            wired_bytes: Some(3),
                            total_heap_bytes: Some(4),
                            free_heap_bytes: Some(5),
                            vmo_bytes: Some(6),
                            mmu_overhead_bytes: Some(7),
                            ipc_bytes: Some(8),
                            other_bytes: Some(9),
                            free_loaned_bytes: Some(10),
                            cache_bytes: Some(11),
                            slab_bytes: Some(12),
                            zram_bytes: Some(13),
                            vmo_reclaim_total_bytes: Some(14),
                            vmo_reclaim_newest_bytes: Some(15),
                            vmo_reclaim_oldest_bytes: Some(16),
                            vmo_reclaim_disabled_bytes: Some(17),
                            vmo_discardable_locked_bytes: Some(18),
                            vmo_discardable_unlocked_bytes: Some(19),
                            ..Default::default()
                        })
                        .unwrap();
                }
                fkernel::StatsRequest::GetMemoryStatsExtended { responder: _ } => {
                    unimplemented!("Deprecated call, should not be used")
                }
                fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
                    responder
                        .send(&fkernel::MemoryStatsCompression {
                            uncompressed_storage_bytes: Some(20),
                            compressed_storage_bytes: Some(21),
                            compressed_fragmentation_bytes: Some(22),
                            compression_time: Some(23),
                            decompression_time: Some(24),
                            total_page_compression_attempts: Some(25),
                            failed_page_compression_attempts: Some(26),
                            total_page_decompressions: Some(27),
                            compressed_page_evictions: Some(28),
                            eager_page_compressions: Some(29),
                            memory_pressure_page_compressions: Some(30),
                            critical_memory_page_compressions: Some(31),
                            pages_decompressed_unit_ns: Some(32),
                            pages_decompressed_within_log_time: Some([
                                40, 41, 42, 43, 44, 45, 46, 47,
                            ]),
                            ..Default::default()
                        })
                        .unwrap();
                }
                fkernel::StatsRequest::GetCpuStats { responder: _ } => unimplemented!(),
                fkernel::StatsRequest::GetCpuLoad { duration: _, responder: _ } => unimplemented!(),
            }
        }
        Ok(())
    }

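    /// Stall provider reporting a constant 10 ms "some" / 20 ms "full" stall.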
    struct FakeStallProvider {}
    impl StallProvider for FakeStallProvider {
        fn get_stall_info(&self) -> Result<MemoryStallMetrics, anyhow::Error> {
            Ok(MemoryStallMetrics {
                some: Duration::from_millis(10),
                full: Duration::from_millis(20),
            })
        }
    }

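    /// `build_inspect_tree` should mirror the kernel statistics, stall
    /// metrics, and configuration into the inspect hierarchy.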
    #[test]
    fn test_build_inspect_tree() {
        let mut exec = fasync::TestExecutor::new();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();

        build_inspect_tree(
            stats_provider,
            Arc::new(FakeStallProvider {}),
            &inspector,
            &Config {
                capture_on_pressure_change: true,
                critical_capture_delay_s: 1,
                imminent_oom_capture_delay_s: 2,
                normal_capture_delay_s: 3,
                warning_capture_delay_s: 4,
            },
        );

        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(@executor exec, output, root: {
            kmem_stats: {
                total_bytes: 1u64,
                free_bytes: 2u64,
                wired_bytes: 3u64,
                total_heap_bytes: 4u64,
                free_heap_bytes: 5u64,
                vmo_bytes: 6u64,
                mmu_overhead_bytes: 7u64,
                ipc_bytes: 8u64,
                other_bytes: 9u64,
                free_loaned_bytes: 10u64,
                cache_bytes: 11u64,
                slab_bytes: 12u64,
                zram_bytes: 13u64,
                vmo_reclaim_total_bytes: 14u64,
                vmo_reclaim_newest_bytes: 15u64,
                vmo_reclaim_oldest_bytes: 16u64,
                vmo_reclaim_disabled_bytes: 17u64,
                vmo_discardable_locked_bytes: 18u64,
                vmo_discardable_unlocked_bytes: 19u64
            },
            kmem_stats_compression: {
                uncompressed_storage_bytes: 20u64,
                compressed_storage_bytes: 21u64,
                compressed_fragmentation_bytes: 22u64,
                compression_time: 23i64,
                decompression_time: 24i64,
                total_page_compression_attempts: 25u64,
                failed_page_compression_attempts: 26u64,
                total_page_decompressions: 27u64,
                compressed_page_evictions: 28u64,
                eager_page_compressions: 29u64,
                memory_pressure_page_compressions: 30u64,
                critical_memory_page_compressions: 31u64,
                pages_decompressed_unit_ns: 32u64,
                pages_decompressed_within_log_time: vec![
                    40u64, 41u64, 42u64, 43u64, 44u64, 45u64, 46u64, 47u64,
                ]
            },
            stalls: {
                some_ms: 10u64,
                full_ms: 20u64,
            },
            config: {
                capture_on_pressure_change: true,
                critical_capture_delay_s: 1u64,
                imminent_oom_capture_delay_s: 2u64,
                normal_capture_delay_s: 3u64,
                warning_capture_delay_s: 4u64,
            },
        });
    }

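    /// With `capture_on_pressure_change` set, a pressure change should produce
    /// an immediate measurement, and the reset timer a second one.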
    #[test]
    fn test_digest_service_capture_on_pressure_change_and_wait() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            Arc::new(FakeStallProvider {}),
            stats_provider,
            pressure_provider,
            Default::default(),
            inspector.root().create_child("logger"),
        )?);
        let Poll::Ready(watcher) = exec
            .run_until_stalled(
                &mut pressure_request_stream
                    .then(|request| async {
                        let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                            request.expect("digest_service failed to register a watcher");
                        let watcher = watcher.into_proxy();
                        watcher.on_level_changed(fpressure::Level::Normal).await.expect(
                            "digest_service failed to acknowledge the initial pressure level",
                        );
                        watcher
                    })
                    .boxed()
                    .into_future(),
            )
            .map(|(watcher, _)| {
                watcher.ok_or_else(|| anyhow::Error::msg("failed to register watcher"))
            })?
        else {
            panic!("digest_service failed to register a watcher");
        };
        assert!(exec
            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Warning))?
            .is_ready());
        let _ = exec.run_until_stalled(&mut digest_service);

        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(10).into()
            )))
            .is_ready());
        let _ = exec.run_until_stalled(&mut digest_service)?;

        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(@executor exec, output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                        stalls: {
                            some_ms: 10u64,
                            full_ms: 20u64,
                        },
                    },
                    "1": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                        stalls: {
                            some_ms: 10u64,
                            full_ms: 20u64,
                        },
                    },
                },
            },
        });
        Ok(())
    }

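    /// With `capture_on_pressure_change` unset, a measurement should still be
    /// taken once the per-level capture delay elapses.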
    #[test]
    fn test_digest_service_wait() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let inspector = fuchsia_inspect::Inspector::default();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            Arc::new(FakeStallProvider {}),
            stats_provider,
            pressure_provider,
            Default::default(),
            inspector.root().create_child("logger"),
        )?);
        let Poll::Ready((_watcher, _pressure_stream)) = exec
            .run_until_stalled(
                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                        request.map_err(anyhow::Error::from)?;
                    let watcher_proxy = watcher.into_proxy();
                    let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
                }))
                .into_future(),
            )
            .map(|(watcher, pressure_stream)| {
                (
                    watcher.ok_or_else(|| {
                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
                    }),
                    pressure_stream,
                )
            })
        else {
            panic!("Failed to register the watcher");
        };

        let _ = exec.run_until_stalled(&mut digest_service)?;
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(15).into()
            )))
            .is_ready());
        assert!(exec.run_until_stalled(&mut digest_service).is_pending());
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(@executor exec, output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                        stalls: {
                            some_ms: 10u64,
                            full_ms: 20u64,
                        },
                    },
                },
            },
        });
        Ok(())
    }

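    /// Switching from Critical (10 s delay) to Normal (100 s delay) should not
    /// postpone the already-scheduled capture.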
    #[test]
    fn test_digest_service_new_pressure_does_not_postpone() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let inspector = fuchsia_inspect::Inspector::default();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 100,
                normal_capture_delay_s: 100,
            },
            get_attribution_data_provider(),
            Arc::new(FakeStallProvider {}),
            stats_provider,
            pressure_provider,
            Default::default(),
            inspector.root().create_child("logger"),
        )?);
        let Poll::Ready((watcher, _pressure_stream)) = exec
            .run_until_stalled(
                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                        request.map_err(anyhow::Error::from)?;
                    let watcher_proxy = watcher.into_proxy();
                    watcher_proxy.on_level_changed(fpressure::Level::Critical).await?;
                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
                }))
                .into_future(),
            )
            .map(|(watcher, pressure_stream)| {
                (
                    watcher.ok_or_else(|| {
                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
                    }),
                    pressure_stream,
                )
            })
        else {
            panic!("Failed to register the watcher");
        };

        let watcher = watcher??;
        assert!(exec.run_until_stalled(&mut digest_service)?.is_pending());
        assert!(exec
            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Normal))?
            .is_ready());
        assert!(exec.run_until_stalled(&mut digest_service)?.is_pending());
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(25).into()
            )))
            .is_ready());
        assert!(exec.run_until_stalled(&mut digest_service)?.is_pending());
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(@executor exec, output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                        stalls: {
                            some_ms: 10u64,
                            full_ms: 20u64,
                        },
                    },
                },
            },
        });
        Ok(())
    }

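    /// With `capture_on_pressure_change` unset, a pressure change alone should
    /// not produce a measurement.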
    #[test]
    fn test_digest_service_no_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            Arc::new(FakeStallProvider {}),
            stats_provider,
            pressure_provider,
            Default::default(),
            inspector.root().create_child("logger"),
        )?);
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(@executor exec, output, root: {
            logger: {
                measurements: {},
            },
        });
        Ok(())
    }

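    /// With `capture_on_pressure_change` set, a single pressure change should
    /// produce exactly one measurement.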
    #[test]
    fn test_digest_service_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            Arc::new(FakeStallProvider {}),
            stats_provider,
            pressure_provider,
            Default::default(),
            inspector.root().create_child("logger"),
        )?);
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(@executor exec, output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                        stalls: {
                            some_ms: 10u64,
                            full_ms: 20u64,
                        },
                    },
                },
            },
        });
        Ok(())
    }
}