archivist_lib/logs/
listener.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

4use diagnostics_data::LogsData;
5use diagnostics_message::error::MessageError;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8    LogFilterOptions, LogListenerSafeMarker, LogListenerSafeProxy, LogMessage,
9};
10use futures::prelude::*;
11use log::{debug, error, trace};
12use logmessage_measure_tape::Measurable as _;
13use std::sync::Arc;
14use std::task::Poll;
15use thiserror::Error;
16
17mod filter;
18
19use filter::MessageFilter;
20
// Number of bytes the header of a vector occupies in a fidl message.
// Used as the baseline size of an empty batch when packing messages into a
// single `LogMany` call (see `Listener::backfill`).
const FIDL_VECTOR_HEADER_BYTES: usize = 16;
23
/// An individual log listener. Wraps the FIDL type `LogListenerProxy` in filtering options provided
/// when connecting.
pub struct Listener {
    // Proxy over which messages are delivered to the connected client.
    listener: LogListenerSafeProxy,
    // Per-listener filter deciding which messages should be forwarded.
    filter: MessageFilter,
    // Tracks whether the listener's channel is still usable; flipped to
    // `Stale` when a send fails because the peer closed.
    status: Status,
}
31
/// Health of a listener connection, as observed from send results.
#[derive(Debug, PartialEq)]
enum Status {
    /// The listener is responsive and should keep receiving messages.
    Fine,
    /// The listener's channel closed; no further messages should be sent.
    Stale,
}
37
38fn is_valid(message: &LogMessage) -> bool {
39    // Check that the tags fit in FIDL.
40    if message.tags.len() > fidl_fuchsia_logger::MAX_TAGS.into() {
41        debug!("Unable to encode message, it exceeded our MAX_TAGS");
42        return false;
43    }
44    for tag in &message.tags {
45        if tag.len() > fidl_fuchsia_logger::MAX_TAG_LEN_BYTES.into() {
46            debug!("Unable to encode message, it exceeded our MAX_TAG_LEN_BYTES");
47            return false;
48        }
49    }
50
51    // If a message by itself is too big to fit into fidl, warn and skip.
52    let msg_size = message.measure().num_bytes;
53    if msg_size + FIDL_VECTOR_HEADER_BYTES > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
54        debug!("Unable to encode message, it exceeded our MAX_LOG_MANY_SIZE_BYTES by itself.");
55        return false;
56    }
57    true
58}
59
impl Listener {
    /// Create a new `Listener`. Fails if `client` can't be converted into a `LogListenerProxy` or
    /// if `LogFilterOptions` are invalid.
    pub fn new(
        log_listener: ClientEnd<LogListenerSafeMarker>,
        options: Option<Box<LogFilterOptions>>,
    ) -> Result<Self, ListenerError> {
        debug!("New listener with options {:?}", &options);
        Ok(Self {
            status: Status::Fine,
            listener: log_listener.into_proxy(),
            // `MessageFilter::new` validates the options; `?` surfaces a
            // `ListenerError` on invalid filters.
            filter: MessageFilter::new(options)?,
        })
    }

    /// Send messages to the listener. First eagerly collects any backlog and sends it out in
    /// batches before waiting for wakeups.
    pub async fn run(
        mut self,
        mut logs: impl Stream<Item = Arc<LogsData>> + Unpin,
        call_done: bool,
    ) {
        debug!("Backfilling from cursor until pending.");
        let mut backlog = vec![];
        // Drain everything already buffered in the stream without yielding:
        // keep polling until the stream returns Pending (or terminates), then
        // resolve immediately rather than waiting for a wakeup.
        futures::future::poll_fn(|cx| {
            while let Poll::Ready(Some(next)) = logs.poll_next_unpin(cx) {
                backlog.push(next);
            }

            Poll::Ready(())
        })
        .await;

        self.backfill(backlog).await;
        debug!("Done backfilling.");
        // If the client disconnected during backfill there is no point in
        // streaming further messages (and `done()` would fail anyway).
        if !self.is_healthy() {
            return;
        }

        self.send_new_logs(logs).await;
        if call_done {
            // Best-effort notification that the stream is complete; errors are
            // ignored since we are exiting regardless.
            self.listener.done().ok();
        }
        debug!("Listener exiting.");
    }

    /// Returns whether this listener should continue receiving messages.
    fn is_healthy(&self) -> bool {
        self.status == Status::Fine
    }

    // Forward messages one at a time as they arrive, stopping as soon as the
    // listener is marked stale by a failed send.
    async fn send_new_logs<S>(&mut self, mut logs: S)
    where
        S: Stream<Item = Arc<LogsData>> + Unpin,
    {
        while let Some(message) = logs.next().await {
            self.send_log(&message).await;
            if !self.is_healthy() {
                break;
            }
        }
    }

