inspect_nodes/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4#![warn(clippy::unwrap_used)]
5#![cfg_attr(test, allow(clippy::unwrap_used))]
6
7use anyhow::{anyhow, Error, Result};
8use attribution_processing::digest::{BucketDefinition, Digest};
9use attribution_processing::AttributionDataProvider;
10use fpressure::WatcherRequest;
11use fuchsia_async::{MonotonicDuration, MonotonicInstant, Task, WakeupTime};
12use fuchsia_inspect::{ArrayProperty, Inspector, Node};
13use fuchsia_inspect_contrib::nodes::BoundedListNode;
14use futures::{select, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
15use inspect_runtime::PublishedInspectController;
16use log::{debug, warn};
17use memory_monitor2_config::Config;
18use stalls::StallProvider;
19use std::sync::Arc;
20use std::u64;
21use {
22    fidl_fuchsia_kernel as fkernel, fidl_fuchsia_memorypressure as fpressure, fuchsia_inspect as _,
23    inspect_runtime as _,
24};
25
26/// Hold the resource required to serve the inspect tree.
27/// The FIDL service stops when this object is dropped.
28pub struct ServiceTask {
29    _inspect_controller: PublishedInspectController,
30    _periodic_digest: Task<Result<(), anyhow::Error>>,
31}
32
33/// Begins to serve the inspect tree, and returns an object holding the server's resources.
34/// Dropping the `ServiceTask` stops the service.
35pub fn start_service(
36    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
37    kernel_stats_proxy: fkernel::StatsProxy,
38    stall_provider: Arc<impl StallProvider>,
39    memory_monitor2_config: Config,
40    memorypressure_proxy: fpressure::ProviderProxy,
41    bucket_definitions: Arc<[BucketDefinition]>,
42) -> Result<ServiceTask> {
43    debug!("Start serving inspect tree.");
44
45    // This creates the root of an Inspect tree
46    // The Inspector is a singleton that you can access from any scope
47    let inspector = fuchsia_inspect::component::inspector();
48
49    // This serves the Inspect tree, converting failures into fatal errors
50    let inspect_controller =
51        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
52            .ok_or_else(|| anyhow!("Failed to serve server handling `fuchsia.inspect.Tree`"))?;
53
54    build_inspect_tree(
55        kernel_stats_proxy.clone(),
56        stall_provider.clone(),
57        inspector,
58        &memory_monitor2_config,
59    );
60    let digest_service = digest_service(
61        memory_monitor2_config,
62        attribution_data_service,
63        stall_provider,
64        kernel_stats_proxy,
65        memorypressure_proxy,
66        bucket_definitions,
67        inspector.root().create_child("logger"),
68    )?;
69    Ok(ServiceTask { _inspect_controller: inspect_controller, _periodic_digest: digest_service })
70}
71
72fn build_inspect_tree(
73    kernel_stats_proxy: fkernel::StatsProxy,
74    stall_provider: Arc<impl StallProvider>,
75    inspector: &Inspector,
76    config: &Config,
77) {
78    // Register the current config.
79    inspector.root().record_child("config", |node| config.record_inspect(node));
80
81    // Lazy evaluation is unregistered when the `LazyNode` is dropped.
82    {
83        let kernel_stats_proxy = kernel_stats_proxy.clone();
84        inspector.root().record_lazy_child("kmem_stats", move || {
85            let kernel_stats_proxy = kernel_stats_proxy.clone();
86            async move {
87                let inspector = Inspector::default();
88                let root = inspector.root();
89                let mem_stats = kernel_stats_proxy.get_memory_stats().await?;
90                mem_stats.total_bytes.map(|v| root.record_uint("total_bytes", v));
91                mem_stats.free_bytes.map(|v| root.record_uint("free_bytes", v));
92                mem_stats.free_loaned_bytes.map(|v| root.record_uint("free_loaned_bytes", v));
93                mem_stats.wired_bytes.map(|v| root.record_uint("wired_bytes", v));
94                mem_stats.total_heap_bytes.map(|v| root.record_uint("total_heap_bytes", v));
95                mem_stats.free_heap_bytes.map(|v| root.record_uint("free_heap_bytes", v));
96                mem_stats.vmo_bytes.map(|v| root.record_uint("vmo_bytes", v));
97                mem_stats.mmu_overhead_bytes.map(|v| root.record_uint("mmu_overhead_bytes", v));
98                mem_stats.ipc_bytes.map(|v| root.record_uint("ipc_bytes", v));
99                mem_stats.cache_bytes.map(|v| root.record_uint("cache_bytes", v));
100                mem_stats.slab_bytes.map(|v| root.record_uint("slab_bytes", v));
101                mem_stats.zram_bytes.map(|v| root.record_uint("zram_bytes", v));
102                mem_stats.other_bytes.map(|v| root.record_uint("other_bytes", v));
103                mem_stats
104                    .vmo_reclaim_total_bytes
105                    .map(|v| root.record_uint("vmo_reclaim_total_bytes", v));
106                mem_stats
107                    .vmo_reclaim_newest_bytes
108                    .map(|v| root.record_uint("vmo_reclaim_newest_bytes", v));
109                mem_stats
110                    .vmo_reclaim_oldest_bytes
111                    .map(|v| root.record_uint("vmo_reclaim_oldest_bytes", v));
112                mem_stats
113                    .vmo_reclaim_disabled_bytes
114                    .map(|v| root.record_uint("vmo_reclaim_disabled_bytes", v));
115                mem_stats
116                    .vmo_discardable_locked_bytes
117                    .map(|v| root.record_uint("vmo_discardable_locked_bytes", v));
118                mem_stats
119                    .vmo_discardable_unlocked_bytes
120                    .map(|v| root.record_uint("vmo_discardable_unlocked_bytes", v));
121                Ok(inspector)
122            }
123            .boxed()
124        })
125    };
126
127    {
128        inspector.root().record_lazy_child("kmem_stats_compression", move || {
129            let kernel_stats_proxy = kernel_stats_proxy.clone();
130            async move {
131                let inspector = Inspector::default();
132                let cmp_stats = kernel_stats_proxy.get_memory_stats_compression().await?;
133                cmp_stats
134                    .uncompressed_storage_bytes
135                    .map(|v| inspector.root().record_uint("uncompressed_storage_bytes", v));
136                cmp_stats
137                    .compressed_storage_bytes
138                    .map(|v| inspector.root().record_uint("compressed_storage_bytes", v));
139                cmp_stats
140                    .compressed_fragmentation_bytes
141                    .map(|v| inspector.root().record_uint("compressed_fragmentation_bytes", v));
142                cmp_stats
143                    .compression_time
144                    .map(|v| inspector.root().record_int("compression_time", v));
145                cmp_stats
146                    .decompression_time
147                    .map(|v| inspector.root().record_int("decompression_time", v));
148                cmp_stats
149                    .total_page_compression_attempts
150                    .map(|v| inspector.root().record_uint("total_page_compression_attempts", v));
151                cmp_stats
152                    .failed_page_compression_attempts
153                    .map(|v| inspector.root().record_uint("failed_page_compression_attempts", v));
154                cmp_stats
155                    .total_page_decompressions
156                    .map(|v| inspector.root().record_uint("total_page_decompressions", v));
157                cmp_stats
158                    .compressed_page_evictions
159                    .map(|v| inspector.root().record_uint("compressed_page_evictions", v));
160                cmp_stats
161                    .eager_page_compressions
162                    .map(|v| inspector.root().record_uint("eager_page_compressions", v));
163                cmp_stats
164                    .memory_pressure_page_compressions
165                    .map(|v| inspector.root().record_uint("memory_pressure_page_compressions", v));
166                cmp_stats
167                    .critical_memory_page_compressions
168                    .map(|v| inspector.root().record_uint("critical_memory_page_compressions", v));
169                cmp_stats
170                    .pages_decompressed_unit_ns
171                    .map(|v| inspector.root().record_uint("pages_decompressed_unit_ns", v));
172                cmp_stats.pages_decompressed_within_log_time.map(|v| {
173                    let array =
174                        inspector.root().create_uint_array("pages_decompressed_within_log_time", 8);
175                    // Using constant strings saves allocations.
176                    array.set(0, v[0]);
177                    array.set(1, v[1]);
178                    array.set(2, v[2]);
179                    array.set(3, v[3]);
180                    array.set(4, v[4]);
181                    array.set(5, v[5]);
182                    array.set(6, v[6]);
183                    array.set(7, v[7]);
184                    inspector.root().record(array);
185                });
186                Ok(inspector)
187            }
188            .boxed()
189        });
190    }
191
192    {
193        inspector.root().record_lazy_child("stalls", move || {
194            let stall_provider = stall_provider.clone();
195            async move {
196                let stall_info = stall_provider.get_stall_info()?;
197                let inspector = Inspector::default();
198                inspector.root().record_uint(
199                    "some_ms",
200                    stall_info.some.as_millis().try_into().unwrap_or(u64::MAX),
201                );
202                inspector.root().record_uint(
203                    "full_ms",
204                    stall_info.full.as_millis().try_into().unwrap_or(u64::MAX),
205                );
206                Ok(inspector)
207            }
208            .boxed()
209        });
210    }
211}
212
213fn pressure_to_deadline(level: fpressure::Level, config: &Config) -> MonotonicInstant {
214    MonotonicInstant::now()
215        + MonotonicDuration::from_seconds(match level {
216            fpressure::Level::Normal => config.normal_capture_delay_s,
217            fpressure::Level::Warning => config.warning_capture_delay_s,
218            fpressure::Level::Critical => config.critical_capture_delay_s,
219        } as i64)
220}
221
222fn digest_service(
223    memory_monitor2_config: Config,
224    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
225    stall_provider: Arc<impl StallProvider>,
226    kernel_stats_proxy: fkernel::StatsProxy,
227    memorypressure_proxy: fpressure::ProviderProxy,
228    bucket_definitions: Arc<[BucketDefinition]>,
229    digest_node: Node,
230) -> Result<Task<Result<(), Error>>> {
231    // Initialize pressure monitoring.
232    let (watcher, watcher_stream) =
233        fidl::endpoints::create_request_stream::<fpressure::WatcherMarker>();
234    memorypressure_proxy.register_watcher(watcher)?;
235
236    Ok(fuchsia_async::Task::spawn(async move {
237        let mut buckets_list_node =
238            // Keep up to 100 measurements.
239            BoundedListNode::new(digest_node.create_child("measurements"), 100);
240        let buckets_names = std::cell::OnceCell::new();
241        let attribution_data_service = attribution_data_service;
242        let pressure_stream = watcher_stream.map_err(anyhow::Error::from);
243
244        // Get the initial, baseline pressure level.
245        let (request, mut pressure_stream) = pressure_stream.into_future().await;
246        let mut current_level = {
247            let WatcherRequest::OnLevelChanged { level, responder } = request.ok_or_else(|| {
248                anyhow::Error::msg(
249                    "Unexpectedly exhausted pressure stream before receiving baseline pressure level",
250                )
251            })??;
252            responder.send()?;
253            level
254        };
255        let mut deadline = pressure_to_deadline(current_level, &memory_monitor2_config);
256        let mut timer = Box::pin(deadline.into_timer());
257        loop {
258            // Wait for either a pressure change or the timer corresponding to the current level. In
259            // either case, reset the timer.
260            let () = select! {
261                // When we receive a pressure change, update the current level, and if necessary do
262                // a capture.
263                pressure = pressure_stream.next() =>
264                    match pressure.ok_or_else(|| anyhow::Error::msg("Unexpectedly exhausted pressure stream"))?? {
265                        WatcherRequest::OnLevelChanged{level, responder} => {
266                            responder.send()?;
267                            // Don't do anything if the pressure has not changed.
268                            if level == current_level { continue; }
269                            current_level = level;
270                            let new_deadline = pressure_to_deadline(level, &memory_monitor2_config);
271                            if memory_monitor2_config.capture_on_pressure_change {
272                                // Do a capture then use new schedule
273                                deadline = new_deadline;
274                                timer = Box::pin(deadline.into_timer());
275                            } else {
276                                // Schedule the next capture if needed. Never postpone a previously
277                                // scheduled but not yet honored capture: this would risk captures
278                                // being arbitrarily delayed if pressure changes frequently.
279                                if deadline > new_deadline {
280                                    deadline = new_deadline;
281                                    timer = Box::pin(deadline.into_timer());
282                                }
283                                continue;
284                            }
285                        },
286                    },
287                // If we reached the deadline, schedule the next capture and do the current one.
288                _ = timer => {
289                    deadline = pressure_to_deadline(current_level, &memory_monitor2_config);
290                    timer = Box::pin(deadline.into_timer());
291                },
292            };
293
294            let timestamp = zx::BootInstant::get();
295            // Retrieve (concurrently) the data necessary to perform the aggregation.
296            let (kmem_stats, kmem_stats_compression) = try_join!(
297                kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
298                kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
299            )?;
300
301            // Compute the aggregation.
302            let Digest { buckets } = Digest::compute(
303                &*attribution_data_service,
304                &kmem_stats,
305                &kmem_stats_compression,
306                &*bucket_definitions,
307            )?;
308
309            // Initialize the inspect property containing the buckets names, if necessary.
310            let _ = buckets_names.get_or_init(|| {
311                // Create inspect node to store buckets related information.
312                let buckets_names = digest_node.create_string_array("buckets", buckets.len());
313                for (i, attribution_processing::digest::Bucket { name, .. }) in
314                    buckets.iter().enumerate()
315                {
316                    buckets_names.set(i, name);
317                }
318                buckets_names
319            });
320
321            let stall_values = stall_provider.get_stall_info()?;
322
323            // Add an entry for the current aggregation.
324            buckets_list_node.add_entry(|n| {
325                n.record_int("timestamp", timestamp.into_nanos());
326                let ia = n.create_uint_array("bucket_sizes", buckets.len());
327                for (i, b) in buckets.iter().enumerate() {
328                    ia.set(i, b.size as u64);
329                }
330                n.record(ia);
331                n.record_child("stalls", |child| {
332                    child.record_uint(
333                        "some_ms",
334                        stall_values.some.as_millis().try_into().unwrap_or(u64::MAX),
335                    );
336                    child.record_uint(
337                        "full_ms",
338                        stall_values.full.as_millis().try_into().unwrap_or(u64::MAX),
339                    );
340                });
341            });
342        }
343    }))
344}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349    use attribution_processing::testing::FakeAttributionDataProvider;
350    use attribution_processing::{
351        Attribution, AttributionData, Principal, PrincipalDescription, PrincipalIdentifier,
352        PrincipalType, Resource, ResourceReference, ZXName,
353    };
354    use diagnostics_assertions::{assert_data_tree, NonZeroIntProperty};
355    use fuchsia_async::TestExecutor;
356    use futures::task::Poll;
357    use futures::TryStreamExt;
358    use stalls::MemoryStallMetrics;
359    use std::time::Duration;
360    use {
361        fidl_fuchsia_memory_attribution_plugin as fplugin,
362        fidl_fuchsia_memorypressure as fpressure, fuchsia_async as fasync,
363    };
364
365    fn get_attribution_data_provider() -> Arc<impl AttributionDataProvider + 'static> {
366        let attribution_data = AttributionData {
367            principals_vec: vec![Principal {
368                identifier: PrincipalIdentifier(1),
369                description: PrincipalDescription::Component("principal".to_owned()),
370                principal_type: PrincipalType::Runnable,
371                parent: None,
372            }],
373            resources_vec: vec![Resource {
374                koid: 10,
375                name_index: 0,
376                resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
377                    parent: None,
378                    private_committed_bytes: Some(1024),
379                    private_populated_bytes: Some(2048),
380                    scaled_committed_bytes: Some(1024),
381                    scaled_populated_bytes: Some(2048),
382                    total_committed_bytes: Some(1024),
383                    total_populated_bytes: Some(2048),
384                    ..Default::default()
385                }),
386            }],
387            resource_names: vec![ZXName::from_string_lossy("resource")],
388            attributions: vec![Attribution {
389                source: PrincipalIdentifier(1),
390                subject: PrincipalIdentifier(1),
391                resources: vec![ResourceReference::KernelObject(10)],
392            }],
393        };
394        Arc::new(FakeAttributionDataProvider { attribution_data })
395    }
396
397    async fn serve_kernel_stats(
398        mut request_stream: fkernel::StatsRequestStream,
399    ) -> Result<(), fidl::Error> {
400        while let Some(request) = request_stream.try_next().await? {
401            match request {
402                fkernel::StatsRequest::GetMemoryStats { responder } => {
403                    responder
404                        .send(&fkernel::MemoryStats {
405                            total_bytes: Some(1),
406                            free_bytes: Some(2),
407                            wired_bytes: Some(3),
408                            total_heap_bytes: Some(4),
409                            free_heap_bytes: Some(5),
410                            vmo_bytes: Some(6),
411                            mmu_overhead_bytes: Some(7),
412                            ipc_bytes: Some(8),
413                            other_bytes: Some(9),
414                            free_loaned_bytes: Some(10),
415                            cache_bytes: Some(11),
416                            slab_bytes: Some(12),
417                            zram_bytes: Some(13),
418                            vmo_reclaim_total_bytes: Some(14),
419                            vmo_reclaim_newest_bytes: Some(15),
420                            vmo_reclaim_oldest_bytes: Some(16),
421                            vmo_reclaim_disabled_bytes: Some(17),
422                            vmo_discardable_locked_bytes: Some(18),
423                            vmo_discardable_unlocked_bytes: Some(19),
424                            ..Default::default()
425                        })
426                        .unwrap();
427                }
428                fkernel::StatsRequest::GetMemoryStatsExtended { responder: _ } => {
429                    unimplemented!("Deprecated call, should not be used")
430                }
431                fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
432                    responder
433                        .send(&fkernel::MemoryStatsCompression {
434                            uncompressed_storage_bytes: Some(20),
435                            compressed_storage_bytes: Some(21),
436                            compressed_fragmentation_bytes: Some(22),
437                            compression_time: Some(23),
438                            decompression_time: Some(24),
439                            total_page_compression_attempts: Some(25),
440                            failed_page_compression_attempts: Some(26),
441                            total_page_decompressions: Some(27),
442                            compressed_page_evictions: Some(28),
443                            eager_page_compressions: Some(29),
444                            memory_pressure_page_compressions: Some(30),
445                            critical_memory_page_compressions: Some(31),
446                            pages_decompressed_unit_ns: Some(32),
447                            pages_decompressed_within_log_time: Some([
448                                40, 41, 42, 43, 44, 45, 46, 47,
449                            ]),
450                            ..Default::default()
451                        })
452                        .unwrap();
453                }
454                fkernel::StatsRequest::GetCpuStats { responder: _ } => unimplemented!(),
455                fkernel::StatsRequest::GetCpuLoad { duration: _, responder: _ } => unimplemented!(),
456            }
457        }
458        Ok(())
459    }
460
461    struct FakeStallProvider {}
462    impl StallProvider for FakeStallProvider {
463        fn get_stall_info(&self) -> Result<MemoryStallMetrics, anyhow::Error> {
464            Ok(MemoryStallMetrics {
465                some: Duration::from_millis(10),
466                full: Duration::from_millis(20),
467            })
468        }
469    }
470
471    #[test]
472    fn test_build_inspect_tree() {
473        let mut exec = fasync::TestExecutor::new();
474        let (stats_provider, stats_request_stream) =
475            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
476
477        fasync::Task::spawn(async move {
478            serve_kernel_stats(stats_request_stream).await.unwrap();
479        })
480        .detach();
481
482        let inspector = fuchsia_inspect::Inspector::default();
483
484        build_inspect_tree(
485            stats_provider,
486            Arc::new(FakeStallProvider {}),
487            &inspector,
488            &Config {
489                capture_on_pressure_change: true,
490                critical_capture_delay_s: 1,
491                imminent_oom_capture_delay_s: 2,
492                normal_capture_delay_s: 3,
493                warning_capture_delay_s: 4,
494            },
495        );
496
497        let output = exec
498            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
499            .expect("got hierarchy");
500
501        assert_data_tree!(@executor exec, output, root: {
502            kmem_stats: {
503                total_bytes: 1u64,
504                free_bytes: 2u64,
505                wired_bytes: 3u64,
506                total_heap_bytes: 4u64,
507                free_heap_bytes: 5u64,
508                vmo_bytes: 6u64,
509                mmu_overhead_bytes: 7u64,
510                ipc_bytes: 8u64,
511                other_bytes: 9u64,
512                free_loaned_bytes: 10u64,
513                cache_bytes: 11u64,
514                slab_bytes: 12u64,
515                zram_bytes: 13u64,
516                vmo_reclaim_total_bytes: 14u64,
517                vmo_reclaim_newest_bytes: 15u64,
518                vmo_reclaim_oldest_bytes: 16u64,
519                vmo_reclaim_disabled_bytes: 17u64,
520                vmo_discardable_locked_bytes: 18u64,
521                vmo_discardable_unlocked_bytes: 19u64
522            },
523            kmem_stats_compression: {
524                uncompressed_storage_bytes: 20u64,
525                compressed_storage_bytes: 21u64,
526                compressed_fragmentation_bytes: 22u64,
527                compression_time: 23i64,
528                decompression_time: 24i64,
529                total_page_compression_attempts: 25u64,
530                failed_page_compression_attempts: 26u64,
531                total_page_decompressions: 27u64,
532                compressed_page_evictions: 28u64,
533                eager_page_compressions: 29u64,
534                memory_pressure_page_compressions: 30u64,
535                critical_memory_page_compressions: 31u64,
536                pages_decompressed_unit_ns: 32u64,
537                pages_decompressed_within_log_time: vec![
538                    40u64, 41u64, 42u64, 43u64, 44u64, 45u64, 46u64, 47u64,
539                ]
540            },
541            stalls: {
542                some_ms: 10u64,
543                full_ms: 20u64,
544            },
545            config: {
546                capture_on_pressure_change: true,
547                critical_capture_delay_s: 1u64,
548                imminent_oom_capture_delay_s: 2u64,
549                normal_capture_delay_s: 3u64,
550                warning_capture_delay_s: 4u64,
551            },
552        });
553    }
554
555    #[test]
556    fn test_digest_service_capture_on_pressure_change_and_wait() -> anyhow::Result<()> {
557        let mut exec = fasync::TestExecutor::new_with_fake_time();
558        let (stats_provider, stats_request_stream) =
559            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
560
561        fasync::Task::spawn(async move {
562            serve_kernel_stats(stats_request_stream).await.unwrap();
563        })
564        .detach();
565
566        let inspector = fuchsia_inspect::Inspector::default();
567        let (pressure_provider, pressure_request_stream) =
568            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
569        let mut digest_service = std::pin::pin!(digest_service(
570            Config {
571                capture_on_pressure_change: true,
572                imminent_oom_capture_delay_s: 10,
573                critical_capture_delay_s: 10,
574                warning_capture_delay_s: 10,
575                normal_capture_delay_s: 10,
576            },
577            get_attribution_data_provider(),
578            Arc::new(FakeStallProvider {}),
579            stats_provider,
580            pressure_provider,
581            Default::default(),
582            inspector.root().create_child("logger"),
583        )?);
584        // Expects digst_service to register a watcher, answers with
585        // an initial pressure level, then returns the watcher for
586        // further signaling. Panics if this whole transaction is not
587        // immediately ready.
588        let Poll::Ready(watcher) = exec
589            .run_until_stalled(
590                &mut pressure_request_stream
591                    .then(|request| async {
592                        let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
593                            request.expect("digest_service failed to register a watcher");
594                        let watcher = watcher.into_proxy();
595                        watcher.on_level_changed(fpressure::Level::Normal).await.expect(
596                            "digest_service failed to acknowledge the initial pressure level",
597                        );
598                        watcher
599                    })
600                    .boxed()
601                    .into_future(),
602            )
603            .map(|(watcher, _)| {
604                watcher.ok_or_else(|| anyhow::Error::msg("failed to register watcher"))
605            })?
606        else {
607            panic!("digest_service failed to register a watcher");
608        };
609        // Send a pressure signal, to trigger a capture.
610        assert!(exec
611            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Warning))?
612            .is_ready());
613        // Ensure that digest_service has an opportunity to react to the pressure signal.
614        let _ = exec.run_until_stalled(&mut digest_service);
615
616        // Fake the passage of time, so that digest_service may do another capture.
617        assert!(exec
618            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
619                exec.now() + Duration::from_secs(10).into()
620            )))
621            .is_ready());
622        // Ensure that digest_service has an opportunity to react to the passage of time.
623        let _ = exec.run_until_stalled(&mut digest_service)?;
624
625        // This should resolve immediately because the inspect hierarchy has been populated by now.
626        let Poll::Ready(output) = exec
627            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
628            .map(|r| r.expect("got hierarchy"))
629        else {
630            panic!("Couldn't retrieve inspect output");
631        };
632
633        assert_data_tree!(@executor exec, output, root: {
634            logger: {
635                buckets: vec![
636                    "Undigested",
637                    "Orphaned",
638                    "Kernel",
639                    "Free",
640                    "[Addl]PagerTotal",
641                    "[Addl]PagerNewest",
642                    "[Addl]PagerOldest",
643                    "[Addl]DiscardableLocked",
644                    "[Addl]DiscardableUnlocked",
645                    "[Addl]ZramCompressedBytes",
646                ],
647                measurements: {
648                    // Corresponds to the capture on pressure change
649                    "0": {
650                        timestamp: NonZeroIntProperty,
651                        bucket_sizes: vec![
652                            1024u64, // Undigested: matches the single unmatched VMO
653                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
654                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
655                            2u64,    // Free
656                            14u64,   // [Addl]PagerTotal
657                            15u64,   // [Addl]PagerNewest
658                            16u64,   // [Addl]PagerOldest
659                            18u64,   // [Addl]DiscardableLocked
660                            19u64,   // [Addl]DiscardableUnlocked
661                            21u64,   // [Addl]ZramCompressedBytes
662                        ],
663                        stalls: {
664                            some_ms: 10u64,
665                            full_ms: 20u64,
666                        },
667                    },
668                    // Corresponds to the capture after the passage of time
669                    "1": {
670                        timestamp: NonZeroIntProperty,
671                        bucket_sizes: vec![
672                            1024u64, // Undigested: matches the single unmatched VMO
673                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
674                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
675                            2u64,    // Free
676                            14u64,   // [Addl]PagerTotal
677                            15u64,   // [Addl]PagerNewest
678                            16u64,   // [Addl]PagerOldest
679                            18u64,   // [Addl]DiscardableLocked
680                            19u64,   // [Addl]DiscardableUnlocked
681                            21u64,   // [Addl]ZramCompressedBytes
682                        ],
683                        stalls: {
684                            some_ms: 10u64,
685                            full_ms: 20u64,
686                        },
687                    },
688                },
689            },
690        });
691        Ok(())
692    }
693
694    #[test]
695    fn test_digest_service_wait() -> anyhow::Result<()> {
696        let mut exec = fasync::TestExecutor::new_with_fake_time();
697        let (stats_provider, stats_request_stream) =
698            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
699
700        fasync::Task::spawn(async move {
701            serve_kernel_stats(stats_request_stream).await.unwrap();
702        })
703        .detach();
704        let (pressure_provider, pressure_request_stream) =
705            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
706        let inspector = fuchsia_inspect::Inspector::default();
707        let mut digest_service = std::pin::pin!(digest_service(
708            Config {
709                capture_on_pressure_change: false,
710                imminent_oom_capture_delay_s: 10,
711                critical_capture_delay_s: 10,
712                warning_capture_delay_s: 10,
713                normal_capture_delay_s: 10,
714            },
715            get_attribution_data_provider(),
716            Arc::new(FakeStallProvider {}),
717            stats_provider,
718            pressure_provider,
719            Default::default(),
720            inspector.root().create_child("logger"),
721        )?);
722        // digest_service registers a watcher; make sure we answer.  Also, make sure not to drop the
723        // proxy nor the pressure stream; early termination would get reported to digest_service,
724        // which then prematurely interrupts it, before the timers have a chance to run.
725        let Poll::Ready((_watcher, _pressure_stream)) = exec
726            .run_until_stalled(
727                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
728                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
729                        request.map_err(anyhow::Error::from)?;
730                    let watcher_proxy = watcher.into_proxy();
731                    let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
732                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
733                }))
734                .into_future(),
735            )
736            .map(|(watcher, pressure_stream)| {
737                (
738                    watcher.ok_or_else(|| {
739                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
740                    }),
741                    pressure_stream,
742                )
743            })
744        else {
745            panic!("Failed to register the watcher");
746        };
747
748        // Give digest_service the opportunity to setup its timers.
749        let _ = exec.run_until_stalled(&mut digest_service)?;
750        // Fake the passage of time, so that digest_service may do another capture.
751        assert!(exec
752            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
753                exec.now() + Duration::from_secs(15).into()
754            )))
755            .is_ready());
756        // Ensure that digest_service has an opportunity to react to the passage of time.
757        assert!(exec.run_until_stalled(&mut digest_service).is_pending());
758        // This should resolve immediately because the inspect hierarchy has been populated by now.
759        let Poll::Ready(output) = exec
760            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
761            .map(|r| r.expect("got hierarchy"))
762        else {
763            panic!("Couldn't retrieve inspect output");
764        };
765
766        assert_data_tree!(@executor exec, output, root: {
767            logger: {
768                buckets: vec![
769                    "Undigested",
770                    "Orphaned",
771                    "Kernel",
772                    "Free",
773                    "[Addl]PagerTotal",
774                    "[Addl]PagerNewest",
775                    "[Addl]PagerOldest",
776                    "[Addl]DiscardableLocked",
777                    "[Addl]DiscardableUnlocked",
778                    "[Addl]ZramCompressedBytes",
779                ],
780                measurements: {
781                    // Corresponds to the capture after the passage of time
782                    "0": {
783                        timestamp: NonZeroIntProperty,
784                        bucket_sizes: vec![
785                            1024u64, // Undigested: matches the single unmatched VMO
786                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
787                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
788                            2u64,    // Free
789                            14u64,   // [Addl]PagerTotal
790                            15u64,   // [Addl]PagerNewest
791                            16u64,   // [Addl]PagerOldest
792                            18u64,   // [Addl]DiscardableLocked
793                            19u64,   // [Addl]DiscardableUnlocked
794                            21u64,   // [Addl]ZramCompressedBytes
795                        ],
796                        stalls: {
797                            some_ms: 10u64,
798                            full_ms: 20u64,
799                        },
800                    },
801            },
802            },
803        });
804        Ok(())
805    }
806
807    #[test]
808    fn test_digest_service_new_pressure_does_not_postpone() -> anyhow::Result<()> {
809        // See https://fxbug.dev/417722087 for context.
810        let mut exec = fasync::TestExecutor::new_with_fake_time();
811        let (stats_provider, stats_request_stream) =
812            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
813
814        fasync::Task::spawn(async move {
815            serve_kernel_stats(stats_request_stream).await.unwrap();
816        })
817        .detach();
818        let (pressure_provider, pressure_request_stream) =
819            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
820        let inspector = fuchsia_inspect::Inspector::default();
821        let mut digest_service = std::pin::pin!(digest_service(
822            Config {
823                capture_on_pressure_change: false,
824                imminent_oom_capture_delay_s: 10,
825                critical_capture_delay_s: 10,
826                warning_capture_delay_s: 100,
827                normal_capture_delay_s: 100,
828            },
829            get_attribution_data_provider(),
830            Arc::new(FakeStallProvider {}),
831            stats_provider,
832            pressure_provider,
833            Default::default(),
834            inspector.root().create_child("logger"),
835        )?);
836        // digest_service registers a watcher; make sure we answer.  Also, make sure not to drop the
837        // proxy nor the pressure stream; early termination would get reported to digest_service,
838        // which then prematurely interrupts it, before the timers have a chance to run.
839        let Poll::Ready((watcher, _pressure_stream)) = exec
840            .run_until_stalled(
841                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
842                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
843                        request.map_err(anyhow::Error::from)?;
844                    let watcher_proxy = watcher.into_proxy();
845                    watcher_proxy.on_level_changed(fpressure::Level::Critical).await?;
846                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
847                }))
848                .into_future(),
849            )
850            .map(|(watcher, pressure_stream)| {
851                (
852                    watcher.ok_or_else(|| {
853                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
854                    }),
855                    pressure_stream,
856                )
857            })
858        else {
859            panic!("Failed to register the watcher");
860        };
861
862        // Keep the watcher alive; otherwise the pressure stream would get dropped, which would
863        // cause digest_service to early error out.
864        let watcher = watcher??;
865        // Give digest_service the opportunity to setup its timers.
866        assert!(exec.run_until_stalled(&mut digest_service)?.is_pending());
867        // Fake a pressure change, so that the frequency of captures drops down.
868        assert!(exec
869            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Normal))?
870            .is_ready());
871        assert!(exec.run_until_stalled(&mut digest_service)?.is_pending());
872        // Fake the passage of time, so that digest_service may do a capture; wait long enough that
873        // the first two captures at Critical frequency would have already happened, but short
874        // enough that no capture happens at the Normal frequency.
875        assert!(exec
876            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
877                exec.now() + Duration::from_secs(25).into()
878            )))
879            .is_ready());
880        // Ensure that digest_service has an opportunity to react to the passage of time.
881        assert!(exec.run_until_stalled(&mut digest_service)?.is_pending());
882        // This should resolve immediately because the inspect hierarchy has been populated by now.
883        let Poll::Ready(output) = exec
884            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
885            .map(|r| r.expect("got hierarchy"))
886        else {
887            panic!("Couldn't retrieve inspect output");
888        };
889
890        assert_data_tree!(@executor exec, output, root: {
891            logger: {
892                buckets: vec![
893                    "Undigested",
894                    "Orphaned",
895                    "Kernel",
896                    "Free",
897                    "[Addl]PagerTotal",
898                    "[Addl]PagerNewest",
899                    "[Addl]PagerOldest",
900                    "[Addl]DiscardableLocked",
901                    "[Addl]DiscardableUnlocked",
902                    "[Addl]ZramCompressedBytes",
903                ],
904                measurements: {
905                    // Corresponds to the capture after the passage of time
906                    "0": {
907                        timestamp: NonZeroIntProperty,
908                        bucket_sizes: vec![
909                            1024u64, // Undigested: matches the single unmatched VMO
910                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
911                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
912                            2u64,    // Free
913                            14u64,   // [Addl]PagerTotal
914                            15u64,   // [Addl]PagerNewest
915                            16u64,   // [Addl]PagerOldest
916                            18u64,   // [Addl]DiscardableLocked
917                            19u64,   // [Addl]DiscardableUnlocked
918                            21u64,   // [Addl]ZramCompressedBytes
919                        ],
920                        stalls: {
921                            some_ms: 10u64,
922                            full_ms: 20u64,
923                        },
924                    },
925            },
926            },
927        });
928        Ok(())
929    }
930
931    #[test]
932    fn test_digest_service_no_capture_on_pressure_change() -> anyhow::Result<()> {
933        let mut exec = fasync::TestExecutor::new();
934        let (stats_provider, stats_request_stream) =
935            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
936
937        fasync::Task::spawn(async move {
938            serve_kernel_stats(stats_request_stream).await.unwrap();
939        })
940        .detach();
941
942        let inspector = fuchsia_inspect::Inspector::default();
943        let (pressure_provider, pressure_request_stream) =
944            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
945        let mut serve_pressure_stream = pressure_request_stream
946            .then(|request| async {
947                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
948                    request.map_err(anyhow::Error::from)?;
949                let watcher_proxy = watcher.into_proxy();
950                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
951                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
952            })
953            .boxed();
954        let mut digest_service = std::pin::pin!(digest_service(
955            Config {
956                capture_on_pressure_change: false,
957                imminent_oom_capture_delay_s: 10,
958                critical_capture_delay_s: 10,
959                warning_capture_delay_s: 10,
960                normal_capture_delay_s: 10,
961            },
962            get_attribution_data_provider(),
963            Arc::new(FakeStallProvider {}),
964            stats_provider,
965            pressure_provider,
966            Default::default(),
967            inspector.root().create_child("logger"),
968        )?);
969        let watcher =
970            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
971        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
972        let _ = exec.run_until_stalled(&mut digest_service);
973        let output = exec
974            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
975            .expect("got hierarchy");
976
977        assert_data_tree!(@executor exec, output, root: {
978            logger: {
979                measurements: {},
980            },
981        });
982        Ok(())
983    }
984
985    #[test]
986    fn test_digest_service_capture_on_pressure_change() -> anyhow::Result<()> {
987        let mut exec = fasync::TestExecutor::new();
988        let (stats_provider, stats_request_stream) =
989            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
990
991        fasync::Task::spawn(async move {
992            serve_kernel_stats(stats_request_stream).await.unwrap();
993        })
994        .detach();
995
996        let inspector = fuchsia_inspect::Inspector::default();
997        let (pressure_provider, pressure_request_stream) =
998            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
999        let mut serve_pressure_stream = pressure_request_stream
1000            .then(|request| async {
1001                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
1002                    request.map_err(anyhow::Error::from)?;
1003                let watcher_proxy = watcher.into_proxy();
1004                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
1005                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
1006            })
1007            .boxed();
1008        let mut digest_service = std::pin::pin!(digest_service(
1009            Config {
1010                capture_on_pressure_change: true,
1011                imminent_oom_capture_delay_s: 10,
1012                critical_capture_delay_s: 10,
1013                warning_capture_delay_s: 10,
1014                normal_capture_delay_s: 10,
1015            },
1016            get_attribution_data_provider(),
1017            Arc::new(FakeStallProvider {}),
1018            stats_provider,
1019            pressure_provider,
1020            Default::default(),
1021            inspector.root().create_child("logger"),
1022        )?);
1023        let watcher =
1024            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
1025        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
1026        let _ = exec.run_until_stalled(&mut digest_service);
1027        let output = exec
1028            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
1029            .expect("got hierarchy");
1030
1031        assert_data_tree!(@executor exec, output, root: {
1032            logger: {
1033                buckets: vec![
1034                    "Undigested",
1035                    "Orphaned",
1036                    "Kernel",
1037                    "Free",
1038                    "[Addl]PagerTotal",
1039                    "[Addl]PagerNewest",
1040                    "[Addl]PagerOldest",
1041                    "[Addl]DiscardableLocked",
1042                    "[Addl]DiscardableUnlocked",
1043                    "[Addl]ZramCompressedBytes",
1044                ],
1045                measurements: {
1046                    "0": {
1047                        timestamp: NonZeroIntProperty,
1048                        bucket_sizes: vec![
1049                            1024u64, // Undigested: matches the single unmatched VMO
1050                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
1051                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
1052                            2u64,    // Free
1053                            14u64,   // [Addl]PagerTotal
1054                            15u64,   // [Addl]PagerNewest
1055                            16u64,   // [Addl]PagerOldest
1056                            18u64,   // [Addl]DiscardableLocked
1057                            19u64,   // [Addl]DiscardableUnlocked
1058                            21u64,   // [Addl]ZramCompressedBytes
1059                        ],
1060                        stalls: {
1061                            some_ms: 10u64,
1062                            full_ms: 20u64,
1063                        },
1064                    },
1065                },
1066            },
1067        });
1068        Ok(())
1069    }
1070}