1use crate::OnInterestChanged;
5use diagnostics_log_encoding::encode::TestRecord;
6use diagnostics_log_types::Severity;
7use fidl::endpoints::Proxy;
8use fidl_fuchsia_logger::{LogSinkProxy, LogSinkSynchronousProxy};
9use std::future::Future;
10use std::sync::{Arc, Mutex};
11
12#[cfg(fuchsia_api_level_less_than = "NEXT")]
13use fidl_fuchsia_diagnostics as fdiagnostics;
14#[cfg(fuchsia_api_level_at_least = "NEXT")]
15use fidl_fuchsia_diagnostics_types as fdiagnostics;
16
17pub(crate) struct InterestFilter {
18 listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync + 'static>>>>,
19}
20
21impl InterestFilter {
22 pub fn new(
25 proxy: LogSinkProxy,
26 interest: fdiagnostics::Interest,
27 wait_for_initial_interest: bool,
28 ) -> (Self, impl Future<Output = ()>) {
29 let default_severity = interest.min_severity.map(Severity::from).unwrap_or(Severity::Info);
30 let (proxy, min_severity) = if wait_for_initial_interest {
31 let sync_proxy = LogSinkSynchronousProxy::new(proxy.into_channel().unwrap().into());
32 let initial_severity = match sync_proxy
33 .wait_for_interest_change(zx::MonotonicInstant::INFINITE)
34 {
35 Ok(Ok(initial_interest)) => {
36 initial_interest.min_severity.map(Severity::from).unwrap_or(default_severity)
37 }
38 _ => default_severity,
39 };
40 (
41 LogSinkProxy::new(fidl::AsyncChannel::from_channel(sync_proxy.into_channel())),
42 initial_severity,
43 )
44 } else {
45 (proxy, default_severity)
46 };
47
48 log::set_max_level(min_severity.into());
49
50 let listener = Arc::new(Mutex::new(None));
51 let filter = Self { listener: listener.clone() };
52 (filter, Self::listen_to_interest_changes(listener, default_severity, proxy))
53 }
54
55 pub fn set_interest_listener<T>(&self, listener: T)
57 where
58 T: OnInterestChanged + Send + Sync + 'static,
59 {
60 let mut listener_guard = self.listener.lock().unwrap();
61 *listener_guard = Some(Box::new(listener));
62 }
63
64 async fn listen_to_interest_changes(
65 listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync>>>>,
66 default_severity: Severity,
67 proxy: LogSinkProxy,
68 ) {
69 while let Ok(Ok(interest)) = proxy.wait_for_interest_change().await {
70 let new_min_severity =
71 interest.min_severity.map(Severity::from).unwrap_or(default_severity);
72 log::set_max_level(new_min_severity.into());
73 let callback_guard = listener.lock().unwrap();
74 if let Some(callback) = &*callback_guard {
75 callback.on_changed(new_min_severity);
76 }
77 }
78 }
79
80 pub fn enabled_for_testing(&self, record: &TestRecord<'_>) -> bool {
81 let min_severity = Severity::try_from(log::max_level()).map(|s| s as u8).unwrap_or(u8::MAX);
82 min_severity <= record.severity
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89 use fidl::endpoints::create_proxy_and_stream;
90 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest, LogSinkRequestStream};
91 use futures::channel::mpsc;
92 use futures::{StreamExt, TryStreamExt};
93 use log::{debug, error, info, trace, warn};
94
95 struct SeverityTracker {
96 _filter: InterestFilter,
97 severity_counts: Arc<Mutex<SeverityCount>>,
98 }
99
100 impl log::Log for SeverityTracker {
101 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
102 true
103 }
104
105 fn log(&self, record: &log::Record<'_>) {
106 let mut count = self.severity_counts.lock().unwrap();
107 let to_increment = match record.level() {
108 log::Level::Trace => &mut count.trace,
109 log::Level::Debug => &mut count.debug,
110 log::Level::Info => &mut count.info,
111 log::Level::Warn => &mut count.warn,
112 log::Level::Error => &mut count.error,
113 };
114 *to_increment += 1;
115 }
116
117 fn flush(&self) {}
118 }
119
120 #[derive(Debug, Default, Eq, PartialEq)]
121 struct SeverityCount {
122 trace: u64,
123 debug: u64,
124 info: u64,
125 warn: u64,
126 error: u64,
127 }
128
129 struct InterestChangedListener(mpsc::UnboundedSender<()>);
130
131 impl OnInterestChanged for InterestChangedListener {
132 fn on_changed(&self, _: crate::Severity) {
133 self.0.unbounded_send(()).unwrap();
134 }
135 }
136
137 #[fuchsia::test(logging = false)]
138 async fn default_filter_is_info_when_unspecified() {
139 let (proxy, _requests) = create_proxy_and_stream::<LogSinkMarker>();
140 let (filter, _on_changes) =
141 InterestFilter::new(proxy, fdiagnostics::Interest::default(), false);
142 let observed = Arc::new(Mutex::new(SeverityCount::default()));
143 log::set_boxed_logger(Box::new(SeverityTracker {
144 severity_counts: observed.clone(),
145 _filter: filter,
146 }))
147 .unwrap();
148 let mut expected = SeverityCount::default();
149
150 error!("oops");
151 expected.error += 1;
152 assert_eq!(&*observed.lock().unwrap(), &expected);
153
154 warn!("maybe");
155 expected.warn += 1;
156 assert_eq!(&*observed.lock().unwrap(), &expected);
157
158 info!("ok");
159 expected.info += 1;
160 assert_eq!(&*observed.lock().unwrap(), &expected);
161
162 debug!("hint");
163 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
164
165 trace!("spew");
166 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
167 }
168
169 async fn send_interest_change(stream: &mut LogSinkRequestStream, severity: Option<Severity>) {
170 match stream.try_next().await {
171 Ok(Some(LogSinkRequest::WaitForInterestChange { responder })) => {
172 responder
173 .send(Ok(&fdiagnostics::Interest {
174 min_severity: severity.map(fdiagnostics::Severity::from),
175 ..Default::default()
176 }))
177 .expect("send response");
178 }
179 other => panic!("Expected WaitForInterestChange but got {:?}", other),
180 }
181 }
182
183 #[fuchsia::test(logging = false)]
184 async fn default_filter_on_interest_changed() {
185 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
186 let (filter, on_changes) = InterestFilter::new(
187 proxy,
188 fdiagnostics::Interest {
189 min_severity: Some(fdiagnostics::Severity::Warn),
190 ..Default::default()
191 },
192 false,
193 );
194 let (send, mut recv) = mpsc::unbounded();
195 filter.set_interest_listener(InterestChangedListener(send));
196 let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
197 let observed = Arc::new(Mutex::new(SeverityCount::default()));
198 log::set_boxed_logger(Box::new(SeverityTracker {
199 severity_counts: observed.clone(),
200 _filter: filter,
201 }))
202 .expect("set logger");
203
204 send_interest_change(&mut requests, Some(Severity::Info)).await;
207 recv.next().await.unwrap();
208
209 let mut expected = SeverityCount::default();
210 error!("oops");
211 expected.error += 1;
212 assert_eq!(&*observed.lock().unwrap(), &expected);
213
214 warn!("maybe");
215 expected.warn += 1;
216 assert_eq!(&*observed.lock().unwrap(), &expected);
217
218 info!("ok");
219 expected.info += 1;
220 assert_eq!(&*observed.lock().unwrap(), &expected);
221
222 debug!("hint");
223 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
224
225 trace!("spew");
226 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
227
228 send_interest_change(&mut requests, None).await;
230 recv.next().await.unwrap();
231
232 error!("oops");
233 expected.error += 1;
234 assert_eq!(&*observed.lock().unwrap(), &expected);
235
236 warn!("maybe");
237 expected.warn += 1;
238 assert_eq!(&*observed.lock().unwrap(), &expected);
239
240 info!("ok");
241 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
242
243 debug!("hint");
244 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
245
246 trace!("spew");
247 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
248 }
249
250 #[fuchsia::test(logging = false)]
251 async fn wait_for_initial_interest() {
252 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
253 let t = std::thread::spawn(move || {
254 let _executor = fuchsia_async::LocalExecutor::new();
256 let (filter, _on_changes) =
257 InterestFilter::new(proxy, fdiagnostics::Interest::default(), true);
258 filter
259 });
260 if let Some(Ok(request)) = requests.next().await {
261 match request {
262 LogSinkRequest::WaitForInterestChange { responder } => {
263 responder
264 .send(Ok(&fdiagnostics::Interest {
265 min_severity: Some(fdiagnostics::Severity::Trace),
266 ..Default::default()
267 }))
268 .expect("sent initial interest");
269 }
270 other => panic!("Got unexpected: {:?}", other),
271 };
272 }
273 let _filter = t.join().unwrap();
274 assert_eq!(log::max_level(), log::Level::Trace);
275 }
276
277 #[fuchsia::test(logging = false)]
278 async fn log_frontend_tracks_severity() {
279 log::set_max_level(log::LevelFilter::Off);
281
282 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
283 let (filter, on_changes) = InterestFilter::new(
284 proxy,
285 fdiagnostics::Interest {
286 min_severity: Some(fdiagnostics::Severity::Warn),
287 ..Default::default()
288 },
289 false,
290 );
291 assert_eq!(log::max_level(), log::LevelFilter::Warn);
293
294 let (send, mut recv) = mpsc::unbounded();
295 filter.set_interest_listener(InterestChangedListener(send));
296 let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
297
298 send_interest_change(&mut requests, Some(Severity::Trace)).await;
299 recv.next().await.unwrap();
300 assert_eq!(log::max_level(), log::LevelFilter::Trace);
301
302 send_interest_change(&mut requests, Some(Severity::Info)).await;
303 recv.next().await.unwrap();
304 assert_eq!(log::max_level(), log::LevelFilter::Info);
305 }
306}