inspect_nodes/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{anyhow, Error, Result};
5use attribution_processing::digest::{BucketDefinition, Digest};
6use attribution_processing::AttributionDataProvider;
7use fpressure::WatcherRequest;
8use fuchsia_async::{MonotonicDuration, Task, WakeupTime};
9use fuchsia_inspect::{ArrayProperty, Inspector, Node};
10use fuchsia_inspect_contrib::nodes::BoundedListNode;
11use futures::{select, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
12use inspect_runtime::PublishedInspectController;
13use log::debug;
14use memory_monitor2_config::Config;
15use stalls::StallProvider;
16use std::sync::Arc;
17use {
18    fidl_fuchsia_kernel as fkernel, fidl_fuchsia_memorypressure as fpressure, fuchsia_inspect as _,
19    inspect_runtime as _,
20};
21
22/// Hold the resource required to serve the inspect tree.
23/// The FIDL service stops when this object is dropped.
24pub struct ServiceTask {
25    _inspect_controller: PublishedInspectController,
26    _periodic_digest: Task<Result<(), anyhow::Error>>,
27}
28
29/// Begins to serve the inspect tree, and returns an object holding the server's resources.
30/// Dropping the `ServiceTask` stops the service.
31pub fn start_service(
32    attribution_data_service: Arc<impl AttributionDataProvider>,
33    kernel_stats_proxy: fkernel::StatsProxy,
34    stall_provider: Arc<impl StallProvider>,
35    memory_monitor2_config: Config,
36    memorypressure_proxy: fpressure::ProviderProxy,
37    bucket_definitions: Vec<BucketDefinition>,
38) -> Result<ServiceTask> {
39    debug!("Start serving inspect tree.");
40
41    // This creates the root of an Inspect tree
42    // The Inspector is a singleton that you can access from any scope
43    let inspector = fuchsia_inspect::component::inspector();
44
45    // This serves the Inspect tree, converting failures into fatal errors
46    let inspect_controller =
47        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
48            .ok_or_else(|| anyhow!("Failed to serve server handling `fuchsia.inspect.Tree`"))?;
49
50    build_inspect_tree(kernel_stats_proxy.clone(), stall_provider, inspector);
51    let digest_service = digest_service(
52        memory_monitor2_config,
53        attribution_data_service,
54        kernel_stats_proxy,
55        memorypressure_proxy,
56        bucket_definitions,
57        inspector.root().create_child("logger"),
58    )?;
59    Ok(ServiceTask { _inspect_controller: inspect_controller, _periodic_digest: digest_service })
60}
61
62fn build_inspect_tree(
63    kernel_stats_proxy: fkernel::StatsProxy,
64    stall_provider: Arc<impl StallProvider>,
65    inspector: &Inspector,
66) {
67    // Lazy evaluation is unregistered when the `LazyNode` is dropped.
68    {
69        let kernel_stats_proxy = kernel_stats_proxy.clone();
70        inspector.root().record_lazy_child("kmem_stats", move || {
71            let kernel_stats_proxy = kernel_stats_proxy.clone();
72            async move {
73                let inspector = Inspector::default();
74                let root = inspector.root();
75                let mem_stats = kernel_stats_proxy.get_memory_stats().await?;
76                mem_stats.total_bytes.map(|v| root.record_uint("total_bytes", v));
77                mem_stats.free_bytes.map(|v| root.record_uint("free_bytes", v));
78                mem_stats.free_loaned_bytes.map(|v| root.record_uint("free_loaned_bytes", v));
79                mem_stats.wired_bytes.map(|v| root.record_uint("wired_bytes", v));
80                mem_stats.total_heap_bytes.map(|v| root.record_uint("total_heap_bytes", v));
81                mem_stats.free_heap_bytes.map(|v| root.record_uint("free_heap_bytes", v));
82                mem_stats.vmo_bytes.map(|v| root.record_uint("vmo_bytes", v));
83                mem_stats.mmu_overhead_bytes.map(|v| root.record_uint("mmu_overhead_bytes", v));
84                mem_stats.ipc_bytes.map(|v| root.record_uint("ipc_bytes", v));
85                mem_stats.cache_bytes.map(|v| root.record_uint("cache_bytes", v));
86                mem_stats.slab_bytes.map(|v| root.record_uint("slab_bytes", v));
87                mem_stats.zram_bytes.map(|v| root.record_uint("zram_bytes", v));
88                mem_stats.other_bytes.map(|v| root.record_uint("other_bytes", v));
89                mem_stats
90                    .vmo_reclaim_total_bytes
91                    .map(|v| root.record_uint("vmo_reclaim_total_bytes", v));
92                mem_stats
93                    .vmo_reclaim_newest_bytes
94                    .map(|v| root.record_uint("vmo_reclaim_newest_bytes", v));
95                mem_stats
96                    .vmo_reclaim_oldest_bytes
97                    .map(|v| root.record_uint("vmo_reclaim_oldest_bytes", v));
98                mem_stats
99                    .vmo_reclaim_disabled_bytes
100                    .map(|v| root.record_uint("vmo_reclaim_disabled_bytes", v));
101                mem_stats
102                    .vmo_discardable_locked_bytes
103                    .map(|v| root.record_uint("vmo_discardable_locked_bytes", v));
104                mem_stats
105                    .vmo_discardable_unlocked_bytes
106                    .map(|v| root.record_uint("vmo_discardable_unlocked_bytes", v));
107                Ok(inspector)
108            }
109            .boxed()
110        })
111    };
112
113    {
114        inspector.root().record_lazy_child("kmem_stats_compression", move || {
115            let kernel_stats_proxy = kernel_stats_proxy.clone();
116            async move {
117                let inspector = Inspector::default();
118                let cmp_stats = kernel_stats_proxy.get_memory_stats_compression().await?;
119                cmp_stats
120                    .uncompressed_storage_bytes
121                    .map(|v| inspector.root().record_uint("uncompressed_storage_bytes", v));
122                cmp_stats
123                    .compressed_storage_bytes
124                    .map(|v| inspector.root().record_uint("compressed_storage_bytes", v));
125                cmp_stats
126                    .compressed_fragmentation_bytes
127                    .map(|v| inspector.root().record_uint("compressed_fragmentation_bytes", v));
128                cmp_stats
129                    .compression_time
130                    .map(|v| inspector.root().record_int("compression_time", v));
131                cmp_stats
132                    .decompression_time
133                    .map(|v| inspector.root().record_int("decompression_time", v));
134                cmp_stats
135                    .total_page_compression_attempts
136                    .map(|v| inspector.root().record_uint("total_page_compression_attempts", v));
137                cmp_stats
138                    .failed_page_compression_attempts
139                    .map(|v| inspector.root().record_uint("failed_page_compression_attempts", v));
140                cmp_stats
141                    .total_page_decompressions
142                    .map(|v| inspector.root().record_uint("total_page_decompressions", v));
143                cmp_stats
144                    .compressed_page_evictions
145                    .map(|v| inspector.root().record_uint("compressed_page_evictions", v));
146                cmp_stats
147                    .eager_page_compressions
148                    .map(|v| inspector.root().record_uint("eager_page_compressions", v));
149                cmp_stats
150                    .memory_pressure_page_compressions
151                    .map(|v| inspector.root().record_uint("memory_pressure_page_compressions", v));
152                cmp_stats
153                    .critical_memory_page_compressions
154                    .map(|v| inspector.root().record_uint("critical_memory_page_compressions", v));
155                cmp_stats
156                    .pages_decompressed_unit_ns
157                    .map(|v| inspector.root().record_uint("pages_decompressed_unit_ns", v));
158                cmp_stats.pages_decompressed_within_log_time.map(|v| {
159                    let array =
160                        inspector.root().create_uint_array("pages_decompressed_within_log_time", 8);
161                    // Using constant strings saves allocations.
162                    array.set(0, v[0]);
163                    array.set(1, v[1]);
164                    array.set(2, v[2]);
165                    array.set(3, v[3]);
166                    array.set(4, v[4]);
167                    array.set(5, v[5]);
168                    array.set(6, v[6]);
169                    array.set(7, v[7]);
170                    inspector.root().record(array);
171                });
172                Ok(inspector)
173            }
174            .boxed()
175        });
176    }
177
178    {
179        inspector.root().record_lazy_child("stalls", move || {
180            let stall_info = stall_provider.get_stall_info().unwrap();
181            let stall_rate_opt = stall_provider.get_stall_rate();
182            async move {
183                let inspector = Inspector::default();
184                inspector.root().record_int("current_some", stall_info.stall_time_some);
185                inspector.root().record_int("current_full", stall_info.stall_time_full);
186                if let Some(stall_rate) = stall_rate_opt {
187                    inspector
188                        .root()
189                        .record_int("rate_interval_s", stall_rate.interval.into_seconds());
190                    inspector.root().record_int("rate_some", stall_rate.rate_some);
191                    inspector.root().record_int("rate_full", stall_rate.rate_full);
192                }
193                Ok(inspector)
194            }
195            .boxed()
196        });
197    }
198}
199
200fn digest_service(
201    memory_monitor2_config: Config,
202    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
203    kernel_stats_proxy: fkernel::StatsProxy,
204    memorypressure_proxy: fpressure::ProviderProxy,
205    bucket_definitions: Vec<BucketDefinition>,
206    digest_node: Node,
207) -> Result<Task<Result<(), Error>>> {
208    // Initialize pressure monitoring.
209    let (watcher, watcher_stream) =
210        fidl::endpoints::create_request_stream::<fpressure::WatcherMarker>();
211    memorypressure_proxy.register_watcher(watcher)?;
212
213    Ok(fuchsia_async::Task::spawn(async move {
214        let mut buckets_list_node =
215            // Keep up to 100 measurements.
216            BoundedListNode::new(digest_node.create_child("measurements"), 100);
217        let buckets_names = std::cell::OnceCell::new();
218        let attribution_data_service = attribution_data_service;
219        let pressure_stream = watcher_stream.map_err(anyhow::Error::from);
220
221        // Get the initial, baseline pressure level.
222        let (request, mut pressure_stream) = pressure_stream.into_future().await;
223        let WatcherRequest::OnLevelChanged { level, responder } = request.ok_or_else(|| {
224            anyhow::Error::msg(
225                "Unexpectedly exhausted pressure stream before receiving baseline pressure level",
226            )
227        })??;
228        responder.send()?;
229        let mut current_level = level;
230        let new_timer = |level| {
231            MonotonicDuration::from_seconds(match level {
232                fpressure::Level::Normal => memory_monitor2_config.normal_capture_delay_s,
233                fpressure::Level::Warning => memory_monitor2_config.warning_capture_delay_s,
234                fpressure::Level::Critical => memory_monitor2_config.critical_capture_delay_s,
235            } as i64)
236            .into_timer()
237            .boxed()
238            .fuse()
239        };
240        let mut timer = new_timer(current_level);
241        loop {
242            // Wait for either a pressure change or the timer corresponding to the current level. In
243            // either case, reset the timer.
244            let () = select! {
245                // When we receive a pressure change, update the current level, and if necessary do
246                // a capture.
247                pressure = pressure_stream.next() =>
248                    match pressure.ok_or_else(|| anyhow::Error::msg("Unexpectedly exhausted pressure stream"))?? {
249                        WatcherRequest::OnLevelChanged{level, responder} => {
250                            responder.send()?;
251                            if level == current_level { continue; }
252                            current_level = level;
253                            timer = new_timer(level);
254                            if !memory_monitor2_config.capture_on_pressure_change { continue; }
255                        },
256                    },
257                // If instead we reached the deadline, do a capture anyway. The deadline depends on
258                // the current pressure level and the configuration.
259                _ = timer => {timer = new_timer(current_level);}
260            };
261
262            let timestamp = zx::BootInstant::get();
263            // Retrieve (concurrently) the data necessary to perform the aggregation.
264            let (kmem_stats, kmem_stats_compression) = try_join!(
265                kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
266                kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
267            )?;
268
269            // Compute the aggregation.
270            let Digest { buckets } = Digest::compute(
271                &*attribution_data_service,
272                &kmem_stats,
273                &kmem_stats_compression,
274                &bucket_definitions,
275            )?;
276
277            // Initialize the inspect property containing the buckets names, if necessary.
278            let _ = buckets_names.get_or_init(|| {
279                // Create inspect node to store buckets related information.
280                let buckets_names = digest_node.create_string_array("buckets", buckets.len());
281                for (i, attribution_processing::digest::Bucket { name, .. }) in
282                    buckets.iter().enumerate()
283                {
284                    buckets_names.set(i, name);
285                }
286                buckets_names
287            });
288
289            // Add an entry for the current aggregation.
290            buckets_list_node.add_entry(|n| {
291                n.record_int("timestamp", timestamp.into_nanos());
292                let ia = n.create_uint_array("bucket_sizes", buckets.len());
293                for (i, b) in buckets.iter().enumerate() {
294                    ia.set(i, b.size as u64);
295                }
296                n.record(ia);
297            });
298        }
299    }))
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use attribution_processing::testing::FakeAttributionDataProvider;
306    use attribution_processing::{
307        Attribution, AttributionData, Principal, PrincipalDescription, PrincipalIdentifier,
308        PrincipalType, Resource, ResourceReference, ZXName,
309    };
310    use diagnostics_assertions::{assert_data_tree, NonZeroIntProperty};
311    use fuchsia_async::TestExecutor;
312    use futures::task::Poll;
313    use futures::TryStreamExt;
314    use std::time::Duration;
315    use {
316        fidl_fuchsia_memory_attribution_plugin as fplugin,
317        fidl_fuchsia_memorypressure as fpressure, fuchsia_async as fasync,
318    };
319
320    fn get_attribution_data_provider() -> Arc<impl AttributionDataProvider + 'static> {
321        let attribution_data = AttributionData {
322            principals_vec: vec![Principal {
323                identifier: PrincipalIdentifier(1),
324                description: PrincipalDescription::Component("principal".to_owned()),
325                principal_type: PrincipalType::Runnable,
326                parent: None,
327            }],
328            resources_vec: vec![Resource {
329                koid: 10,
330                name_index: 0,
331                resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
332                    parent: None,
333                    private_committed_bytes: Some(1024),
334                    private_populated_bytes: Some(2048),
335                    scaled_committed_bytes: Some(1024),
336                    scaled_populated_bytes: Some(2048),
337                    total_committed_bytes: Some(1024),
338                    total_populated_bytes: Some(2048),
339                    ..Default::default()
340                }),
341            }],
342            resource_names: vec![ZXName::from_string_lossy("resource")],
343            attributions: vec![Attribution {
344                source: PrincipalIdentifier(1),
345                subject: PrincipalIdentifier(1),
346                resources: vec![ResourceReference::KernelObject(10)],
347            }],
348        };
349        Arc::new(FakeAttributionDataProvider { attribution_data })
350    }
351
352    async fn serve_kernel_stats(
353        mut request_stream: fkernel::StatsRequestStream,
354    ) -> Result<(), fidl::Error> {
355        while let Some(request) = request_stream.try_next().await? {
356            match request {
357                fkernel::StatsRequest::GetMemoryStats { responder } => {
358                    responder
359                        .send(&fkernel::MemoryStats {
360                            total_bytes: Some(1),
361                            free_bytes: Some(2),
362                            wired_bytes: Some(3),
363                            total_heap_bytes: Some(4),
364                            free_heap_bytes: Some(5),
365                            vmo_bytes: Some(6),
366                            mmu_overhead_bytes: Some(7),
367                            ipc_bytes: Some(8),
368                            other_bytes: Some(9),
369                            free_loaned_bytes: Some(10),
370                            cache_bytes: Some(11),
371                            slab_bytes: Some(12),
372                            zram_bytes: Some(13),
373                            vmo_reclaim_total_bytes: Some(14),
374                            vmo_reclaim_newest_bytes: Some(15),
375                            vmo_reclaim_oldest_bytes: Some(16),
376                            vmo_reclaim_disabled_bytes: Some(17),
377                            vmo_discardable_locked_bytes: Some(18),
378                            vmo_discardable_unlocked_bytes: Some(19),
379                            ..Default::default()
380                        })
381                        .unwrap();
382                }
383                fkernel::StatsRequest::GetMemoryStatsExtended { responder: _ } => {
384                    unimplemented!("Deprecated call, should not be used")
385                }
386                fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
387                    responder
388                        .send(&fkernel::MemoryStatsCompression {
389                            uncompressed_storage_bytes: Some(20),
390                            compressed_storage_bytes: Some(21),
391                            compressed_fragmentation_bytes: Some(22),
392                            compression_time: Some(23),
393                            decompression_time: Some(24),
394                            total_page_compression_attempts: Some(25),
395                            failed_page_compression_attempts: Some(26),
396                            total_page_decompressions: Some(27),
397                            compressed_page_evictions: Some(28),
398                            eager_page_compressions: Some(29),
399                            memory_pressure_page_compressions: Some(30),
400                            critical_memory_page_compressions: Some(31),
401                            pages_decompressed_unit_ns: Some(32),
402                            pages_decompressed_within_log_time: Some([
403                                40, 41, 42, 43, 44, 45, 46, 47,
404                            ]),
405                            ..Default::default()
406                        })
407                        .unwrap();
408                }
409                fkernel::StatsRequest::GetCpuStats { responder: _ } => unimplemented!(),
410                fkernel::StatsRequest::GetCpuLoad { duration: _, responder: _ } => unimplemented!(),
411            }
412        }
413        Ok(())
414    }
415
416    #[test]
417    fn test_build_inspect_tree() {
418        let mut exec = fasync::TestExecutor::new();
419        let (stats_provider, stats_request_stream) =
420            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
421
422        fasync::Task::spawn(async move {
423            serve_kernel_stats(stats_request_stream).await.unwrap();
424        })
425        .detach();
426
427        let inspector = fuchsia_inspect::Inspector::default();
428
429        struct FakeStallProvider {}
430        impl StallProvider for FakeStallProvider {
431            fn get_stall_info(&self) -> Result<zx::MemoryStall, anyhow::Error> {
432                Ok(zx::MemoryStall { stall_time_some: 10, stall_time_full: 20 })
433            }
434
435            fn get_stall_rate(&self) -> Option<stalls::MemoryStallRate> {
436                Some(stalls::MemoryStallRate {
437                    interval: fasync::MonotonicDuration::from_seconds(60),
438                    rate_some: 1,
439                    rate_full: 2,
440                })
441            }
442        }
443
444        build_inspect_tree(stats_provider, Arc::new(FakeStallProvider {}), &inspector);
445
446        let output = exec
447            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
448            .expect("got hierarchy");
449
450        assert_data_tree!(output, root: {
451            kmem_stats: {
452                total_bytes: 1u64,
453                free_bytes: 2u64,
454                wired_bytes: 3u64,
455                total_heap_bytes: 4u64,
456                free_heap_bytes: 5u64,
457                vmo_bytes: 6u64,
458                mmu_overhead_bytes: 7u64,
459                ipc_bytes: 8u64,
460                other_bytes: 9u64,
461                free_loaned_bytes: 10u64,
462                cache_bytes: 11u64,
463                slab_bytes: 12u64,
464                zram_bytes: 13u64,
465                vmo_reclaim_total_bytes: 14u64,
466                vmo_reclaim_newest_bytes: 15u64,
467                vmo_reclaim_oldest_bytes: 16u64,
468                vmo_reclaim_disabled_bytes: 17u64,
469                vmo_discardable_locked_bytes: 18u64,
470                vmo_discardable_unlocked_bytes: 19u64
471            },
472            kmem_stats_compression: {
473                uncompressed_storage_bytes: 20u64,
474                compressed_storage_bytes: 21u64,
475                compressed_fragmentation_bytes: 22u64,
476                compression_time: 23i64,
477                decompression_time: 24i64,
478                total_page_compression_attempts: 25u64,
479                failed_page_compression_attempts: 26u64,
480                total_page_decompressions: 27u64,
481                compressed_page_evictions: 28u64,
482                eager_page_compressions: 29u64,
483                memory_pressure_page_compressions: 30u64,
484                critical_memory_page_compressions: 31u64,
485                pages_decompressed_unit_ns: 32u64,
486                pages_decompressed_within_log_time: vec![
487                    40u64, 41u64, 42u64, 43u64, 44u64, 45u64, 46u64, 47u64,
488                ]
489            },
490            stalls: {
491                current_some: 10i64,
492                current_full: 20i64,
493                rate_some: 1i64,
494                rate_full: 2i64,
495                rate_interval_s: 60i64
496            }
497        });
498    }
499
500    #[test]
501    fn test_digest_service_capture_on_pressure_change_and_wait() -> anyhow::Result<()> {
502        let mut exec = fasync::TestExecutor::new_with_fake_time();
503        let (stats_provider, stats_request_stream) =
504            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
505
506        fasync::Task::spawn(async move {
507            serve_kernel_stats(stats_request_stream).await.unwrap();
508        })
509        .detach();
510
511        let inspector = fuchsia_inspect::Inspector::default();
512        let (pressure_provider, pressure_request_stream) =
513            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
514        let mut digest_service = std::pin::pin!(digest_service(
515            Config {
516                capture_on_pressure_change: true,
517                imminent_oom_capture_delay_s: 10,
518                critical_capture_delay_s: 10,
519                warning_capture_delay_s: 10,
520                normal_capture_delay_s: 10,
521            },
522            get_attribution_data_provider(),
523            stats_provider,
524            pressure_provider,
525            vec![],
526            inspector.root().create_child("logger"),
527        )?);
528        // Expects digst_service to register a watcher, answers with
529        // an initial pressure level, then returns the watcher for
530        // further signaling. Panics if this whole transaction is not
531        // immediately ready.
532        let Poll::Ready(watcher) = exec
533            .run_until_stalled(
534                &mut pressure_request_stream
535                    .then(|request| async {
536                        let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
537                            request.expect("digest_service failed to register a watcher");
538                        let watcher = watcher.into_proxy();
539                        watcher.on_level_changed(fpressure::Level::Normal).await.expect(
540                            "digest_service failed to acknowledge the initial pressure level",
541                        );
542                        watcher
543                    })
544                    .boxed()
545                    .into_future(),
546            )
547            .map(|(watcher, _)| {
548                watcher.ok_or_else(|| anyhow::Error::msg("failed to register watcher"))
549            })?
550        else {
551            panic!("digest_service failed to register a watcher");
552        };
553        // Send a pressure signal, to trigger a capture.
554        assert!(exec
555            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Warning))?
556            .is_ready());
557        // Ensure that digest_service has an opportunity to react to the pressure signal.
558        let _ = exec.run_until_stalled(&mut digest_service);
559
560        // Fake the passage of time, so that digest_service may do another capture.
561        assert!(exec
562            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
563                exec.now() + Duration::from_secs(10).into()
564            )))
565            .is_ready());
566        // Ensure that digest_service has an opportunity to react to the passage of time.
567        let _ = exec.run_until_stalled(&mut digest_service)?;
568
569        // This should resolve immediately because the inspect hierarchy has been populated by now.
570        let Poll::Ready(output) = exec
571            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
572            .map(|r| r.expect("got hierarchy"))
573        else {
574            panic!("Couldn't retrieve inspect output");
575        };
576
577        assert_data_tree!(output, root: {
578            logger: {
579                buckets: vec![
580                    "Undigested",
581                    "Orphaned",
582                    "Kernel",
583                    "Free",
584                    "[Addl]PagerTotal",
585                    "[Addl]PagerNewest",
586                    "[Addl]PagerOldest",
587                    "[Addl]DiscardableLocked",
588                    "[Addl]DiscardableUnlocked",
589                    "[Addl]ZramCompressedBytes",
590                ],
591                measurements: {
592                    // Corresponds to the capture on pressure change
593                    "0": {
594                        timestamp: NonZeroIntProperty,
595                        bucket_sizes: vec![
596                            1024u64, // Undigested: matches the single unmatched VMO
597                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
598                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
599                            2u64,    // Free
600                            14u64,   // [Addl]PagerTotal
601                            15u64,   // [Addl]PagerNewest
602                            16u64,   // [Addl]PagerOldest
603                            18u64,   // [Addl]DiscardableLocked
604                            19u64,   // [Addl]DiscardableUnlocked
605                            21u64,   // [Addl]ZramCompressedBytes
606                        ],
607                    },
608                    // Corresponds to the capture after the passage of time
609                    "1": {
610                        timestamp: NonZeroIntProperty,
611                        bucket_sizes: vec![
612                            1024u64, // Undigested: matches the single unmatched VMO
613                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
614                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
615                            2u64,    // Free
616                            14u64,   // [Addl]PagerTotal
617                            15u64,   // [Addl]PagerNewest
618                            16u64,   // [Addl]PagerOldest
619                            18u64,   // [Addl]DiscardableLocked
620                            19u64,   // [Addl]DiscardableUnlocked
621                            21u64,   // [Addl]ZramCompressedBytes
622                        ],
623                    },
624                },
625            },
626        });
627        Ok(())
628    }
629
630    #[test]
631    fn test_digest_service_wait() -> anyhow::Result<()> {
632        let mut exec = fasync::TestExecutor::new_with_fake_time();
633        let (stats_provider, stats_request_stream) =
634            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
635
636        fasync::Task::spawn(async move {
637            serve_kernel_stats(stats_request_stream).await.unwrap();
638        })
639        .detach();
640        let (pressure_provider, pressure_request_stream) =
641            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
642        let inspector = fuchsia_inspect::Inspector::default();
643        let mut digest_service = std::pin::pin!(digest_service(
644            Config {
645                capture_on_pressure_change: false,
646                imminent_oom_capture_delay_s: 10,
647                critical_capture_delay_s: 10,
648                warning_capture_delay_s: 10,
649                normal_capture_delay_s: 10,
650            },
651            get_attribution_data_provider(),
652            stats_provider,
653            pressure_provider,
654            vec![],
655            inspector.root().create_child("logger"),
656        )?);
657        // digest_service registers a watcher; make sure we answer.  Also, make sure not to drop the
658        // proxy nor the pressure stream; early termination would get reported to digest_service,
659        // which then prematurely interrupts it, before the timers have a chance to run.
660        let Poll::Ready((_watcher, _pressure_stream)) = exec
661            .run_until_stalled(
662                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
663                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
664                        request.map_err(anyhow::Error::from)?;
665                    let watcher_proxy = watcher.into_proxy();
666                    let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
667                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
668                }))
669                .into_future(),
670            )
671            .map(|(watcher, pressure_stream)| {
672                (
673                    watcher.ok_or_else(|| {
674                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
675                    }),
676                    pressure_stream,
677                )
678            })
679        else {
680            panic!("Failed to register the watcher");
681        };
682
683        // Give digest_service the opportunity to setup its timers.
684        let _ = exec.run_until_stalled(&mut digest_service)?;
685        // Fake the passage of time, so that digest_service may do another capture.
686        assert!(exec
687            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
688                exec.now() + Duration::from_secs(15).into()
689            )))
690            .is_ready());
691        // Ensure that digest_service has an opportunity to react to the passage of time.
692        assert!(exec.run_until_stalled(&mut digest_service).is_pending());
693        // This should resolve immediately because the inspect hierarchy has been populated by now.
694        let Poll::Ready(output) = exec
695            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
696            .map(|r| r.expect("got hierarchy"))
697        else {
698            panic!("Couldn't retrieve inspect output");
699        };
700
701        assert_data_tree!(output, root: {
702            logger: {
703                buckets: vec![
704                    "Undigested",
705                    "Orphaned",
706                    "Kernel",
707                    "Free",
708                    "[Addl]PagerTotal",
709                    "[Addl]PagerNewest",
710                    "[Addl]PagerOldest",
711                    "[Addl]DiscardableLocked",
712                    "[Addl]DiscardableUnlocked",
713                    "[Addl]ZramCompressedBytes",
714                ],
715                measurements: {
716                    // Corresponds to the capture after the passage of time
717                    "0": {
718                        timestamp: NonZeroIntProperty,
719                        bucket_sizes: vec![
720                            1024u64, // Undigested: matches the single unmatched VMO
721                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
722                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
723                            2u64,    // Free
724                            14u64,   // [Addl]PagerTotal
725                            15u64,   // [Addl]PagerNewest
726                            16u64,   // [Addl]PagerOldest
727                            18u64,   // [Addl]DiscardableLocked
728                            19u64,   // [Addl]DiscardableUnlocked
729                            21u64,   // [Addl]ZramCompressedBytes
730                        ],
731                    },
732                },
733            },
734        });
735        Ok(())
736    }
737
738    #[test]
739    fn test_digest_service_no_capture_on_pressure_change() -> anyhow::Result<()> {
740        let mut exec = fasync::TestExecutor::new();
741        let (stats_provider, stats_request_stream) =
742            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
743
744        fasync::Task::spawn(async move {
745            serve_kernel_stats(stats_request_stream).await.unwrap();
746        })
747        .detach();
748
749        let inspector = fuchsia_inspect::Inspector::default();
750        let (pressure_provider, pressure_request_stream) =
751            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
752        let mut serve_pressure_stream = pressure_request_stream
753            .then(|request| async {
754                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
755                    request.map_err(anyhow::Error::from)?;
756                let watcher_proxy = watcher.into_proxy();
757                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
758                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
759            })
760            .boxed();
761        let mut digest_service = std::pin::pin!(digest_service(
762            Config {
763                capture_on_pressure_change: false,
764                imminent_oom_capture_delay_s: 10,
765                critical_capture_delay_s: 10,
766                warning_capture_delay_s: 10,
767                normal_capture_delay_s: 10,
768            },
769            get_attribution_data_provider(),
770            stats_provider,
771            pressure_provider,
772            vec![],
773            inspector.root().create_child("logger"),
774        )?);
775        let watcher =
776            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
777        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
778        let _ = exec.run_until_stalled(&mut digest_service);
779        let output = exec
780            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
781            .expect("got hierarchy");
782
783        assert_data_tree!(output, root: {
784            logger: {
785                measurements: {},
786            },
787        });
788        Ok(())
789    }
790
791    #[test]
792    fn test_digest_service_capture_on_pressure_change() -> anyhow::Result<()> {
793        let mut exec = fasync::TestExecutor::new();
794        let (stats_provider, stats_request_stream) =
795            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
796
797        fasync::Task::spawn(async move {
798            serve_kernel_stats(stats_request_stream).await.unwrap();
799        })
800        .detach();
801
802        let inspector = fuchsia_inspect::Inspector::default();
803        let (pressure_provider, pressure_request_stream) =
804            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
805        let mut serve_pressure_stream = pressure_request_stream
806            .then(|request| async {
807                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
808                    request.map_err(anyhow::Error::from)?;
809                let watcher_proxy = watcher.into_proxy();
810                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
811                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
812            })
813            .boxed();
814        let mut digest_service = std::pin::pin!(digest_service(
815            Config {
816                capture_on_pressure_change: true,
817                imminent_oom_capture_delay_s: 10,
818                critical_capture_delay_s: 10,
819                warning_capture_delay_s: 10,
820                normal_capture_delay_s: 10,
821            },
822            get_attribution_data_provider(),
823            stats_provider,
824            pressure_provider,
825            vec![],
826            inspector.root().create_child("logger"),
827        )?);
828        let watcher =
829            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
830        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
831        let _ = exec.run_until_stalled(&mut digest_service);
832        let output = exec
833            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
834            .expect("got hierarchy");
835
836        assert_data_tree!(output, root: {
837            logger: {
838                buckets: vec![
839                    "Undigested",
840                    "Orphaned",
841                    "Kernel",
842                    "Free",
843                    "[Addl]PagerTotal",
844                    "[Addl]PagerNewest",
845                    "[Addl]PagerOldest",
846                    "[Addl]DiscardableLocked",
847                    "[Addl]DiscardableUnlocked",
848                    "[Addl]ZramCompressedBytes",
849                ],
850                measurements: {
851                    "0": {
852                        timestamp: NonZeroIntProperty,
853                        bucket_sizes: vec![
854                            1024u64, // Undigested: matches the single unmatched VMO
855                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
856                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
857                            2u64,    // Free
858                            14u64,   // [Addl]PagerTotal
859                            15u64,   // [Addl]PagerNewest
860                            16u64,   // [Addl]PagerOldest
861                            18u64,   // [Addl]DiscardableLocked
862                            19u64,   // [Addl]DiscardableUnlocked
863                            21u64,   // [Addl]ZramCompressedBytes
864                        ],
865                    },
866                },
867            },
868        });
869        Ok(())
870    }
871}