    /// Send all messages currently in the provided buffer to this listener. Attempts to batch up
    /// to the message size limit. Returns early if the listener appears to be unhealthy.
    async fn backfill(&mut self, mut messages: Vec<Arc<LogsData>>) {
        // Deliver the backlog in chronological order.
        messages.sort_by_key(|m| m.metadata.timestamp);

        // Initialize batch size to the size of the vector header.
        let mut batch_size = FIDL_VECTOR_HEADER_BYTES;
        let mut filtered_batch = vec![];
        for msg in messages {
            if self.filter.should_send(&msg) {
                // Convert archivist-encoded log message to legacy format expected
                // by the listener, then use measure_tape to get true size.
                let legacy_msg: LogMessage = msg.as_ref().into();
                let msg_size = legacy_msg.measure().num_bytes;

                // Drop messages that can never fit a FIDL response on their own.
                if !is_valid(&legacy_msg) {
                    continue;
                }

                // Flush the current batch before this message would overflow it.
                // A message that passed `is_valid` always fits a fresh batch.
                if batch_size + msg_size > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
                    self.send_filtered_logs(&filtered_batch).await;
                    if !self.is_healthy() {
                        return;
                    }
                    filtered_batch.clear();
                    batch_size = FIDL_VECTOR_HEADER_BYTES;
                }

                batch_size += msg_size;
                filtered_batch.push(legacy_msg);
            }
        }

        // Flush whatever remains after the loop.
        if !filtered_batch.is_empty() {
            self.send_filtered_logs(&filtered_batch).await;
        }
    }

    /// Send a batch of pre-filtered log messages to this listener.
    async fn send_filtered_logs(&mut self, log_messages: &[LogMessage]) {
        trace!("Flushing batch.");
        self.check_result(self.listener.log_many(log_messages).await);
    }

    /// Send a single log message if it should be sent according to this listener's filter settings.
    async fn send_log(&mut self, log_message: &LogsData) {
        if self.filter.should_send(log_message) {
            let to_send: LogMessage = log_message.into();
            // Silently drop messages that exceed FIDL size limits rather than
            // failing the channel.
            if !is_valid(&to_send) {
                return;
            }
            self.check_result(self.listener.log(&to_send).await);
        }
    }

    /// Consume the result of sending logs to this listener, potentially marking it stale.
    fn check_result(&mut self, result: Result<(), fidl::Error>) {
        if let Err(e) = result {
            if e.is_closed() {
                // Peer hung up: mark stale so callers stop sending.
                self.status = Status::Stale;
            } else {
                // Transient/other errors are logged but do not mark the
                // listener stale.
                error!(e:?; "Error calling listener");
            }
        }
    }
}
189
/// Errors that can occur while creating a [`Listener`] or validating its
/// filter options.
#[derive(Debug, Error)]
pub enum ListenerError {
    /// More tags were supplied in the filter options than FIDL permits.
    #[error("{count} tags provided, max {}", fidl_fuchsia_logger::MAX_TAGS)]
    TooManyTags { count: usize },

    /// A tag in the filter options exceeds the FIDL per-tag length limit.
    #[error("tag at index {index} is too long, max {}", fidl_fuchsia_logger::MAX_TAG_LEN_BYTES)]
    TagTooLong { index: usize },

    /// The client endpoint could not be turned into a proxy.
    #[error("couldn't create LogListenerProxy")]
    CreatingListenerProxy { source: fidl::Error },

    /// A message failed to decode; converted from [`MessageError`].
    #[error("couldn't decode value: {source}")]
    Decode {
        #[from]
        source: MessageError,
    },

