1use crate::OnInterestChanged;
5use diagnostics_log_encoding::encode::TestRecord;
6use diagnostics_log_types::Severity;
7use fidl::endpoints::ClientEnd;
8use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy, LogSinkSynchronousProxy};
9use std::future::Future;
10use std::sync::{Arc, Mutex};
11
12#[cfg(fuchsia_api_level_less_than = "27")]
13use fidl_fuchsia_diagnostics as fdiagnostics;
14#[cfg(fuchsia_api_level_at_least = "27")]
15use fidl_fuchsia_diagnostics_types as fdiagnostics;
16
17pub(crate) struct InterestFilter {
18 listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync + 'static>>>>,
19}
20
21impl InterestFilter {
22 pub fn new(
25 client: ClientEnd<LogSinkMarker>,
26 interest: fdiagnostics::Interest,
27 wait_for_initial_interest: bool,
28 ) -> (Self, impl Future<Output = ()>) {
29 let default_severity = interest.min_severity.map(Severity::from).unwrap_or(Severity::Info);
30 let min_severity = if wait_for_initial_interest {
31 let sync_proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
32 match sync_proxy.wait_for_interest_change(zx::MonotonicInstant::INFINITE) {
33 Ok(Ok(initial_interest)) => {
34 initial_interest.min_severity.map(Severity::from).unwrap_or(default_severity)
35 }
36 _ => default_severity,
37 }
38 } else {
39 default_severity
40 };
41
42 log::set_max_level(min_severity.into());
43
44 let listener = Arc::new(Mutex::new(None));
45 let filter = Self { listener: listener.clone() };
46 (filter, Self::listen_to_interest_changes(listener, default_severity, client.into_proxy()))
47 }
48
49 pub fn set_interest_listener<T>(&self, listener: T)
51 where
52 T: OnInterestChanged + Send + Sync + 'static,
53 {
54 let mut listener_guard = self.listener.lock().unwrap();
55 *listener_guard = Some(Box::new(listener));
56 }
57
58 async fn listen_to_interest_changes(
59 listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync>>>>,
60 default_severity: Severity,
61 proxy: LogSinkProxy,
62 ) {
63 while let Ok(Ok(interest)) = proxy.wait_for_interest_change().await {
64 let new_min_severity =
65 interest.min_severity.map(Severity::from).unwrap_or(default_severity);
66 log::set_max_level(new_min_severity.into());
67 let callback_guard = listener.lock().unwrap();
68 if let Some(callback) = &*callback_guard {
69 callback.on_changed(new_min_severity);
70 }
71 }
72 }
73
74 pub fn enabled_for_testing(&self, record: &TestRecord<'_>) -> bool {
75 let min_severity = Severity::try_from(log::max_level()).map(|s| s as u8).unwrap_or(u8::MAX);
76 min_severity <= record.severity
77 }
78}
79
80#[cfg(test)]
81mod tests {
82 use super::*;
83 use fidl::endpoints::create_request_stream;
84 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest, LogSinkRequestStream};
85 use futures::channel::mpsc;
86 use futures::{StreamExt, TryStreamExt};
87 use log::{debug, error, info, trace, warn};
88
89 struct SeverityTracker {
90 _filter: InterestFilter,
91 severity_counts: Arc<Mutex<SeverityCount>>,
92 }
93
94 impl log::Log for SeverityTracker {
95 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
96 true
97 }
98
99 fn log(&self, record: &log::Record<'_>) {
100 let mut count = self.severity_counts.lock().unwrap();
101 let to_increment = match record.level() {
102 log::Level::Trace => &mut count.trace,
103 log::Level::Debug => &mut count.debug,
104 log::Level::Info => &mut count.info,
105 log::Level::Warn => &mut count.warn,
106 log::Level::Error => &mut count.error,
107 };
108 *to_increment += 1;
109 }
110
111 fn flush(&self) {}
112 }
113
114 #[derive(Debug, Default, Eq, PartialEq)]
115 struct SeverityCount {
116 trace: u64,
117 debug: u64,
118 info: u64,
119 warn: u64,
120 error: u64,
121 }
122
123 struct InterestChangedListener(mpsc::UnboundedSender<()>);
124
125 impl OnInterestChanged for InterestChangedListener {
126 fn on_changed(&self, _: crate::Severity) {
127 self.0.unbounded_send(()).unwrap();
128 }
129 }
130
131 #[fuchsia::test(logging = false)]
132 async fn default_filter_is_info_when_unspecified() {
133 let (client, _requests) = create_request_stream::<LogSinkMarker>();
134 let (filter, _on_changes) =
135 InterestFilter::new(client, fdiagnostics::Interest::default(), false);
136 let observed = Arc::new(Mutex::new(SeverityCount::default()));
137 log::set_boxed_logger(Box::new(SeverityTracker {
138 severity_counts: observed.clone(),
139 _filter: filter,
140 }))
141 .unwrap();
142 let mut expected = SeverityCount::default();
143
144 error!("oops");
145 expected.error += 1;
146 assert_eq!(&*observed.lock().unwrap(), &expected);
147
148 warn!("maybe");
149 expected.warn += 1;
150 assert_eq!(&*observed.lock().unwrap(), &expected);
151
152 info!("ok");
153 expected.info += 1;
154 assert_eq!(&*observed.lock().unwrap(), &expected);
155
156 debug!("hint");
157 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
158
159 trace!("spew");
160 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
161 }
162
163 async fn send_interest_change(stream: &mut LogSinkRequestStream, severity: Option<Severity>) {
164 match stream.try_next().await {
165 Ok(Some(LogSinkRequest::WaitForInterestChange { responder })) => {
166 responder
167 .send(Ok(&fdiagnostics::Interest {
168 min_severity: severity.map(fdiagnostics::Severity::from),
169 ..Default::default()
170 }))
171 .expect("send response");
172 }
173 other => panic!("Expected WaitForInterestChange but got {:?}", other),
174 }
175 }
176
177 #[fuchsia::test(logging = false)]
178 async fn default_filter_on_interest_changed() {
179 let (client, mut requests) = create_request_stream::<LogSinkMarker>();
180 let (filter, on_changes) = InterestFilter::new(
181 client,
182 fdiagnostics::Interest {
183 min_severity: Some(fdiagnostics::Severity::Warn),
184 ..Default::default()
185 },
186 false,
187 );
188 let (send, mut recv) = mpsc::unbounded();
189 filter.set_interest_listener(InterestChangedListener(send));
190 let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
191 let observed = Arc::new(Mutex::new(SeverityCount::default()));
192 log::set_boxed_logger(Box::new(SeverityTracker {
193 severity_counts: observed.clone(),
194 _filter: filter,
195 }))
196 .expect("set logger");
197
198 send_interest_change(&mut requests, Some(Severity::Info)).await;
201 recv.next().await.unwrap();
202
203 let mut expected = SeverityCount::default();
204 error!("oops");
205 expected.error += 1;
206 assert_eq!(&*observed.lock().unwrap(), &expected);
207
208 warn!("maybe");
209 expected.warn += 1;
210 assert_eq!(&*observed.lock().unwrap(), &expected);
211
212 info!("ok");
213 expected.info += 1;
214 assert_eq!(&*observed.lock().unwrap(), &expected);
215
216 debug!("hint");
217 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
218
219 trace!("spew");
220 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
221
222 send_interest_change(&mut requests, None).await;
224 recv.next().await.unwrap();
225
226 error!("oops");
227 expected.error += 1;
228 assert_eq!(&*observed.lock().unwrap(), &expected);
229
230 warn!("maybe");
231 expected.warn += 1;
232 assert_eq!(&*observed.lock().unwrap(), &expected);
233
234 info!("ok");
235 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
236
237 debug!("hint");
238 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
239
240 trace!("spew");
241 assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
242 }
243
244 #[fuchsia::test(logging = false)]
245 async fn wait_for_initial_interest() {
246 let (client, mut requests) = create_request_stream::<LogSinkMarker>();
247 let t = std::thread::spawn(move || {
248 let _executor = fuchsia_async::LocalExecutor::new();
250 let (filter, _on_changes) =
251 InterestFilter::new(client, fdiagnostics::Interest::default(), true);
252 filter
253 });
254 if let Some(Ok(request)) = requests.next().await {
255 match request {
256 LogSinkRequest::WaitForInterestChange { responder } => {
257 responder
258 .send(Ok(&fdiagnostics::Interest {
259 min_severity: Some(fdiagnostics::Severity::Trace),
260 ..Default::default()
261 }))
262 .expect("sent initial interest");
263 }
264 other => panic!("Got unexpected: {:?}", other),
265 };
266 }
267 let _filter = t.join().unwrap();
268 assert_eq!(log::max_level(), log::Level::Trace);
269 }
270
271 #[fuchsia::test(logging = false)]
272 async fn log_frontend_tracks_severity() {
273 log::set_max_level(log::LevelFilter::Off);
275
276 let (client, mut requests) = create_request_stream::<LogSinkMarker>();
277 let (filter, on_changes) = InterestFilter::new(
278 client,
279 fdiagnostics::Interest {
280 min_severity: Some(fdiagnostics::Severity::Warn),
281 ..Default::default()
282 },
283 false,
284 );
285 assert_eq!(log::max_level(), log::LevelFilter::Warn);
287
288 let (send, mut recv) = mpsc::unbounded();
289 filter.set_interest_listener(InterestChangedListener(send));
290 let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
291
292 send_interest_change(&mut requests, Some(Severity::Trace)).await;
293 recv.next().await.unwrap();
294 assert_eq!(log::max_level(), log::LevelFilter::Trace);
295
296 send_interest_change(&mut requests, Some(Severity::Info)).await;
297 recv.next().await.unwrap();
298 assert_eq!(log::max_level(), log::LevelFilter::Info);
299 }
300}