cobalt/
stalls.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::Arc;
6
7use cobalt_client::traits::AsEventCode;
8use futures::StreamExt;
9use memory_metrics_registry::cobalt_registry;
10use stalls::{MemoryStallMetrics, StallProvider};
11use zx::MonotonicInstant;
12use {anyhow, fidl_fuchsia_metrics as fmetrics};
13
14use crate::error_from_metrics_error;
15
16/// Collect and publish to Cobalt memory stall increase rate, every hour.
17pub async fn collect_stalls_forever(
18    stalls_provider: Arc<impl StallProvider + 'static>,
19    metric_event_logger: fmetrics::MetricEventLoggerProxy,
20) -> Result<(), anyhow::Error> {
21    let mut last_stall = MemoryStallMetrics::default();
22
23    // Wait for one hour after device start to get the first stall value. We don't use the one-hour
24    // timer as we may have been started later than at boot exactly.
25    fuchsia_async::Timer::new(MonotonicInstant::ZERO + zx::Duration::from_hours(1)).await;
26
27    let mut timer = fuchsia_async::Interval::new(zx::Duration::from_hours(1));
28    loop {
29        let new_stall = stalls_provider.get_stall_info()?;
30
31        // The Cobalt metrics for stalls expect milliseconds, as defined in the Cobalt registry.
32        let stall_some_event = fmetrics::MetricEvent {
33            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
34            payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
35                (new_stall.some - last_stall.some).as_millis(),
36            )?),
37            event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()],
38        };
39        let stall_full_event = fmetrics::MetricEvent {
40            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
41            payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
42                (new_stall.full - last_stall.full).as_millis(),
43            )?),
44            event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()],
45        };
46
47        last_stall = new_stall;
48
49        let events = vec![stall_some_event, stall_full_event];
50        metric_event_logger.log_metric_events(&events).await?.map_err(error_from_metrics_error)?;
51        timer.next().await;
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Returns a fake `StallProvider` whose cumulative stall durations grow linearly.
    ///
    /// The n-th call (starting at 1) reports `some = 10 * n` ms and `full = 20 * n` ms, so
    /// every delta between consecutive calls is 10 ms (`some`) and 20 ms (`full`).
    fn get_stall_provider() -> Arc<impl StallProvider + 'static> {
        struct FakeStallProvider {
            count: AtomicU32,
        }

        impl Default for FakeStallProvider {
            fn default() -> Self {
                Self { count: AtomicU32::new(1) }
            }
        }

        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<MemoryStallMetrics, anyhow::Error> {
                let count = self.count.fetch_add(1, Ordering::Relaxed);
                let memory_stall = MemoryStallMetrics {
                    some: Duration::from_millis((count * 10).into()),
                    full: Duration::from_millis((count * 20).into()),
                };
                Ok(memory_stall)
            }
        }

        Arc::new(FakeStallProvider::default())
    }

    /// Checks that `request` is a `LogMetricEvents` call carrying exactly the two stall
    /// events (`Some` then `Full`) with the expected millisecond values, then acknowledges it.
    fn check_stall_events(
        request: fmetrics::MetricEventLoggerRequest,
        expected_some_ms: i64,
        expected_full_ms: i64,
    ) -> anyhow::Result<()> {
        match request {
            fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } => {
                assert_eq!(events.len(), 2);
                // Memory stall metrics.
                assert_eq!(
                    events[0],
                    fmetrics::MetricEvent {
                        metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
                        event_codes: vec![
                            cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()
                        ],
                        payload: fmetrics::MetricEventPayload::IntegerValue(expected_some_ms)
                    }
                );
                assert_eq!(
                    events[1],
                    fmetrics::MetricEvent {
                        metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
                        event_codes: vec![
                            cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()
                        ],
                        payload: fmetrics::MetricEventPayload::IntegerValue(expected_full_ms)
                    }
                );
                responder.send(Ok(()))?;
            }
            _ => panic!("Unexpected metric event"),
        }
        Ok(())
    }

    #[test]
    fn test_periodic_stalls_collection() -> anyhow::Result<()> {
        // Setup executor.
        let mut exec = fasync::TestExecutor::new_with_fake_time();

        // Setup mock data providers.
        let data_provider = get_stall_provider();

        // Setup test proxy to observe emitted events from the service.
        let (metric_event_logger, metric_event_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();

        // Set the time to shortly after boot, well before the first report is due.
        exec.set_fake_time(
            (zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(3 * 60)).into(),
        );

        // Service under test.
        let mut stalls_collector =
            fuchsia_async::Task::spawn(collect_stalls_forever(data_provider, metric_event_logger));

        // Give the service the opportunity to run.
        assert!(
            exec.run_until_stalled(&mut stalls_collector).is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        // Ensure no metrics have been uploaded before one hour after boot.
        let mut metric_event_request_future = metric_event_request_stream.into_future();
        assert!(
            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
            "Stalls metrics were uploaded before one hour after boot"
        );

        // Fake the passage of time past one hour after boot, so that the service captures
        // and reports its first stall value.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                exec.now() + zx::Duration::from_seconds(60 * 60 + 10)
            )))
            .is_ready(),
            "Failed to advance time"
        );

        // Ensure a report is ready for consumption. The first delta is taken against zero,
        // i.e. it equals the fake provider's first reading.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        check_stall_events(event, 10, 20)?;

        // The next report is not due until the next hour.
        let mut metric_event_request_future = metric_event_request_stream.into_future();
        assert!(exec.run_until_stalled(&mut metric_event_request_future).is_pending());

        // Advance to the next hour.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                (zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(60 * 60 * 2 + 10)).into()
            )))
            .is_ready(),
            "Failed to advance time"
        );

        // Ensure the second report carries the delta since the first one.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        check_stall_events(event, 10, 20)?;

        // Nothing else should have been reported.
        assert!(exec
            .run_until_stalled(&mut metric_event_request_stream.into_future())
            .is_pending());

        Ok(())
    }
}