    /// A FIDL error occurred while forwarding unsafe log requests.
    #[error("error while forwarding unsafe log requests: {source}")]
    AsbestosIo { source: fidl::Error },
}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::ComponentIdentity;
    use diagnostics_message::{fx_log_packet_t, LoggerMessage, METADATA_SIZE};
    use fidl::endpoints::ServerEnd;
    use fidl_fuchsia_logger::{LogLevelFilter, LogListenerSafeRequest};
    use fuchsia_async as fasync;
    use libc::c_char;
    use moniker::ExtendedMoniker;

    // Four messages that together sum to one LogMany payload each fit
    // individually, so all four should be delivered.
    #[fuchsia::test]
    async fn normal_behavior_test() {
        let message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 4);

        assert_eq!(run_and_consume_backfill(message_vec).await, 4);
    }

    // A single message sized at the full payload limit overflows once the
    // vector header is accounted for after conversion to the legacy
    // LogMessage form, so it is dropped and nothing is delivered.
    #[fuchsia::test]
    async fn packet_fits_but_converted_struct_would_cause_overflow_test() {
        let message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 1);

        assert_eq!(run_and_consume_backfill(message_vec).await, 0);
    }

    // One oversized message mixed with four that fit: only the oversized one
    // is dropped, the other four are delivered.
    #[fuchsia::test]
    async fn one_packet_would_overflow_but_others_fit_test() {
        let mut message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 1);

        message_vec.append(&mut provide_messages(
            fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize,
            4,
        ));

        assert_eq!(run_and_consume_backfill(message_vec).await, 4);
    }

    // Dropping the server-side request stream mid-delivery must terminate
    // `send_new_logs` (the listener marks itself stale on the closed channel).
    #[fuchsia::test]
    async fn verify_client_disconnect() {
        let message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 3);
        let logs = stream::iter(message_vec);

        let (client_end, mut requests) =
            fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
        let mut listener = Listener::new(client_end, None).unwrap();

        let listener_task = fasync::Task::spawn(async move {
            listener.send_new_logs(logs).await;
        });

        // Acknowledge exactly one message, then hang up.
        match requests.next().await.unwrap() {
            Ok(LogListenerSafeRequest::Log { log: _, responder }) => {
                responder.send().unwrap();
            }
            other => panic!("Unexpected request: {other:?}"),
        }
        drop(requests);

        // The task should finish since the `LogListenerSafe` server disconnected.
        listener_task.await;
    }

    // Drives `Listener::backfill` against an in-process server end and returns
    // the total number of messages observed across all LogMany batches.
    async fn run_and_consume_backfill(message_vec: Vec<Arc<LogsData>>) -> usize {
        let (client, server) = zx::Channel::create();
        let client_end = ClientEnd::<LogListenerSafeMarker>::new(client);
        let mut listener_server = ServerEnd::<LogListenerSafeMarker>::new(server).into_stream();
        let mut listener = Listener::new(client_end, None).unwrap();

        // Detached: the task ends (closing the channel) once backfill returns,
        // which terminates the request loop below.
        fasync::Task::spawn(async move {
            listener.backfill(message_vec).await;
        })
        .detach();

        let mut observed_logs: usize = 0;
        while let Some(req) = listener_server.try_next().await.unwrap() {
            match req {
                LogListenerSafeRequest::LogMany { log, responder } => {
                    observed_logs += log.len();
                    responder.send().unwrap();
                }
                _ => panic!("only testing backfill mode."),
            }
        }

        observed_logs
    }

    // Builds `num_messages` log records whose per-message encoded size is
    // `summed_msg_size_bytes / num_messages`.
    fn provide_messages(summed_msg_size_bytes: usize, num_messages: usize) -> Vec<Arc<LogsData>> {
        let per_msg_size = summed_msg_size_bytes / num_messages;
        let mut message_vec = Vec::new();
        for _ in 0..num_messages {
            let byte_encoding = generate_byte_encoded_log(per_msg_size);
            message_vec.push(Arc::new(diagnostics_message::from_logger(
                get_test_identity().into(),
                LoggerMessage::try_from(byte_encoding.as_bytes()).unwrap(),
            )))
        }

        message_vec
    }

    // Generate an fx log packet of a target size with size split between tags and data.
    fn generate_byte_encoded_log(target_size: usize) -> fx_log_packet_t {
        let mut test_packet = test_packet();
        let data_size = target_size - METADATA_SIZE;
        // Tags take at most half the payload, capped at the FIDL tag limit.
        let tag_size =
            core::cmp::min(data_size / 2, fidl_fuchsia_logger::MAX_TAG_LEN_BYTES as usize);
        let message_size = data_size - tag_size;

        populate_packet(&mut test_packet, tag_size, message_size);
        test_packet
    }

    // A packet with fixed, recognizable metadata for use in tests.
    fn test_packet() -> fx_log_packet_t {
        let mut packet: fx_log_packet_t = Default::default();
        packet.metadata.pid = 1;
        packet.metadata.tid = 2;
        packet.metadata.time = 3;
        packet.metadata.severity = LogLevelFilter::Debug as i32;
        packet.metadata.dropped_logs = 10;
        packet
    }

    // Fills `packet` with one tag of `tag_count` 'T' bytes followed by a
    // NUL-terminated message of `message_size` 'D' bytes (legacy wire layout:
    // length-prefixed tag, zero byte, then message).
    fn populate_packet(packet: &mut fx_log_packet_t, tag_count: usize, message_size: usize) {
        let tag_start = 1;
        let tag_end = tag_start + tag_count;

        packet.data[0] = tag_count as c_char;
        packet.fill_data(tag_start..tag_end, b'T' as _);
        packet.data[tag_end] = 0; // terminate tags

        let message_start = tag_start + tag_count + 1;
        let message_end = message_start + message_size;
        packet.fill_data(message_start..message_end, b'D' as _);
    }

    // A fixed component identity used as the source of all test messages.
    fn get_test_identity() -> ComponentIdentity {
        ComponentIdentity::new(
            ExtendedMoniker::parse_str("./fake-test-env/bleebloo").unwrap(),
            "fuchsia-pkg://fuchsia.com/testing123#test-component.cm",
        )
    }
}
357}