diagnostics_log/fuchsia/
filter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::OnInterestChanged;
5use diagnostics_log_encoding::encode::TestRecord;
6use diagnostics_log_types::Severity;
7use fidl::endpoints::ClientEnd;
8use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy, LogSinkSynchronousProxy};
9use std::future::Future;
10use std::sync::{Arc, Mutex};
11
12#[cfg(fuchsia_api_level_less_than = "27")]
13use fidl_fuchsia_diagnostics as fdiagnostics;
14#[cfg(fuchsia_api_level_at_least = "27")]
15use fidl_fuchsia_diagnostics_types as fdiagnostics;
16
17pub(crate) struct InterestFilter {
18    listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync + 'static>>>>,
19}
20
21impl InterestFilter {
22    /// Constructs a new `InterestFilter` and a future which should be polled to listen
23    /// to changes in the LogSink's interest.
24    pub fn new(
25        client: ClientEnd<LogSinkMarker>,
26        interest: fdiagnostics::Interest,
27        wait_for_initial_interest: bool,
28    ) -> (Self, impl Future<Output = ()>) {
29        let default_severity = interest.min_severity.map(Severity::from).unwrap_or(Severity::Info);
30        let min_severity = if wait_for_initial_interest {
31            let sync_proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
32            match sync_proxy.wait_for_interest_change(zx::MonotonicInstant::INFINITE) {
33                Ok(Ok(initial_interest)) => {
34                    initial_interest.min_severity.map(Severity::from).unwrap_or(default_severity)
35                }
36                _ => default_severity,
37            }
38        } else {
39            default_severity
40        };
41
42        log::set_max_level(min_severity.into());
43
44        let listener = Arc::new(Mutex::new(None));
45        let filter = Self { listener: listener.clone() };
46        (filter, Self::listen_to_interest_changes(listener, default_severity, client.into_proxy()))
47    }
48
49    /// Sets the interest listener.
50    pub fn set_interest_listener<T>(&self, listener: T)
51    where
52        T: OnInterestChanged + Send + Sync + 'static,
53    {
54        let mut listener_guard = self.listener.lock().unwrap();
55        *listener_guard = Some(Box::new(listener));
56    }
57
58    async fn listen_to_interest_changes(
59        listener: Arc<Mutex<Option<Box<dyn OnInterestChanged + Send + Sync>>>>,
60        default_severity: Severity,
61        proxy: LogSinkProxy,
62    ) {
63        while let Ok(Ok(interest)) = proxy.wait_for_interest_change().await {
64            let new_min_severity =
65                interest.min_severity.map(Severity::from).unwrap_or(default_severity);
66            log::set_max_level(new_min_severity.into());
67            let callback_guard = listener.lock().unwrap();
68            if let Some(callback) = &*callback_guard {
69                callback.on_changed(new_min_severity);
70            }
71        }
72    }
73
74    pub fn enabled_for_testing(&self, record: &TestRecord<'_>) -> bool {
75        let min_severity = Severity::try_from(log::max_level()).map(|s| s as u8).unwrap_or(u8::MAX);
76        min_severity <= record.severity
77    }
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83    use fidl::endpoints::create_request_stream;
84    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest, LogSinkRequestStream};
85    use futures::channel::mpsc;
86    use futures::{StreamExt, TryStreamExt};
87    use log::{debug, error, info, trace, warn};
88
89    struct SeverityTracker {
90        _filter: InterestFilter,
91        severity_counts: Arc<Mutex<SeverityCount>>,
92    }
93
94    impl log::Log for SeverityTracker {
95        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
96            true
97        }
98
99        fn log(&self, record: &log::Record<'_>) {
100            let mut count = self.severity_counts.lock().unwrap();
101            let to_increment = match record.level() {
102                log::Level::Trace => &mut count.trace,
103                log::Level::Debug => &mut count.debug,
104                log::Level::Info => &mut count.info,
105                log::Level::Warn => &mut count.warn,
106                log::Level::Error => &mut count.error,
107            };
108            *to_increment += 1;
109        }
110
111        fn flush(&self) {}
112    }
113
114    #[derive(Debug, Default, Eq, PartialEq)]
115    struct SeverityCount {
116        trace: u64,
117        debug: u64,
118        info: u64,
119        warn: u64,
120        error: u64,
121    }
122
123    struct InterestChangedListener(mpsc::UnboundedSender<()>);
124
125    impl OnInterestChanged for InterestChangedListener {
126        fn on_changed(&self, _: crate::Severity) {
127            self.0.unbounded_send(()).unwrap();
128        }
129    }
130
131    #[fuchsia::test(logging = false)]
132    async fn default_filter_is_info_when_unspecified() {
133        let (client, _requests) = create_request_stream::<LogSinkMarker>();
134        let (filter, _on_changes) =
135            InterestFilter::new(client, fdiagnostics::Interest::default(), false);
136        let observed = Arc::new(Mutex::new(SeverityCount::default()));
137        log::set_boxed_logger(Box::new(SeverityTracker {
138            severity_counts: observed.clone(),
139            _filter: filter,
140        }))
141        .unwrap();
142        let mut expected = SeverityCount::default();
143
144        error!("oops");
145        expected.error += 1;
146        assert_eq!(&*observed.lock().unwrap(), &expected);
147
148        warn!("maybe");
149        expected.warn += 1;
150        assert_eq!(&*observed.lock().unwrap(), &expected);
151
152        info!("ok");
153        expected.info += 1;
154        assert_eq!(&*observed.lock().unwrap(), &expected);
155
156        debug!("hint");
157        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
158
159        trace!("spew");
160        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
161    }
162
163    async fn send_interest_change(stream: &mut LogSinkRequestStream, severity: Option<Severity>) {
164        match stream.try_next().await {
165            Ok(Some(LogSinkRequest::WaitForInterestChange { responder })) => {
166                responder
167                    .send(Ok(&fdiagnostics::Interest {
168                        min_severity: severity.map(fdiagnostics::Severity::from),
169                        ..Default::default()
170                    }))
171                    .expect("send response");
172            }
173            other => panic!("Expected WaitForInterestChange but got {:?}", other),
174        }
175    }
176
177    #[fuchsia::test(logging = false)]
178    async fn default_filter_on_interest_changed() {
179        let (client, mut requests) = create_request_stream::<LogSinkMarker>();
180        let (filter, on_changes) = InterestFilter::new(
181            client,
182            fdiagnostics::Interest {
183                min_severity: Some(fdiagnostics::Severity::Warn),
184                ..Default::default()
185            },
186            false,
187        );
188        let (send, mut recv) = mpsc::unbounded();
189        filter.set_interest_listener(InterestChangedListener(send));
190        let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
191        let observed = Arc::new(Mutex::new(SeverityCount::default()));
192        log::set_boxed_logger(Box::new(SeverityTracker {
193            severity_counts: observed.clone(),
194            _filter: filter,
195        }))
196        .expect("set logger");
197
198        // After overriding to info, filtering is at info level. The mpsc channel is used to
199        // get a signal as to when the filter has processed the update.
200        send_interest_change(&mut requests, Some(Severity::Info)).await;
201        recv.next().await.unwrap();
202
203        let mut expected = SeverityCount::default();
204        error!("oops");
205        expected.error += 1;
206        assert_eq!(&*observed.lock().unwrap(), &expected);
207
208        warn!("maybe");
209        expected.warn += 1;
210        assert_eq!(&*observed.lock().unwrap(), &expected);
211
212        info!("ok");
213        expected.info += 1;
214        assert_eq!(&*observed.lock().unwrap(), &expected);
215
216        debug!("hint");
217        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
218
219        trace!("spew");
220        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
221
222        // After resetting to default, filtering is at warn level.
223        send_interest_change(&mut requests, None).await;
224        recv.next().await.unwrap();
225
226        error!("oops");
227        expected.error += 1;
228        assert_eq!(&*observed.lock().unwrap(), &expected);
229
230        warn!("maybe");
231        expected.warn += 1;
232        assert_eq!(&*observed.lock().unwrap(), &expected);
233
234        info!("ok");
235        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
236
237        debug!("hint");
238        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
239
240        trace!("spew");
241        assert_eq!(&*observed.lock().unwrap(), &expected, "should not increment counters");
242    }
243
244    #[fuchsia::test(logging = false)]
245    async fn wait_for_initial_interest() {
246        let (client, mut requests) = create_request_stream::<LogSinkMarker>();
247        let t = std::thread::spawn(move || {
248            // Unused, but its existence is needed by AsyncChannel.
249            let _executor = fuchsia_async::LocalExecutor::new();
250            let (filter, _on_changes) =
251                InterestFilter::new(client, fdiagnostics::Interest::default(), true);
252            filter
253        });
254        if let Some(Ok(request)) = requests.next().await {
255            match request {
256                LogSinkRequest::WaitForInterestChange { responder } => {
257                    responder
258                        .send(Ok(&fdiagnostics::Interest {
259                            min_severity: Some(fdiagnostics::Severity::Trace),
260                            ..Default::default()
261                        }))
262                        .expect("sent initial interest");
263                }
264                other => panic!("Got unexpected: {:?}", other),
265            };
266        }
267        let _filter = t.join().unwrap();
268        assert_eq!(log::max_level(), log::Level::Trace);
269    }
270
271    #[fuchsia::test(logging = false)]
272    async fn log_frontend_tracks_severity() {
273        // Manually set to a known value.
274        log::set_max_level(log::LevelFilter::Off);
275
276        let (client, mut requests) = create_request_stream::<LogSinkMarker>();
277        let (filter, on_changes) = InterestFilter::new(
278            client,
279            fdiagnostics::Interest {
280                min_severity: Some(fdiagnostics::Severity::Warn),
281                ..Default::default()
282            },
283            false,
284        );
285        // Log frontend tracks the default min_severity.
286        assert_eq!(log::max_level(), log::LevelFilter::Warn);
287
288        let (send, mut recv) = mpsc::unbounded();
289        filter.set_interest_listener(InterestChangedListener(send));
290        let _on_changes_task = fuchsia_async::Task::spawn(on_changes);
291
292        send_interest_change(&mut requests, Some(Severity::Trace)).await;
293        recv.next().await.unwrap();
294        assert_eq!(log::max_level(), log::LevelFilter::Trace);
295
296        send_interest_change(&mut requests, Some(Severity::Info)).await;
297        recv.next().await.unwrap();
298        assert_eq!(log::max_level(), log::LevelFilter::Info);
299    }
300}