1use crate::logs::repository::LogsRepository;
6use crate::logs::servers::LogFreezeServer;
7use anyhow::Error;
8use diagnostics_data::{Data, Logs};
9use fidl_fuchsia_diagnostics::{Selector, StreamMode};
10use fidl_fuchsia_diagnostics_system::SerialLogControlRequestStream;
11use fuchsia_async::OnSignals;
12use fuchsia_trace as ftrace;
13use futures::channel::mpsc::UnboundedReceiver;
14use futures::future::{select, Either};
15use futures::{FutureExt, StreamExt};
16use log::warn;
17use selectors::FastError;
18use std::borrow::Cow;
19use std::collections::HashSet;
20use std::fmt::Display;
21use std::io::{self, Write};
22use std::pin::pin;
23use std::sync::Arc;
24use zx::Signals;
25
/// Largest buffer, in bytes, that may be handed to the serial sink in one write.
///
/// `SerialWriter::log` allocates its line buffer with this capacity and `SerialSink::write`
/// checks incoming buffers against it.
const MAX_SERIAL_WRITE_SIZE: usize = 256;
27
28pub async fn launch_serial(
32 allow_serial_log_tags: Vec<String>,
33 deny_serial_log_tags: Vec<String>,
34 logs_repo: Arc<LogsRepository>,
35 writer: impl Write,
36 mut freeze_receiver: UnboundedReceiver<SerialLogControlRequestStream>,
37) {
38 let mut write_logs_to_serial =
39 pin!(SerialConfig::new(allow_serial_log_tags, deny_serial_log_tags)
40 .write_logs(logs_repo, writer)
41 .fuse());
42 loop {
43 let log_freezer_future = pin!(async {
44 let stream = (freeze_receiver.next().await)?;
46 let (client, server) = zx::EventPair::create();
47 LogFreezeServer::new(client).wait_for_client_freeze_request(stream).await;
49 Some(server)
50 }
51 .fuse());
52 let maybe_frozen_token = select(&mut write_logs_to_serial, log_freezer_future).await;
53 if let Either::Right((Some(token), _)) = maybe_frozen_token {
54 let _ = OnSignals::new(&token, Signals::EVENTPAIR_PEER_CLOSED).await;
57 } else {
58 break;
60 }
61 }
62}
63
/// Filtering configuration for forwarding logs to serial.
#[derive(Default)]
pub struct SerialConfig {
    /// Component selectors passed to the repository cursor to choose which logs are read.
    selectors: Vec<Selector>,
    /// Tags whose presence on a message prevents it from being written.
    denied_tags: HashSet<String>,
}
69
70impl SerialConfig {
71 pub fn new<C, T>(allowed_components: Vec<C>, denied_tags: Vec<T>) -> Self
73 where
74 C: AsRef<str> + Display,
75 T: Into<String>,
76 {
77 let selectors = allowed_components
78 .into_iter()
79 .filter_map(|selector| {
80 match selectors::parse_component_selector::<FastError>(selector.as_ref()) {
81 Ok(s) => Some(Selector {
82 component_selector: Some(s),
83 tree_selector: None,
84 ..Selector::default()
85 }),
86 Err(err) => {
87 warn!(selector:%, err:?; "Failed to parse component selector");
88 None
89 }
90 }
91 })
92 .collect();
93 Self { selectors, denied_tags: HashSet::from_iter(denied_tags.into_iter().map(Into::into)) }
94 }
95
96 pub async fn write_logs<S: Write>(self, repo: Arc<LogsRepository>, mut sink: S) {
100 let Self { denied_tags, selectors } = self;
101 let mut log_stream = repo.logs_cursor(
102 StreamMode::SnapshotThenSubscribe,
103 Some(selectors),
104 ftrace::Id::random(),
105 );
106 while let Some(log) = log_stream.next().await {
107 SerialWriter::log(log.as_ref(), &denied_tags, &mut sink).ok();
108 }
109 }
110}
111
/// A `Write` implementation that sends every buffer it receives to `zx_debug_write`.
#[derive(Default)]
pub struct SerialSink;
115
116impl Write for SerialSink {
117 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
118 if cfg!(debug_assertions) {
119 debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
120 } else {
121 use std::sync::atomic::{AtomicBool, Ordering};
122 static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
123 if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
124 {
125 let size = buffer.len();
126 log::error!(
127 size;
128 "Skipping write to serial due to internal error. Exceeded max buffer size."
129 );
130 return Ok(buffer.len());
131 }
132 }
133 unsafe {
135 zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
136 }
137 Ok(buffer.len())
138 }
139
140 fn flush(&mut self) -> io::Result<()> {
141 Ok(())
142 }
143}
144
/// Line-buffering writer used to format a single log message for serial output.
struct SerialWriter<'a, S> {
    /// Bytes of the line currently being assembled; emptied each time it is flushed to `sink`.
    buffer: Vec<u8>,
    /// Tags that cause a message to be dropped instead of written.
    denied_tags: &'a HashSet<String>,
    /// Destination that receives each completed, newline-terminated line.
    sink: &'a mut S,
}
150
151impl<S: Write> Write for SerialWriter<'_, S> {
152 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
153 let count = (self.buffer.capacity() - self.buffer.len() - 1).min(data.len());
155 let actual_count = self.buffer.write(&data[..count])?;
156 debug_assert_eq!(actual_count, count);
157 if self.buffer.len() == self.buffer.capacity() - 1 {
158 self.flush()?;
159 }
160 Ok(actual_count)
161 }
162
163 fn flush(&mut self) -> io::Result<()> {
164 debug_assert!(self.buffer.len() < MAX_SERIAL_WRITE_SIZE);
165 let wrote = self.buffer.write(b"\n")?;
166 debug_assert_eq!(wrote, 1);
167 self.sink.write_all(self.buffer.as_slice())?;
168 self.buffer.clear();
169 Ok(())
170 }
171}
172
173impl<'a, S: Write> SerialWriter<'a, S> {
174 fn log(
175 log: &Data<Logs>,
176 denied_tags: &'a HashSet<String>,
177 sink: &'a mut S,
178 ) -> Result<(), Error> {
179 let mut this =
180 Self { buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE), sink, denied_tags };
181 write!(
182 &mut this,
183 "[{:05}.{:03}] {:05}:{:05}> [",
184 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
185 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
186 % 1000,
187 log.pid().unwrap_or(0),
188 log.tid().unwrap_or(0)
189 )?;
190
191 let empty_tags = log.tags().map(|tags| tags.is_empty()).unwrap_or(true);
192 if empty_tags {
193 write!(&mut this, "{}", log.component_name())?;
194 } else {
195 let tags = log.tags().unwrap();
197 for (i, tag) in tags.iter().enumerate() {
198 if this.denied_tags.contains(tag) {
199 return Ok(());
200 }
201 write!(&mut this, "{tag}")?;
202 if i < tags.len() - 1 {
203 write!(&mut this, ", ")?;
204 }
205 }
206 }
207
208 write!(&mut this, "] {}: ", log.severity())?;
209 let mut pending_message_parts = [Cow::Borrowed(log.msg().unwrap_or(""))]
210 .into_iter()
211 .chain(log.payload_keys_strings().map(|s| Cow::Owned(format!(" {s}"))));
212 let mut pending_str = None;
213
214 loop {
215 let (data, offset) = match pending_str.take() {
216 Some((s, offset)) => (s, offset),
217 None => match pending_message_parts.next() {
218 Some(s) => (s, 0),
219 None => break,
220 },
221 };
222 let count = this.write(&data.as_bytes()[offset..])?;
223 if offset + count < data.len() {
224 pending_str = Some((data, offset + count));
225 }
226 }
227 if !this.buffer.is_empty() {
228 this.flush()?;
229 }
230 Ok(())
231 }
232}
233
#[cfg(test)]
mod tests {
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_diagnostics_system::SerialLogControlMarker;
    use fuchsia_async::{self as fasync};
    use futures::channel::mpsc::{self, unbounded};
    use futures::SinkExt;
    use std::future::{poll_fn, Future};
    use std::task::Poll;

