cobalt/
stalls.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::Arc;
6
7use cobalt_client::traits::AsEventCode;
8use futures::StreamExt;
9use memory_metrics_registry::cobalt_registry;
10use stalls::StallProvider;
11use zx::MonotonicInstant;
12use {anyhow, fidl_fuchsia_metrics as fmetrics};
13
14use crate::error_from_metrics_error;
15
16/// Collect and publish to Cobalt memory stall increase rate, every hour.
17pub async fn collect_stalls_forever(
18    stalls_provider: Arc<impl StallProvider + 'static>,
19    metric_event_logger: fmetrics::MetricEventLoggerProxy,
20) -> Result<(), anyhow::Error> {
21    let mut last_stall = zx::MemoryStall::default();
22
23    // Wait for one hour after device start to get the first stall value. We don't use the one-hour
24    // timer as we may have been started later than at boot exactly.
25    fuchsia_async::Timer::new(MonotonicInstant::ZERO + zx::Duration::from_hours(1)).await;
26
27    let mut timer = fuchsia_async::Interval::new(zx::Duration::from_hours(1));
28    loop {
29        let new_stall = stalls_provider.get_stall_info()?;
30
31        let stall_some_event = fmetrics::MetricEvent {
32            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
33            payload: fmetrics::MetricEventPayload::IntegerValue(
34                new_stall.stall_time_some - last_stall.stall_time_some,
35            ),
36            event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()],
37        };
38        let stall_full_event = fmetrics::MetricEvent {
39            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
40            payload: fmetrics::MetricEventPayload::IntegerValue(
41                new_stall.stall_time_full - last_stall.stall_time_full,
42            ),
43            event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()],
44        };
45
46        last_stall = new_stall;
47
48        let events = vec![stall_some_event, stall_full_event];
49        metric_event_logger.log_metric_events(&events).await?.map_err(error_from_metrics_error)?;
50        timer.next().await;
51    }
52}
53
54#[cfg(test)]
55mod tests {
56    use super::*;
57    use anyhow::anyhow;
58    use fuchsia_async as fasync;
59    use futures::task::Poll;
60    use std::sync::atomic::{AtomicU32, Ordering};
61    use zx::Duration;
62
63    fn get_stall_provider() -> Arc<impl StallProvider + 'static> {
64        struct FakeStallProvider {
65            count: AtomicU32,
66        }
67
68        impl Default for FakeStallProvider {
69            fn default() -> Self {
70                Self { count: AtomicU32::new(1) }
71            }
72        }
73
74        impl StallProvider for FakeStallProvider {
75            fn get_stall_info(&self) -> Result<zx::MemoryStall, anyhow::Error> {
76                let count = self.count.fetch_add(1, Ordering::Relaxed);
77                let memory_stall = zx::MemoryStall {
78                    stall_time_some: (count * 10) as i64,
79                    stall_time_full: (count * 20) as i64,
80                };
81                Ok(memory_stall)
82            }
83        }
84
85        Arc::new(FakeStallProvider::default())
86    }
87
88    #[test]
89    fn test_periodic_stalls_collection() -> anyhow::Result<()> {
90        // Setup executor.
91        let mut exec = fasync::TestExecutor::new_with_fake_time();
92
93        // Setup mock data providers.
94        let data_provider = get_stall_provider();
95
96        // Setup test proxy to observe emitted events from the service.
97        let (metric_event_logger, metric_event_request_stream) =
98            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();
99
100        // Set the time to shortly after boot
101        exec.set_fake_time((zx::MonotonicInstant::ZERO + Duration::from_seconds(3 * 60)).into());
102
103        // Service under test.
104        let mut stalls_collector =
105            fuchsia_async::Task::spawn(collect_stalls_forever(data_provider, metric_event_logger));
106
107        // Give the service the opportunity to run.
108        assert!(
109            exec.run_until_stalled(&mut stalls_collector).is_pending(),
110            "Stalls collection service returned unexpectedly early"
111        );
112
113        // Ensure no metrics has been uploaded yet.
114        let mut metric_event_request_future = metric_event_request_stream.into_future();
115        assert!(
116            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
117            "Stalls collection service returned unexpectedly early"
118        );
119
120        // Fake the passage of time, so that collect_metrics may do a capture.
121        assert!(
122            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
123                exec.now() + Duration::from_seconds(60 * 60 + 10)
124            )))
125            .is_ready(),
126            "Failed to advance time"
127        );
128
129        // Ensure we have one and only one event ready for consumption.
130        let Poll::Ready((event, metric_event_request_stream)) =
131            exec.run_until_stalled(&mut metric_event_request_future)
132        else {
133            panic!("Failed to receive metrics")
134        };
135        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
136        match event {
137            fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } => {
138                assert_eq!(events.len(), 2);
139                // Kernel metrics
140                assert_eq!(
141                    events[0],
142                    fmetrics::MetricEvent {
143                        metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
144                        event_codes: vec![
145                            cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()
146                        ],
147                        payload: fmetrics::MetricEventPayload::IntegerValue(10)
148                    }
149                );
150                assert_eq!(
151                    events[1],
152                    fmetrics::MetricEvent {
153                        metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
154                        event_codes: vec![
155                            cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()
156                        ],
157                        payload: fmetrics::MetricEventPayload::IntegerValue(20)
158                    }
159                );
160                responder.send(Ok(()))?;
161            }
162            _ => panic!("Unexpected metric event"),
163        }
164
165        let mut metric_event_request_future = metric_event_request_stream.into_future();
166
167        assert!(exec.run_until_stalled(&mut metric_event_request_future).is_pending());
168
169        // Advance to the next hour
170        assert!(
171            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
172                (zx::MonotonicInstant::ZERO + Duration::from_seconds(60 * 60 * 2 + 10)).into()
173            )))
174            .is_ready(),
175            "Failed to advance time"
176        );
177
178        // Ensure we have one and only one event ready for consumption.
179        let Poll::Ready((event, metric_event_request_stream)) =
180            exec.run_until_stalled(&mut metric_event_request_future)
181        else {
182            panic!("Failed to receive metrics")
183        };
184        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
185        match event {
186            fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } => {
187                assert_eq!(events.len(), 2);
188                // Kernel metrics
189                assert_eq!(
190                    events[0],
191                    fmetrics::MetricEvent {
192                        metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
193                        event_codes: vec![
194                            cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()
195                        ],
196                        payload: fmetrics::MetricEventPayload::IntegerValue(10)
197                    }
198                );
199                assert_eq!(
200                    events[1],
201                    fmetrics::MetricEvent {
202                        metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
203                        event_codes: vec![
204                            cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()
205                        ],
206                        payload: fmetrics::MetricEventPayload::IntegerValue(20)
207                    }
208                );
209                responder.send(Ok(()))?;
210            }
211            _ => panic!("Unexpected metric event"),
212        }
213
214        assert!(exec
215            .run_until_stalled(&mut metric_event_request_stream.into_future())
216            .is_pending());
217
218        Ok(())
219    }
220}