diagnostics_log/fuchsia/
filter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::OnInterestChanged;
5use diagnostics_log_encoding::encode::TestRecord;
6use diagnostics_log_types::Severity;
7use fidl::endpoints::Proxy;
8use fidl_fuchsia_logger::{LogSinkProxy, LogSinkSynchronousProxy};
9use std::future::Future;
10use std::sync::{Arc, Mutex};
11
12#[cfg(fuchsia_api_level_less_than = "NEXT")]
13use fidl_fuchsia_diagnostics as fdiagnostics;
14#[cfg(fuchsia_api_level_at_least = "NEXT")]
15use fidl_fuchsia_diagnostics_types as fdiagnostics;
16
17pub(crate) struct InterestFilter {
18    listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync + 'static>>>>,
19}
20
21impl InterestFilter {
22    /// Constructs a new `InterestFilter` and a future which should be polled to listen
23    /// to changes in the LogSink's interest.
24    pub fn new(
25        proxy: LogSinkProxy,
26        interest: fdiagnostics::Interest,
27        wait_for_initial_interest: bool,
28    ) -> (Self, impl Future<Output = ()>) {
29        let default_severity = interest.min_severity.map(Severity::from).unwrap_or(Severity::Info);
30        let (proxy, min_severity) = if wait_for_initial_interest {
31            let sync_proxy = LogSinkSynchronousProxy::new(proxy.into_channel().unwrap().into());
32            let initial_severity = match sync_proxy
33                .wait_for_interest_change(zx::MonotonicInstant::INFINITE)
34            {
35                Ok(Ok(initial_interest)) => {
36                    initial_interest.min_severity.map(Severity::from).unwrap_or(default_severity)
37                }
38                _ => default_severity,
39            };
40            (
41                LogSinkProxy::new(fidl::AsyncChannel::from_channel(sync_proxy.into_channel())),
42                initial_severity,
43            )
44        } else {
45            (proxy, default_severity)
46        };
47
48        log::set_max_level(min_severity.into());
49
50        let listener = Arc::new(Mutex::new(None));
51        let filter = Self { listener: listener.clone() };
52        (filter, Self::listen_to_interest_changes(listener, default_severity, proxy))
53    }
54
55    /// Sets the interest listener.
56    pub fn set_interest_listener<T>(&self, listener: T)
57    where
58        T: OnInterestChanged + Send + Sync + 'static,
59    {
60        let mut listener_guard = self.listener.lock().unwrap();
61        *listener_guard = Some(Box::new(listener));
62    }
63
64    async fn listen_to_interest_changes(
65        listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync>>>>,
66        default_severity: Severity,
67        proxy: LogSinkProxy,
68    ) {
69        while let Ok(Ok(interest)) = proxy.wait_for_interest_change().await {
70            let new_min_severity =
71                interest.min_severity.map(Severity::from).unwrap_or(default_severity);
72            log::set_max_level(new_min_severity.into());
73            let callback_guard = listener.lock().unwrap();
74            if let Some(callback) = &*callback_guard {
75                callback.on_changed(new_min_severity);
76            }
77        }
78    }
79
80    pub fn enabled_for_testing(&self, record: &TestRecord<'_>) -> bool {
81        let min_severity = Severity::try_from(log::max_level()).map(|s| s as u8).unwrap_or(u8::MAX);
82        min_severity <= record.severity
83    }
84}
85
86#[cfg(test)]
87mod tests {
88    use super::*;
89    use fidl::endpoints::create_proxy_and_stream;
90    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest, LogSinkRequestStream};
91    use futures::channel::mpsc;
92    use futures::{StreamExt, TryStreamExt};
93    use log::{debug, error, info, trace, warn};
94
95    struct SeverityTracker {
96        _filter: InterestFilter,
97        severity_counts: Arc<Mutex<SeverityCount>>,
98    }
99
100    impl log::Log for SeverityTracker {
101        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
102            true
103        }
104
105        fn log(&self, record: &log::Record<'_>) {
106            let mut count = self.severity_counts.lock().unwrap();
107            let to_increment = match record.level() {
108                log::Level::Trace => &mut count.trace,
109                log::Level::Debug => &mut count.debug,
110                log::Level::Info => &mut count.info,
111                log::Level::Warn => &mut count.warn,
112                log::Level::Error => &mut count.error,
113            };
114            *to_increment += 1;
115        }
116
117        fn flush(&self) {}
118    }
119
120    #[derive(Debug, Default, Eq, PartialEq)]
121    struct SeverityCount {
122        trace: u64,
123        debug: u64,
124        info: u64,
125        warn: u64,
126        error: u64,
127    }
128
129    struct InterestChangedListener(mpsc::UnboundedSender<()>);
130
131    impl OnInterestChanged for InterestChangedListener {
132        fn on_changed(&self, _: crate::Severity) {
133            self.0.unbounded_send(()).unwrap();
134        }
135    }
136
137    #[fuchsia::test(logging = false)]
138    async fn default_filter_is_info_when_unspecified() {
139        let (proxy, _requests) = create_proxy_and_stream::<LogSinkMarker>();
140        let (filter, _on_changes) =
141            InterestFilter::new(proxy, fdiagnostics::Interest::default(), false);
142        let observed = Arc::new(Mutex::new(SeverityCount::default()));
143        log::set_boxed_logger(Box::new(SeverityTracker {
144            severity_counts: observed.clone(),
145            _filter: filter,
146        }))
147        .unwrap();
148        let mut expected = SeverityCount::default();
149
150        error!("oops");
151        expected.error += 1;
152        assert_eq!(&*observed.lock().unwrap(), &expected);
153
154        warn!("maybe");
155        expected.warn += 1;
156        assert_eq!(&*observed.lock().unwrap(), &expected);
157
158        info!("ok");
159        expected.info += 1;
160        assert_eq!(&*observed.lock().unwrap(), &expected);
161
162        debug!("hint");
163        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
164
165        trace!("spew");
166        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
167    }
168
169    async fn send_interest_change(stream: &mut LogSinkRequestStream, severity: Option<Severity>) {
170        match stream.try_next().await {
171            Ok(Some(LogSinkRequest::WaitForInterestChange { responder })) => {
172                responder
173                    .send(Ok(&fdiagnostics::Interest {
174                        min_severity: severity.map(fdiagnostics::Severity::from),
175                        ..Default::default()
176                    }))
177                    .expect("send response");
178            }
179            other => panic!("Expected WaitForInterestChange but got {:?}", other),
180        }
181    }
182
183    #[fuchsia::test(logging = false)]
184    async fn default_filter_on_interest_changed() {
185        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
186        let (filter, on_changes) = InterestFilter::new(
187            proxy,
188            fdiagnostics::Interest {
189                min_severity: Some(fdiagnostics::Severity::Warn),
190                ..Default::default()
191            },
192            false,
193        );
194        let (send, mut recv) = mpsc::unbounded();
195        filter.set_interest_listener(InterestChangedListener(send));
196        let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
197        let observed = Arc::new(Mutex::new(SeverityCount::default()));
198        log::set_boxed_logger(Box::new(SeverityTracker {
199            severity_counts: observed.clone(),
200            _filter: filter,
201        }))
202        .expect("set logger");
203
204        // After overriding to info, filtering is at info level. The mpsc channel is used to
205        // get a signal as to when the filter has processed the update.
206        send_interest_change(&mut requests, Some(Severity::Info)).await;
207        recv.next().await.unwrap();
208
209        let mut expected = SeverityCount::default();
210        error!("oops");
211        expected.error += 1;
212        assert_eq!(&*observed.lock().unwrap(), &expected);
213
214        warn!("maybe");
215        expected.warn += 1;
216        assert_eq!(&*observed.lock().unwrap(), &expected);
217
218        info!("ok");
219        expected.info += 1;
220        assert_eq!(&*observed.lock().unwrap(), &expected);
221
222        debug!("hint");
223        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
224
225        trace!("spew");
226        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
227
228        // After resetting to default, filtering is at warn level.
229        send_interest_change(&mut requests, None).await;
230        recv.next().await.unwrap();
231
232        error!("oops");
233        expected.error += 1;
234        assert_eq!(&*observed.lock().unwrap(), &expected);
235
236        warn!("maybe");
237        expected.warn += 1;
238        assert_eq!(&*observed.lock().unwrap(), &expected);
239
240        info!("ok");
241        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
242
243        debug!("hint");
244        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
245
246        trace!("spew");
247        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
248    }
249
250    #[fuchsia::test(logging = false)]
251    async fn wait_for_initial_interest() {
252        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
253        let t = std::thread::spawn(move || {
254            // Unused, but its existence is needed by AsyncChannel.
255            let _executor = fuchsia_async::LocalExecutor::new();
256            let (filter, _on_changes) =
257                InterestFilter::new(proxy, fdiagnostics::Interest::default(), true);
258            filter
259        });
260        if let Some(Ok(request)) = requests.next().await {
261            match request {
262                LogSinkRequest::WaitForInterestChange { responder } => {
263                    responder
264                        .send(Ok(&fdiagnostics::Interest {
265                            min_severity: Some(fdiagnostics::Severity::Trace),
266                            ..Default::default()
267                        }))
268                        .expect("sent initial interest");
269                }
270                other => panic!("Got unexpected: {:?}", other),
271            };
272        }
273        let _filter = t.join().unwrap();
274        assert_eq!(log::max_level(), log::Level::Trace);
275    }
276
277    #[fuchsia::test(logging = false)]
278    async fn log_frontend_tracks_severity() {
279        // Manually set to a known value.
280        log::set_max_level(log::LevelFilter::Off);
281
282        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
283        let (filter, on_changes) = InterestFilter::new(
284            proxy,
285            fdiagnostics::Interest {
286                min_severity: Some(fdiagnostics::Severity::Warn),
287                ..Default::default()
288            },
289            false,
290        );
291        // Log frontend tracks the default min_severity.
292        assert_eq!(log::max_level(), log::LevelFilter::Warn);
293
294        let (send, mut recv) = mpsc::unbounded();
295        filter.set_interest_listener(InterestChangedListener(send));
296        let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
297
298        send_interest_change(&mut requests, Some(Severity::Trace)).await;
299        recv.next().await.unwrap();
300        assert_eq!(log::max_level(), log::LevelFilter::Trace);
301
302        send_interest_change(&mut requests, Some(Severity::Info)).await;
303        recv.next().await.unwrap();
304        assert_eq!(log::max_level(), log::LevelFilter::Info);
305    }
306}