    use super::*;
    use crate::identity::ComponentIdentity;
    use crate::logs::testing::make_message;
    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
    use futures::FutureExt;
    use moniker::ExtendedMoniker;
    use std::pin::pin;
    use zx::BootInstant;

    /// Sink that forwards every buffer it is given, as a `String`, over an unbounded channel so
    /// a test can inspect each individual write.
    struct TestSink {
        snd: mpsc::UnboundedSender<String>,
    }

    impl TestSink {
        /// Returns the sink together with the receiving end of its channel.
        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
            let (snd, rcv) = mpsc::unbounded();
            (Self { snd }, rcv)
        }
    }

    impl Write for TestSink {
        /// Sends `buf` as one channel item; panics if it is not valid UTF-8 or the receiver is
        /// gone.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let string = String::from_utf8(buf.to_vec()).expect("wrote valid utf8");
            self.snd.unbounded_send(string).expect("sent item");
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// A message carrying a denied tag must produce no output at all.
    #[fuchsia::test]
    fn write_to_serial_handles_denied_tags() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(1),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("denied-tag")
        .build();
        let denied_tags = HashSet::from_iter(["denied-tag".to_string()]);
        let mut sink = Vec::new();
        SerialWriter::log(&log, &denied_tags, &mut sink).expect("write succeeded");
        assert!(sink.is_empty());
    }

    /// A message too long for one line is continued on a second line.
    #[fuchsia::test]
    fn write_to_serial_splits_lines() {
        let message = concat!(
            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
        );
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("bar")
        .set_message(message)
        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let mut sink = Vec::new();
        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
        // The 37-byte prefix plus the first 218 bytes of the message make 255 bytes, which
        // together with the newline is exactly `MAX_SERIAL_WRITE_SIZE`.
        assert_eq!(
            String::from_utf8(sink).unwrap(),
            format!(
                "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
                &message[..218],
                &message[218..]
            )
        );
    }

    /// Without tags, the bracketed section shows `foo` for the moniker `core/foo`.
    #[fuchsia::test]
    fn when_no_tags_are_present_the_component_name_is_used() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .set_message("my msg")
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let mut sink = Vec::new();
        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
        assert_eq!(
            String::from_utf8(sink).unwrap(),
            "[00000.123] 01234:05678> [foo] INFO: my msg\n"
        );
    }

    /// Polls `future` exactly once and discards the outcome, letting a test advance a task by
    /// one step without waiting for it to complete.
    async fn poll_once<F: Future + Unpin>(mut future: F) {
        poll_fn(|context| {
            let _ = future.poll_unpin(context);
            Poll::Ready(())
        })
        .await;
    }

    /// While a freeze token is held nothing is written; output resumes once it is dropped.
    #[fuchsia::test]
    async fn pauses_logs_correctly() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));

        // core/baz is not covered by the allowed selectors below, so this message is never
        // expected on the sink.
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        let (mut sender, receiver) = unbounded();
        let mut serial_task = pin!(async move {
            let allowed = vec!["bootstrap/**".into(), "/core/foo".into()];
            let denied = vec!["foo".into()];
            launch_serial(allowed, denied, cloned_repo, sink, receiver).await;
        }
        .fuse());
        // Presumably tagged "foo" by `make_message`, which is in the denied list.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("foo"),
            zx::BootInstant::from_nanos(3),
        ));

        poll_once(&mut serial_task).await;
        let received = rcv.next().now_or_never().unwrap().unwrap();

        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

        // Ask for a freeze and keep driving the serial task until the token is returned.
        let (client, server) = create_proxy_and_stream::<SerialLogControlMarker>();
        sender.send(server).await.unwrap();
        let freeze_token = futures::select! {
            _ = serial_task => None,
            token = client.freeze_serial_forwarding().fuse() => Some(token),
        }
        .unwrap();
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        let received_future = rcv.next();
        poll_once(&mut serial_task).await;

        // Frozen: the newly ingested message must not have reached the sink.
        assert!(received_future.now_or_never().is_none());
        // Releasing the token lets forwarding resume on the next poll.
        drop(freeze_token);
        poll_once(&mut serial_task).await;
        let received = rcv.next().now_or_never().unwrap().unwrap();

        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: c\n");
    }

    /// Messages ingested both before and after the writer starts are forwarded, subject to the
    /// selector and denied-tag filters.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        let serial_config = SerialConfig::new(vec!["bootstrap/**", "/core/foo"], vec!["foo"]);
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, rcv) = TestSink::new();
        let mut serial_task = pin!(serial_config.write_logs(Arc::clone(&repo), sink));
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("foo"),
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        // Drive the writer until it has nothing more to do for now (it reports `Pending`).
        poll_fn(|context| loop {
            if Poll::Pending == serial_task.poll_unpin(context) {
                return Poll::Ready(());
            }
        })
        .await;
        let received = rcv.take(2).collect::<Vec<_>>().now_or_never().unwrap();

        // Only the bootstrap/foo and core/foo messages are expected: core/baz matches no
        // allowed selector and the bootstrap/bar message is created with the denied "foo"
        // argument.
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }
}