archivist_lib/logs/
listener.rs1use diagnostics_data::LogsData;
5use diagnostics_message::error::MessageError;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8 LogFilterOptions, LogListenerSafeMarker, LogListenerSafeProxy, LogMessage,
9};
10use futures::prelude::*;
11use log::{debug, error, trace};
12use logmessage_measure_tape::Measurable as _;
13use std::sync::Arc;
14use std::task::Poll;
15use thiserror::Error;
16
17mod filter;
18
19use filter::MessageFilter;
20
/// Byte overhead budgeted for the vector header when sizing a `LogMany` batch: every batch
/// size calculation starts from this value, and a single message must fit alongside it.
const FIDL_VECTOR_HEADER_BYTES: usize = 16;
23
/// Forwards log messages to a single `LogListenerSafe` client and tracks whether the
/// connection to that client is still usable.
pub struct Listener {
    /// Proxy used to deliver messages to the client.
    listener: LogListenerSafeProxy,
    /// Decides, per message, whether it should be sent to this client.
    filter: MessageFilter,
    /// Health of the connection; sending stops once this is no longer `Status::Fine`.
    status: Status,
}
31
/// Connection health of a `Listener`.
#[derive(Debug, PartialEq)]
enum Status {
    /// Initial state: no closed-channel error has been observed.
    Fine,
    /// A call to the client failed because the channel was closed.
    Stale,
}
37
38fn is_valid(message: &LogMessage) -> bool {
39 if message.tags.len() > fidl_fuchsia_logger::MAX_TAGS.into() {
41 debug!("Unable to encode message, it exceeded our MAX_TAGS");
42 return false;
43 }
44 for tag in &message.tags {
45 if tag.len() > fidl_fuchsia_logger::MAX_TAG_LEN_BYTES.into() {
46 debug!("Unable to encode message, it exceeded our MAX_TAG_LEN_BYTES");
47 return false;
48 }
49 }
50
51 let msg_size = message.measure().num_bytes;
53 if msg_size + FIDL_VECTOR_HEADER_BYTES > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
54 debug!("Unable to encode message, it exceeded our MAX_LOG_MANY_SIZE_BYTES by itself.");
55 return false;
56 }
57 true
58}
59
60impl Listener {
61 pub fn new(
64 log_listener: ClientEnd<LogListenerSafeMarker>,
65 options: Option<Box<LogFilterOptions>>,
66 ) -> Result<Self, ListenerError> {
67 debug!("New listener with options {:?}", &options);
68 Ok(Self {
69 status: Status::Fine,
70 listener: log_listener.into_proxy(),
71 filter: MessageFilter::new(options)?,
72 })
73 }
74
75 pub async fn run(
78 mut self,
79 mut logs: impl Stream<Item = Arc<LogsData>> + Unpin,
80 call_done: bool,
81 ) {
82 debug!("Backfilling from cursor until pending.");
83 let mut backlog = vec![];
84 futures::future::poll_fn(|cx| {
85 while let Poll::Ready(Some(next)) = logs.poll_next_unpin(cx) {
86 backlog.push(next);
87 }
88
89 Poll::Ready(())
90 })
91 .await;
92
93 self.backfill(backlog).await;
94 debug!("Done backfilling.");
95 if !self.is_healthy() {
96 return;
97 }
98
99 self.send_new_logs(logs).await;
100 if call_done {
101 self.listener.done().ok();
102 }
103 debug!("Listener exiting.");
104 }
105
106 fn is_healthy(&self) -> bool {
108 self.status == Status::Fine
109 }
110
111 async fn send_new_logs<S>(&mut self, mut logs: S)
112 where
113 S: Stream<Item = Arc<LogsData>> + Unpin,
114 {
115 while let Some(message) = logs.next().await {
116 self.send_log(&message).await;
117 if !self.is_healthy() {
118 break;
119 }
120 }
121 }
122
123 async fn backfill(&mut self, mut messages: Vec<Arc<LogsData>>) {
126 messages.sort_by_key(|m| m.metadata.timestamp);
127
128 let mut batch_size = FIDL_VECTOR_HEADER_BYTES;
130 let mut filtered_batch = vec![];
131 for msg in messages {
132 if self.filter.should_send(&msg) {
133 let legacy_msg: LogMessage = msg.as_ref().into();
136 let msg_size = legacy_msg.measure().num_bytes;
137
138 if !is_valid(&legacy_msg) {
139 continue;
140 }
141
142 if batch_size + msg_size > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
143 self.send_filtered_logs(&filtered_batch).await;
144 if !self.is_healthy() {
145 return;
146 }
147 filtered_batch.clear();
148 batch_size = FIDL_VECTOR_HEADER_BYTES;
149 }
150
151 batch_size += msg_size;
152 filtered_batch.push(legacy_msg);
153 }
154 }
155
156 if !filtered_batch.is_empty() {
157 self.send_filtered_logs(&filtered_batch).await;
158 }
159 }
160
161 async fn send_filtered_logs(&mut self, log_messages: &[LogMessage]) {
163 trace!("Flushing batch.");
164 self.check_result(self.listener.log_many(log_messages).await);
165 }
166
167 async fn send_log(&mut self, log_message: &LogsData) {
169 if self.filter.should_send(log_message) {
170 let to_send: LogMessage = log_message.into();
171 if !is_valid(&to_send) {
172 return;
173 }
174 self.check_result(self.listener.log(&to_send).await);
175 }
176 }
177
178 fn check_result(&mut self, result: Result<(), fidl::Error>) {
180 if let Err(e) = result {
181 if e.is_closed() {
182 self.status = Status::Stale;
183 } else {
184 error!(e:?; "Error calling listener");
185 }
186 }
187 }
188}
189
/// Errors reported by the log listener code.
///
/// NOTE(review): within this file only `Listener::new` returns this type, and no variant is
/// constructed here directly; the per-variant notes below are derived from the error messages.
#[derive(Debug, Error)]
pub enum ListenerError {
    /// More tags were provided than `MAX_TAGS` allows; `count` is the number provided.
    #[error("{count} tags provided, max {}", fidl_fuchsia_logger::MAX_TAGS)]
    TooManyTags { count: usize },

