1use std::sync::Arc;
6
7use cobalt_client::traits::AsEventCode;
8use futures::StreamExt;
9use memory_metrics_registry::cobalt_registry;
10use stalls::{MemoryStallMetrics, StallProvider};
11use zx::MonotonicInstant;
12use {anyhow, fidl_fuchsia_metrics as fmetrics};
13
14use crate::error_from_metrics_error;
15
16pub async fn collect_stalls_forever(
18 stalls_provider: Arc<impl StallProvider + 'static>,
19 metric_event_logger: fmetrics::MetricEventLoggerProxy,
20) -> Result<(), anyhow::Error> {
21 let mut last_stall = MemoryStallMetrics::default();
22
23 fuchsia_async::Timer::new(MonotonicInstant::ZERO + zx::Duration::from_hours(1)).await;
26
27 let mut timer = fuchsia_async::Interval::new(zx::Duration::from_hours(1));
28 loop {
29 let new_stall = stalls_provider.get_stall_info()?;
30
31 let stall_some_event = fmetrics::MetricEvent {
33 metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
34 payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
35 (new_stall.some - last_stall.some).as_millis(),
36 )?),
37 event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()],
38 };
39 let stall_full_event = fmetrics::MetricEvent {
40 metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
41 payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
42 (new_stall.full - last_stall.full).as_millis(),
43 )?),
44 event_codes: vec![cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()],
45 };
46
47 last_stall = new_stall;
48
49 let events = vec![stall_some_event, stall_full_event];
50 metric_event_logger.log_metric_events(&events).await?.map_err(error_from_metrics_error)?;
51 timer.next().await;
52 }
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Returns a fake stall provider whose cumulative counters grow by a fixed step on every
    /// read: the Nth call (counting from 1) reports `some = N * 10ms` and `full = N * 20ms`.
    fn get_stall_provider() -> Arc<impl StallProvider + 'static> {
        struct FakeStallProvider {
            /// Multiplier applied on the next read; incremented by each read.
            count: AtomicU32,
        }

        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<MemoryStallMetrics, anyhow::Error> {
                // `fetch_add` hands back the value from before the increment.
                let count = self.count.fetch_add(1, Ordering::Relaxed);
                Ok(MemoryStallMetrics {
                    some: Duration::from_millis(u64::from(count * 10)),
                    full: Duration::from_millis(u64::from(count * 20)),
                })
            }
        }

        Arc::new(FakeStallProvider { count: AtomicU32::new(1) })
    }

    /// Checks that `request` is a `LogMetricEvents` call carrying exactly a "some" stall event
    /// followed by a "full" stall event with the given millisecond payloads, then acknowledges
    /// the request with a success response.
    fn expect_stall_report(
        request: fmetrics::MetricEventLoggerRequest,
        expected_some_ms: i64,
        expected_full_ms: i64,
    ) -> anyhow::Result<()> {
        let (events, responder) = match request {
            fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } => {
                (events, responder)
            }
            _ => panic!("Unexpected metric event"),
        };

        let expected = vec![
            fmetrics::MetricEvent {
                metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
                event_codes: vec![
                    cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code(),
                ],
                payload: fmetrics::MetricEventPayload::IntegerValue(expected_some_ms),
            },
            fmetrics::MetricEvent {
                metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
                event_codes: vec![
                    cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code(),
                ],
                payload: fmetrics::MetricEventPayload::IntegerValue(expected_full_ms),
            },
        ];
        assert_eq!(events, expected);

        responder.send(Ok(()))?;
        Ok(())
    }

    /// Drives the collector with fake time and checks that nothing is logged before the one-hour
    /// mark, and that each of the first two reports carries the per-period stall deltas.
    #[test]
    fn test_periodic_stalls_collection() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();

        let data_provider = get_stall_provider();

        let (metric_event_logger, metric_event_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();

        // Start the clock at three minutes, well before the first report is due.
        exec.set_fake_time(
            (zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(3 * 60)).into(),
        );

        let mut stalls_collector =
            fuchsia_async::Task::spawn(collect_stalls_forever(data_provider, metric_event_logger));

        assert!(
            exec.run_until_stalled(&mut stalls_collector).is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        let mut metric_event_request_future = metric_event_request_stream.into_future();
        assert!(
            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
            "Stalls were logged before the first hour elapsed"
        );

        // Move the clock forward by one hour and ten seconds, past the one-hour mark.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                exec.now() + zx::Duration::from_seconds(60 * 60 + 10)
            )))
            .is_ready(),
            "Failed to advance time"
        );

        // First report: measured against the default baseline, so it equals the first sample.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        expect_stall_report(event, 10, 20)?;

        let mut metric_event_request_future = metric_event_request_stream.into_future();

        assert!(
            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
            "Stalls were logged again before the next collection period"
        );

        // Move the clock to ten seconds past the two-hour mark.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                (zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(60 * 60 * 2 + 10)).into()
            )))
            .is_ready(),
            "Failed to advance time"
        );

        // Second report: the fake counters went from 10/20 to 20/40, so the deltas are 10/20.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        expect_stall_report(event, 10, 20)?;

        assert!(
            exec.run_until_stalled(&mut metric_event_request_stream.into_future()).is_pending(),
            "Stalls were logged again before the next collection period"
        );

        Ok(())
    }
}