use anyhow::{anyhow, Error, Result};
use attribution_processing::digest::{BucketDefinition, Digest};
use attribution_processing::AttributionDataProvider;
use fpressure::WatcherRequest;
use fuchsia_async::{MonotonicDuration, Task, WakeupTime};
use fuchsia_inspect::{ArrayProperty, Inspector, Node};
use fuchsia_inspect_contrib::nodes::BoundedListNode;
use futures::{select, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use inspect_runtime::PublishedInspectController;
use log::debug;
use memory_monitor2_config::Config;
use stalls::StallProvider;
use std::sync::Arc;
use {
    fidl_fuchsia_kernel as fkernel, fidl_fuchsia_memorypressure as fpressure, fuchsia_inspect as _,
    inspect_runtime as _,
};

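/// Handles that keep the memory monitoring service alive: dropping this
/// struct unpublishes the inspect tree and cancels the periodic digest task.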
pub struct ServiceTask {
    _inspect_controller: PublishedInspectController,
    _periodic_digest: Task<Result<(), anyhow::Error>>,
}

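/// Publishes the component's inspect tree and starts the periodic digest task.
/// The returned [`ServiceTask`] must be kept alive for the service to keep
/// running.
///
/// A minimal usage sketch (assuming the caller's `main` already owns the
/// proxies, providers, config, and bucket definitions; the variable names are
/// illustrative):
///
/// ```ignore
/// let _service = start_service(
///     attribution_provider,
///     kernel_stats_proxy,
///     stall_provider,
///     config,
///     pressure_proxy,
///     bucket_definitions,
/// )?;
/// // Keep `_service` alive for as long as the component serves diagnostics.
/// ```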
pub fn start_service(
    attribution_data_service: Arc<impl AttributionDataProvider>,
    kernel_stats_proxy: fkernel::StatsProxy,
    stall_provider: Arc<impl StallProvider>,
    memory_monitor2_config: Config,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Vec<BucketDefinition>,
) -> Result<ServiceTask> {
    debug!("Start serving inspect tree.");

    let inspector = fuchsia_inspect::component::inspector();

    let inspect_controller =
        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
            .ok_or_else(|| anyhow!("Failed to serve the `fuchsia.inspect.Tree` protocol"))?;

    build_inspect_tree(kernel_stats_proxy.clone(), stall_provider, inspector);
    let digest_service = digest_service(
        memory_monitor2_config,
        attribution_data_service,
        kernel_stats_proxy,
        memorypressure_proxy,
        bucket_definitions,
        inspector.root().create_child("logger"),
    )?;
    Ok(ServiceTask { _inspect_controller: inspect_controller, _periodic_digest: digest_service })
}

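/// Records lazy nodes under the inspect root: `kmem_stats` and
/// `kmem_stats_compression`, fetched from `fuchsia.kernel/Stats` each time the
/// tree is read, and `stalls`, read from the [`StallProvider`].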
fn build_inspect_tree(
    kernel_stats_proxy: fkernel::StatsProxy,
    stall_provider: Arc<impl StallProvider>,
    inspector: &Inspector,
) {
    {
        let kernel_stats_proxy = kernel_stats_proxy.clone();
        inspector.root().record_lazy_child("kmem_stats", move || {
            let kernel_stats_proxy = kernel_stats_proxy.clone();
            async move {
                let inspector = Inspector::default();
                let root = inspector.root();
                let mem_stats = kernel_stats_proxy.get_memory_stats().await?;
                mem_stats.total_bytes.map(|v| root.record_uint("total_bytes", v));
                mem_stats.free_bytes.map(|v| root.record_uint("free_bytes", v));
                mem_stats.free_loaned_bytes.map(|v| root.record_uint("free_loaned_bytes", v));
                mem_stats.wired_bytes.map(|v| root.record_uint("wired_bytes", v));
                mem_stats.total_heap_bytes.map(|v| root.record_uint("total_heap_bytes", v));
                mem_stats.free_heap_bytes.map(|v| root.record_uint("free_heap_bytes", v));
                mem_stats.vmo_bytes.map(|v| root.record_uint("vmo_bytes", v));
                mem_stats.mmu_overhead_bytes.map(|v| root.record_uint("mmu_overhead_bytes", v));
                mem_stats.ipc_bytes.map(|v| root.record_uint("ipc_bytes", v));
                mem_stats.cache_bytes.map(|v| root.record_uint("cache_bytes", v));
                mem_stats.slab_bytes.map(|v| root.record_uint("slab_bytes", v));
                mem_stats.zram_bytes.map(|v| root.record_uint("zram_bytes", v));
                mem_stats.other_bytes.map(|v| root.record_uint("other_bytes", v));
                mem_stats
                    .vmo_reclaim_total_bytes
                    .map(|v| root.record_uint("vmo_reclaim_total_bytes", v));
                mem_stats
                    .vmo_reclaim_newest_bytes
                    .map(|v| root.record_uint("vmo_reclaim_newest_bytes", v));
                mem_stats
                    .vmo_reclaim_oldest_bytes
                    .map(|v| root.record_uint("vmo_reclaim_oldest_bytes", v));
                mem_stats
                    .vmo_reclaim_disabled_bytes
                    .map(|v| root.record_uint("vmo_reclaim_disabled_bytes", v));
                mem_stats
                    .vmo_discardable_locked_bytes
                    .map(|v| root.record_uint("vmo_discardable_locked_bytes", v));
                mem_stats
                    .vmo_discardable_unlocked_bytes
                    .map(|v| root.record_uint("vmo_discardable_unlocked_bytes", v));
                Ok(inspector)
            }
            .boxed()
        });
    }

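    // Compression statistics, also fetched lazily on every snapshot.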
    {
        inspector.root().record_lazy_child("kmem_stats_compression", move || {
            let kernel_stats_proxy = kernel_stats_proxy.clone();
            async move {
                let inspector = Inspector::default();
                let cmp_stats = kernel_stats_proxy.get_memory_stats_compression().await?;
                cmp_stats
                    .uncompressed_storage_bytes
                    .map(|v| inspector.root().record_uint("uncompressed_storage_bytes", v));
                cmp_stats
                    .compressed_storage_bytes
                    .map(|v| inspector.root().record_uint("compressed_storage_bytes", v));
                cmp_stats
                    .compressed_fragmentation_bytes
                    .map(|v| inspector.root().record_uint("compressed_fragmentation_bytes", v));
                cmp_stats
                    .compression_time
                    .map(|v| inspector.root().record_int("compression_time", v));
                cmp_stats
                    .decompression_time
                    .map(|v| inspector.root().record_int("decompression_time", v));
                cmp_stats
                    .total_page_compression_attempts
                    .map(|v| inspector.root().record_uint("total_page_compression_attempts", v));
                cmp_stats
                    .failed_page_compression_attempts
                    .map(|v| inspector.root().record_uint("failed_page_compression_attempts", v));
                cmp_stats
                    .total_page_decompressions
                    .map(|v| inspector.root().record_uint("total_page_decompressions", v));
                cmp_stats
                    .compressed_page_evictions
                    .map(|v| inspector.root().record_uint("compressed_page_evictions", v));
                cmp_stats
                    .eager_page_compressions
                    .map(|v| inspector.root().record_uint("eager_page_compressions", v));
                cmp_stats
                    .memory_pressure_page_compressions
                    .map(|v| inspector.root().record_uint("memory_pressure_page_compressions", v));
                cmp_stats
                    .critical_memory_page_compressions
                    .map(|v| inspector.root().record_uint("critical_memory_page_compressions", v));
                cmp_stats
                    .pages_decompressed_unit_ns
                    .map(|v| inspector.root().record_uint("pages_decompressed_unit_ns", v));
                cmp_stats.pages_decompressed_within_log_time.map(|v| {
                    let array = inspector
                        .root()
                        .create_uint_array("pages_decompressed_within_log_time", 8);
                    // Copy all eight histogram buckets into the inspect array.
                    for (i, count) in v.iter().enumerate() {
                        array.set(i, *count);
                    }
                    inspector.root().record(array);
                });
                Ok(inspector)
            }
            .boxed()
        });
    }

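    // Memory stall counters and rates, read from the stall provider when the
    // tree is snapshotted.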
    {
        inspector.root().record_lazy_child("stalls", move || {
            let stall_info = stall_provider.get_stall_info().unwrap();
            let stall_rate_opt = stall_provider.get_stall_rate();
            async move {
                let inspector = Inspector::default();
                inspector.root().record_int("current_some", stall_info.stall_time_some);
                inspector.root().record_int("current_full", stall_info.stall_time_full);
                if let Some(stall_rate) = stall_rate_opt {
                    inspector
                        .root()
                        .record_int("rate_interval_s", stall_rate.interval.into_seconds());
                    inspector.root().record_int("rate_some", stall_rate.rate_some);
                    inspector.root().record_int("rate_full", stall_rate.rate_full);
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
}

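/// Spawns a task that periodically computes a memory digest and records it
/// under `digest_node`. A capture happens whenever the capture timer for the
/// current pressure level fires and, when `capture_on_pressure_change` is set,
/// whenever the memory pressure level changes.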
fn digest_service(
    memory_monitor2_config: Config,
    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
    kernel_stats_proxy: fkernel::StatsProxy,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Vec<BucketDefinition>,
    digest_node: Node,
) -> Result<Task<Result<(), Error>>> {
    let (watcher, watcher_stream) =
        fidl::endpoints::create_request_stream::<fpressure::WatcherMarker>();
    memorypressure_proxy.register_watcher(watcher)?;

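    // The spawned task owns the watcher stream and keeps at most 100
    // measurements in the bounded list under `measurements`.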
    Ok(fuchsia_async::Task::spawn(async move {
        let mut buckets_list_node =
            BoundedListNode::new(digest_node.create_child("measurements"), 100);
        let buckets_names = std::cell::OnceCell::new();
        let pressure_stream = watcher_stream.map_err(anyhow::Error::from);

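        // Per `fuchsia.memorypressure`, the provider sends the current level
        // immediately after registration; use it as the baseline.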
        let (request, mut pressure_stream) = pressure_stream.into_future().await;
        let WatcherRequest::OnLevelChanged { level, responder } = request.ok_or_else(|| {
            anyhow::Error::msg(
                "Unexpectedly exhausted pressure stream before receiving baseline pressure level",
            )
        })??;
        responder.send()?;
        let mut current_level = level;
        let new_timer = |level| {
            MonotonicDuration::from_seconds(match level {
                fpressure::Level::Normal => memory_monitor2_config.normal_capture_delay_s,
                fpressure::Level::Warning => memory_monitor2_config.warning_capture_delay_s,
                fpressure::Level::Critical => memory_monitor2_config.critical_capture_delay_s,
            } as i64)
            .into_timer()
            .boxed()
            .fuse()
        };
        let mut timer = new_timer(current_level);
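        // Each iteration waits for a pressure change or the capture timer,
        // then falls through to record one digest. Pressure changes trigger a
        // capture only when `capture_on_pressure_change` is set.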
        loop {
            let () = select! {
                pressure = pressure_stream.next() =>
                    match pressure
                        .ok_or_else(|| anyhow::Error::msg("Unexpectedly exhausted pressure stream"))??
                    {
                        WatcherRequest::OnLevelChanged { level, responder } => {
                            responder.send()?;
                            if level == current_level { continue; }
                            current_level = level;
                            timer = new_timer(level);
                            if !memory_monitor2_config.capture_on_pressure_change { continue; }
                        }
                    },
                _ = timer => { timer = new_timer(current_level); }
            };

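            // Take a measurement: timestamp it and fetch both kernel stat
            // tables concurrently.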
            let timestamp = zx::BootInstant::get();
            let (kmem_stats, kmem_stats_compression) = try_join!(
                kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
                kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
            )?;

            let Digest { buckets } = Digest::compute(
                &*attribution_data_service,
                &kmem_stats,
                &kmem_stats_compression,
                &bucket_definitions,
            )?;

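            // Bucket names are stable across captures, so record them only
            // once; their order matches the `bucket_sizes` arrays below.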
            let _ = buckets_names.get_or_init(|| {
                let buckets_names = digest_node.create_string_array("buckets", buckets.len());
                for (i, attribution_processing::digest::Bucket { name, .. }) in
                    buckets.iter().enumerate()
                {
                    buckets_names.set(i, name);
                }
                buckets_names
            });

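            // Append one entry holding the timestamp and the per-bucket sizes.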
            buckets_list_node.add_entry(|n| {
                n.record_int("timestamp", timestamp.into_nanos());
                let ia = n.create_uint_array("bucket_sizes", buckets.len());
                for (i, b) in buckets.iter().enumerate() {
                    ia.set(i, b.size as u64);
                }
                n.record(ia);
            });
        }
    }))
}

#[cfg(test)]
mod tests {
    use super::*;
    use attribution_processing::testing::FakeAttributionDataProvider;
    use attribution_processing::{
        Attribution, AttributionData, Principal, PrincipalDescription, PrincipalIdentifier,
        PrincipalType, Resource, ResourceReference, ZXName,
    };
    use diagnostics_assertions::{assert_data_tree, NonZeroIntProperty};
    use fuchsia_async::TestExecutor;
    use futures::task::Poll;
    use futures::TryStreamExt;
    use std::time::Duration;
    use {
        fidl_fuchsia_memory_attribution_plugin as fplugin,
        fidl_fuchsia_memorypressure as fpressure, fuchsia_async as fasync,
    };

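    /// Builds a fake attribution provider exposing a single principal that
    /// owns one VMO, enough to exercise the digest path.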
    fn get_attribution_data_provider() -> Arc<impl AttributionDataProvider + 'static> {
        let attribution_data = AttributionData {
            principals_vec: vec![Principal {
                identifier: PrincipalIdentifier(1),
                description: PrincipalDescription::Component("principal".to_owned()),
                principal_type: PrincipalType::Runnable,
                parent: None,
            }],
            resources_vec: vec![Resource {
                koid: 10,
                name_index: 0,
                resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
                    parent: None,
                    private_committed_bytes: Some(1024),
                    private_populated_bytes: Some(2048),
                    scaled_committed_bytes: Some(1024),
                    scaled_populated_bytes: Some(2048),
                    total_committed_bytes: Some(1024),
                    total_populated_bytes: Some(2048),
                    ..Default::default()
                }),
            }],
            resource_names: vec![ZXName::from_string_lossy("resource")],
            attributions: vec![Attribution {
                source: PrincipalIdentifier(1),
                subject: PrincipalIdentifier(1),
                resources: vec![ResourceReference::KernelObject(10)],
            }],
        };
        Arc::new(FakeAttributionDataProvider { attribution_data })
    }

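    /// Serves `fuchsia.kernel/Stats` with fixed values so tests can assert on
    /// exact numbers.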
    async fn serve_kernel_stats(
        mut request_stream: fkernel::StatsRequestStream,
    ) -> Result<(), fidl::Error> {
        while let Some(request) = request_stream.try_next().await? {
            match request {
                fkernel::StatsRequest::GetMemoryStats { responder } => {
                    responder
                        .send(&fkernel::MemoryStats {
                            total_bytes: Some(1),
                            free_bytes: Some(2),
                            wired_bytes: Some(3),
                            total_heap_bytes: Some(4),
                            free_heap_bytes: Some(5),
                            vmo_bytes: Some(6),
                            mmu_overhead_bytes: Some(7),
                            ipc_bytes: Some(8),
                            other_bytes: Some(9),
                            free_loaned_bytes: Some(10),
                            cache_bytes: Some(11),
                            slab_bytes: Some(12),
                            zram_bytes: Some(13),
                            vmo_reclaim_total_bytes: Some(14),
                            vmo_reclaim_newest_bytes: Some(15),
                            vmo_reclaim_oldest_bytes: Some(16),
                            vmo_reclaim_disabled_bytes: Some(17),
                            vmo_discardable_locked_bytes: Some(18),
                            vmo_discardable_unlocked_bytes: Some(19),
                            ..Default::default()
                        })
                        .unwrap();
                }
                fkernel::StatsRequest::GetMemoryStatsExtended { responder: _ } => {
                    unimplemented!("Deprecated call, should not be used")
                }
                fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
                    responder
                        .send(&fkernel::MemoryStatsCompression {
                            uncompressed_storage_bytes: Some(20),
                            compressed_storage_bytes: Some(21),
                            compressed_fragmentation_bytes: Some(22),
                            compression_time: Some(23),
                            decompression_time: Some(24),
                            total_page_compression_attempts: Some(25),
                            failed_page_compression_attempts: Some(26),
                            total_page_decompressions: Some(27),
                            compressed_page_evictions: Some(28),
                            eager_page_compressions: Some(29),
                            memory_pressure_page_compressions: Some(30),
                            critical_memory_page_compressions: Some(31),
                            pages_decompressed_unit_ns: Some(32),
                            pages_decompressed_within_log_time: Some([
                                40, 41, 42, 43, 44, 45, 46, 47,
                            ]),
                            ..Default::default()
                        })
                        .unwrap();
                }
                fkernel::StatsRequest::GetCpuStats { responder: _ } => unimplemented!(),
                fkernel::StatsRequest::GetCpuLoad { duration: _, responder: _ } => unimplemented!(),
            }
        }
        Ok(())
    }

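    /// Verifies that the lazy inspect nodes mirror the values reported by the
    /// fake kernel stats and stall providers.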
    #[test]
    fn test_build_inspect_tree() {
        let mut exec = fasync::TestExecutor::new();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();

        struct FakeStallProvider {}
        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<zx::MemoryStall, anyhow::Error> {
                Ok(zx::MemoryStall { stall_time_some: 10, stall_time_full: 20 })
            }

            fn get_stall_rate(&self) -> Option<stalls::MemoryStallRate> {
                Some(stalls::MemoryStallRate {
                    interval: fasync::MonotonicDuration::from_seconds(60),
                    rate_some: 1,
                    rate_full: 2,
                })
            }
        }

        build_inspect_tree(stats_provider, Arc::new(FakeStallProvider {}), &inspector);

        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(output, root: {
            kmem_stats: {
                total_bytes: 1u64,
                free_bytes: 2u64,
                wired_bytes: 3u64,
                total_heap_bytes: 4u64,
                free_heap_bytes: 5u64,
                vmo_bytes: 6u64,
                mmu_overhead_bytes: 7u64,
                ipc_bytes: 8u64,
                other_bytes: 9u64,
                free_loaned_bytes: 10u64,
                cache_bytes: 11u64,
                slab_bytes: 12u64,
                zram_bytes: 13u64,
                vmo_reclaim_total_bytes: 14u64,
                vmo_reclaim_newest_bytes: 15u64,
                vmo_reclaim_oldest_bytes: 16u64,
                vmo_reclaim_disabled_bytes: 17u64,
                vmo_discardable_locked_bytes: 18u64,
                vmo_discardable_unlocked_bytes: 19u64
            },
            kmem_stats_compression: {
                uncompressed_storage_bytes: 20u64,
                compressed_storage_bytes: 21u64,
                compressed_fragmentation_bytes: 22u64,
                compression_time: 23i64,
                decompression_time: 24i64,
                total_page_compression_attempts: 25u64,
                failed_page_compression_attempts: 26u64,
                total_page_decompressions: 27u64,
                compressed_page_evictions: 28u64,
                eager_page_compressions: 29u64,
                memory_pressure_page_compressions: 30u64,
                critical_memory_page_compressions: 31u64,
                pages_decompressed_unit_ns: 32u64,
                pages_decompressed_within_log_time: vec![
                    40u64, 41u64, 42u64, 43u64, 44u64, 45u64, 46u64, 47u64,
                ]
            },
            stalls: {
                current_some: 10i64,
                current_full: 20i64,
                rate_some: 1i64,
                rate_full: 2i64,
                rate_interval_s: 60i64
            }
        });
    }

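    /// With `capture_on_pressure_change` set, a pressure change and a later
    /// timer expiry should each produce one measurement.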
    #[test]
    fn test_digest_service_capture_on_pressure_change_and_wait() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        let Poll::Ready(watcher) = exec
            .run_until_stalled(
                &mut pressure_request_stream
                    .then(|request| async {
                        let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                            request.expect("digest_service failed to register a watcher");
                        let watcher = watcher.into_proxy();
                        watcher.on_level_changed(fpressure::Level::Normal).await.expect(
                            "digest_service failed to acknowledge the initial pressure level",
                        );
                        watcher
                    })
                    .boxed()
                    .into_future(),
            )
            .map(|(watcher, _)| {
                watcher.ok_or_else(|| anyhow::Error::msg("failed to register watcher"))
            })?
        else {
            panic!("digest_service failed to register a watcher");
        };
        // First capture: triggered by the pressure level change.
        assert!(exec
            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Warning))?
            .is_ready());
        let _ = exec.run_until_stalled(&mut digest_service);

        // Second capture: triggered by the 10-second capture timer.
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(10).into()
            )))
            .is_ready());
        let _ = exec.run_until_stalled(&mut digest_service)?;

        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                    },
                    "1": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                    },
                },
            },
        });
        Ok(())
    }

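    /// With `capture_on_pressure_change` unset, only the capture timer should
    /// produce measurements.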
    #[test]
    fn test_digest_service_wait() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let inspector = fuchsia_inspect::Inspector::default();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        let Poll::Ready((_watcher, _pressure_stream)) = exec
            .run_until_stalled(
                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                        request.map_err(anyhow::Error::from)?;
                    let watcher_proxy = watcher.into_proxy();
                    let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
                }))
                .into_future(),
            )
            .map(|(watcher, pressure_stream)| {
                (
                    watcher.ok_or_else(|| {
                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
                    }),
                    pressure_stream,
                )
            })
        else {
            panic!("Failed to register the watcher");
        };

        let _ = exec.run_until_stalled(&mut digest_service)?;
        // Advance past the 10-second capture delay; a single timer-driven
        // measurement should be recorded.
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(15).into()
            )))
            .is_ready());
        assert!(exec.run_until_stalled(&mut digest_service).is_pending());
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                    },
                },
            },
        });
        Ok(())
    }

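    /// A pressure change alone must not produce a measurement when
    /// `capture_on_pressure_change` is false.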
    #[test]
    fn test_digest_service_no_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(output, root: {
            logger: {
                measurements: {},
            },
        });
        Ok(())
    }

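    /// A pressure change alone produces a measurement when
    /// `capture_on_pressure_change` is true.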
    #[test]
    fn test_digest_service_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            get_attribution_data_provider(),
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64,
                        ],
                    },
                },
            },
        });
        Ok(())
    }
}