1use std::sync::Arc;
6
7use cobalt_client::traits::AsEventCode;
8use futures::StreamExt;
9use memory_metrics_registry::cobalt_registry;
10use stalls::StallProvider;
11use zx::MonotonicInstant;
12use {anyhow, fidl_fuchsia_metrics as fmetrics};
13
14use crate::error_from_metrics_error;
15
16pub async fn collect_stalls_forever(
18 stalls_provider: Arc<impl StallProvider + 'static>,
19 metric_event_logger: fmetrics::MetricEventLoggerProxy,
20) -> Result<(), anyhow::Error> {
21 let mut last_stall = zx::MemoryStall::default();
22
23 fuchsia_async::Timer::new(MonotonicInstant::ZERO + zx::Duration::from_hours(1)).await;
26
27 let mut timer = fuchsia_async::Interval::new(zx::Duration::from_hours(1));
28 loop {
29 let new_stall = stalls_provider.get_stall_info()?;
30
31 let stall_some_event = fmetrics::MetricEvent {
32 metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
33 payload: fmetrics::MetricEventPayload::IntegerValue(
34 new_stall.stall_time_some - last_stall.stall_time_some,
35 ),
36 event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()],
37 };
38 let stall_full_event = fmetrics::MetricEvent {
39 metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
40 payload: fmetrics::MetricEventPayload::IntegerValue(
41 new_stall.stall_time_full - last_stall.stall_time_full,
42 ),
43 event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()],
44 };
45
46 last_stall = new_stall;
47
48 let events = vec![stall_some_event, stall_full_event];
49 metric_event_logger.log_metric_events(&events).await?.map_err(error_from_metrics_error)?;
50 timer.next().await;
51 }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::sync::atomic::{AtomicU32, Ordering};
    use zx::Duration;

    /// Returns a fake [`StallProvider`] whose counters grow by a fixed step on every read.
    ///
    /// The n-th read (counting from 1) reports `n * 10` as `stall_time_some` and `n * 20` as
    /// `stall_time_full`, so consecutive reads always differ by 10 and 20 respectively.
    fn get_stall_provider() -> Arc<impl StallProvider + 'static> {
        struct FakeStallProvider {
            // Index of the next read, starting at 1.
            reads: AtomicU32,
        }

        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<zx::MemoryStall, anyhow::Error> {
                let read_index = self.reads.fetch_add(1, Ordering::Relaxed);
                Ok(zx::MemoryStall {
                    stall_time_some: i64::from(read_index * 10),
                    stall_time_full: i64::from(read_index * 20),
                })
            }
        }

        Arc::new(FakeStallProvider { reads: AtomicU32::new(1) })
    }

    /// Asserts that `request` is a `LogMetricEvents` call carrying exactly the `Some` and
    /// `Full` stall events, in that order, with the given payloads, then replies with success
    /// so the collector can proceed.
    fn expect_stall_report(
        request: fmetrics::MetricEventLoggerRequest,
        some_delta: i64,
        full_delta: i64,
    ) -> anyhow::Result<()> {
        let fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } = request
        else {
            panic!("Unexpected metric event")
        };

        let expected_some = fmetrics::MetricEvent {
            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
            event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()],
            payload: fmetrics::MetricEventPayload::IntegerValue(some_delta),
        };
        let expected_full = fmetrics::MetricEvent {
            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
            event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()],
            payload: fmetrics::MetricEventPayload::IntegerValue(full_delta),
        };

        assert_eq!(events.len(), 2);
        assert_eq!(events[0], expected_some);
        assert_eq!(events[1], expected_full);

        responder.send(Ok(()))?;
        Ok(())
    }

    #[test]
    fn test_periodic_stalls_collection() -> anyhow::Result<()> {
        // Fake time lets the test step over the hour-long waits without actually sleeping.
        let mut exec = fasync::TestExecutor::new_with_fake_time();

        let stalls_provider = get_stall_provider();

        // The request stream is the test's window onto what the collector sends to the logger.
        let (logger_proxy, logger_requests) =
            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();

        // Start three minutes into monotonic time, i.e. before the collector's initial deadline.
        exec.set_fake_time((zx::MonotonicInstant::ZERO + Duration::from_seconds(3 * 60)).into());

        // Code under test; the task handle is kept alive until the end of the test.
        let mut collector =
            fuchsia_async::Task::spawn(collect_stalls_forever(stalls_provider, logger_proxy));

        // Let the collector run: it must park on its initial wait rather than finish.
        assert!(
            exec.run_until_stalled(&mut collector).is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        // Nothing may have been logged yet.
        let mut next_request = logger_requests.into_future();
        assert!(
            exec.run_until_stalled(&mut next_request).is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        // Move a little more than one hour forward so the first report is produced.
        let first_report_deadline = exec.now() + Duration::from_seconds(60 * 60 + 10);
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                first_report_deadline
            )))
            .is_ready(),
            "Failed to advance time"
        );

        // First report: provider read #1 (10 / 20) diffed against the default baseline.
        let Poll::Ready((request, logger_requests)) = exec.run_until_stalled(&mut next_request)
        else {
            panic!("Failed to receive metrics")
        };
        let request = request.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        expect_stall_report(request, 10, 20)?;

        // No further report may arrive before more time passes.
        let mut next_request = logger_requests.into_future();
        assert!(exec.run_until_stalled(&mut next_request).is_pending());

        // Move to just past the two-hour mark to trigger the second report.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                (zx::MonotonicInstant::ZERO + Duration::from_seconds(60 * 60 * 2 + 10)).into()
            )))
            .is_ready(),
            "Failed to advance time"
        );

        // Second report: provider read #2 (20 / 40) minus read #1 (10 / 20).
        let Poll::Ready((request, logger_requests)) = exec.run_until_stalled(&mut next_request)
        else {
            panic!("Failed to receive metrics")
        };
        let request = request.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        expect_stall_report(request, 10, 20)?;

        // After the second acknowledgement the collector must be idle again.
        assert!(exec.run_until_stalled(&mut logger_requests.into_future()).is_pending());

        Ok(())
    }
}