    /// The tag at position `index` is longer than `MAX_TAG_LEN_BYTES`.
    #[error("tag at index {index} is too long, max {}", fidl_fuchsia_logger::MAX_TAG_LEN_BYTES)]
    TagTooLong { index: usize },

    /// A listener proxy could not be created; `source` is the underlying FIDL error.
    #[error("couldn't create LogListenerProxy")]
    CreatingListenerProxy { source: fidl::Error },

    /// A value could not be decoded. Converts automatically from `MessageError` via `?`.
    #[error("couldn't decode value: {source}")]
    Decode {
        #[from]
        source: MessageError,
    },

    /// Forwarding "unsafe" log requests failed; `source` is the underlying FIDL error.
    #[error("error while forwarding unsafe log requests: {source}")]
    AsbestosIo { source: fidl::Error },
}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::ComponentIdentity;
    use diagnostics_message::{fx_log_packet_t, LoggerMessage, METADATA_SIZE};
    use fidl::endpoints::ServerEnd;
    use fidl_fuchsia_logger::{LogLevelFilter, LogListenerSafeRequest};
    use fuchsia_async as fasync;
    use libc::c_char;
    use moniker::ExtendedMoniker;

    /// Total packet bytes handed to `provide_messages` by every test below.
    const MAX_BATCH_BYTES: usize = fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize;

    /// Four packets that together add up to the batch limit are all delivered.
    #[fuchsia::test]
    async fn normal_behavior_test() {
        let messages = provide_messages(MAX_BATCH_BYTES, 4);

        let delivered = run_and_consume_backfill(messages).await;
        assert_eq!(delivered, 4);
    }

    /// A single packet as large as the batch limit is dropped rather than sent.
    #[fuchsia::test]
    async fn packet_fits_but_converted_struct_would_cause_overflow_test() {
        let messages = provide_messages(MAX_BATCH_BYTES, 1);

        let delivered = run_and_consume_backfill(messages).await;
        assert_eq!(delivered, 0);
    }

    /// An oversized packet is skipped while the four smaller ones behind it still arrive.
    #[fuchsia::test]
    async fn one_packet_would_overflow_but_others_fit_test() {
        let mut messages = provide_messages(MAX_BATCH_BYTES, 1);
        messages.extend(provide_messages(MAX_BATCH_BYTES, 4));

        let delivered = run_and_consume_backfill(messages).await;
        assert_eq!(delivered, 4);
    }

    /// `send_new_logs` finishes once the client goes away, even with messages left to send.
    #[fuchsia::test]
    async fn verify_client_disconnect() {
        let messages = provide_messages(MAX_BATCH_BYTES, 3);
        let logs = stream::iter(messages);

        let (client_end, mut requests) =
            fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
        let mut listener = Listener::new(client_end, None).unwrap();

        let listener_task = fasync::Task::spawn(async move {
            listener.send_new_logs(logs).await;
        });

        // Acknowledge the first message only, then hang up.
        let first_request = requests.next().await.unwrap();
        match first_request {
            Ok(LogListenerSafeRequest::Log { log: _, responder }) => {
                responder.send().unwrap();
            }
            other => panic!("Unexpected request: {other:?}"),
        }
        drop(requests);

        // The test would hang here if the listener kept going after the disconnect.
        listener_task.await;
    }

    /// Runs `Listener::backfill` over `message_vec` in a detached task and returns the total
    /// number of messages the fake client received through `LogMany`.
    async fn run_and_consume_backfill(message_vec: Vec<Arc<LogsData>>) -> usize {
        let (client, server) = zx::Channel::create();
        let client_end = ClientEnd::<LogListenerSafeMarker>::new(client);
        let mut request_stream = ServerEnd::<LogListenerSafeMarker>::new(server).into_stream();
        let mut listener = Listener::new(client_end, None).unwrap();

        fasync::Task::spawn(async move {
            listener.backfill(message_vec).await;
        })
        .detach();

        let mut received_count: usize = 0;
        while let Some(request) = request_stream.try_next().await.unwrap() {
            match request {
                LogListenerSafeRequest::LogMany { log, responder } => {
                    received_count += log.len();
                    responder.send().unwrap();
                }
                _ => panic!("only testing backfill mode."),
            }
        }

        received_count
    }

    /// Builds `num_messages` messages from packets whose target sizes split
    /// `summed_msg_size_bytes` evenly.
    fn provide_messages(summed_msg_size_bytes: usize, num_messages: usize) -> Vec<Arc<LogsData>> {
        let per_msg_size = summed_msg_size_bytes / num_messages;
        (0..num_messages)
            .map(|_| {
                let packet = generate_byte_encoded_log(per_msg_size);
                Arc::new(diagnostics_message::from_logger(
                    get_test_identity().into(),
                    LoggerMessage::try_from(packet.as_bytes()).unwrap(),
                ))
            })
            .collect()
    }

    /// Builds a packet whose tag and message bytes together fill `target_size` minus the
    /// metadata size; the tag takes up to half of that, capped at `MAX_TAG_LEN_BYTES`.
    fn generate_byte_encoded_log(target_size: usize) -> fx_log_packet_t {
        let mut packet = test_packet();
        let payload_size = target_size - METADATA_SIZE;
        let tag_size = (payload_size / 2).min(fidl_fuchsia_logger::MAX_TAG_LEN_BYTES as usize);
        let message_size = payload_size - tag_size;

        populate_packet(&mut packet, tag_size, message_size);
        packet
    }

    /// Returns a default packet with fixed metadata values.
    fn test_packet() -> fx_log_packet_t {
        let mut fresh: fx_log_packet_t = Default::default();
        fresh.metadata.pid = 1;
        fresh.metadata.tid = 2;
        fresh.metadata.time = 3;
        fresh.metadata.severity = LogLevelFilter::Debug as i32;
        fresh.metadata.dropped_logs = 10;
        fresh
    }

    /// Fills `packet.data` with: the value `tag_count` in the first byte, `tag_count` bytes of
    /// `'T'`, a zero byte, then `message_size` bytes of `'D'`.
    fn populate_packet(packet: &mut fx_log_packet_t, tag_count: usize, message_size: usize) {
        let tag_start = 1;
        let tag_end = tag_start + tag_count;

        packet.data[0] = tag_count as c_char;
        packet.fill_data(tag_start..tag_end, b'T' as _);
        packet.data[tag_end] = 0;

        // The message begins right after the zero byte that follows the tag.
        let message_start = tag_end + 1;
        let message_end = message_start + message_size;
        packet.fill_data(message_start..message_end, b'D' as _);
    }

    /// Component identity attached to every message built by these tests.
    fn get_test_identity() -> ComponentIdentity {
        ComponentIdentity::new(
            ExtendedMoniker::parse_str("./fake-test-env/bleebloo").unwrap(),
            "fuchsia-pkg://fuchsia.com/testing123#test-component.cm",
        )
    }
}