1use crate::logs::repository::LogsRepository;
6use anyhow::Error;
7use diagnostics_data::{Data, Logs};
8use fidl_fuchsia_diagnostics::{Selector, StreamMode};
9use fuchsia_trace as ftrace;
10use futures::StreamExt;
11use log::warn;
12use selectors::FastError;
13use std::borrow::Cow;
14use std::collections::HashSet;
15use std::fmt::Display;
16use std::io::{self, Write};
17use std::sync::Arc;
18
/// Largest buffer, in bytes, that is handed to the serial sink in a single write.
///
/// `SerialWriter` allocates its line buffer with this size and `SerialSink::write` rejects
/// anything longer.
// NOTE(review): presumably mirrors the size limit of the `zx_debug_write` syscall — confirm.
const MAX_SERIAL_WRITE_SIZE: usize = 256;
20
21#[derive(Default)]
22pub struct SerialConfig {
23 selectors: Vec<Selector>,
24 denied_tags: HashSet<String>,
25}
26
27impl SerialConfig {
28 pub fn new<C, T>(allowed_components: Vec<C>, denied_tags: Vec<T>) -> Self
30 where
31 C: AsRef<str> + Display,
32 T: Into<String>,
33 {
34 let selectors = allowed_components
35 .into_iter()
36 .filter_map(|selector| {
37 match selectors::parse_component_selector::<FastError>(selector.as_ref()) {
38 Ok(s) => Some(Selector {
39 component_selector: Some(s),
40 tree_selector: None,
41 ..Selector::default()
42 }),
43 Err(err) => {
44 warn!(selector:%, err:?; "Failed to parse component selector");
45 None
46 }
47 }
48 })
49 .collect();
50 Self { selectors, denied_tags: HashSet::from_iter(denied_tags.into_iter().map(Into::into)) }
51 }
52
53 pub async fn write_logs<S: Write>(self, repo: Arc<LogsRepository>, mut sink: S) {
57 let Self { denied_tags, selectors } = self;
58 let mut log_stream = repo.logs_cursor(
59 StreamMode::SnapshotThenSubscribe,
60 Some(selectors),
61 ftrace::Id::random(),
62 );
63 while let Some(log) = log_stream.next().await {
64 SerialWriter::log(log.as_ref(), &denied_tags, &mut sink).ok();
65 }
66 }
67}
68
69#[derive(Default)]
71pub struct SerialSink;
72
73impl Write for SerialSink {
74 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
75 if cfg!(debug_assertions) {
76 debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
77 } else {
78 use std::sync::atomic::{AtomicBool, Ordering};
79 static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
80 if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
81 {
82 let size = buffer.len();
83 log::error!(
84 size;
85 "Skipping write to serial due to internal error. Exceeded max buffer size."
86 );
87 return Ok(buffer.len());
88 }
89 }
90 unsafe {
92 zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
93 }
94 Ok(buffer.len())
95 }
96
97 fn flush(&mut self) -> io::Result<()> {
98 Ok(())
99 }
100}
101
102struct SerialWriter<'a, S> {
103 buffer: Vec<u8>,
104 denied_tags: &'a HashSet<String>,
105 sink: &'a mut S,
106}
107
108impl<S: Write> Write for SerialWriter<'_, S> {
109 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
110 let count = (self.buffer.capacity() - self.buffer.len() - 1).min(data.len());
112 let actual_count = self.buffer.write(&data[..count])?;
113 debug_assert_eq!(actual_count, count);
114 if self.buffer.len() == self.buffer.capacity() - 1 {
115 self.flush()?;
116 }
117 Ok(actual_count)
118 }
119
120 fn flush(&mut self) -> io::Result<()> {
121 debug_assert!(self.buffer.len() < MAX_SERIAL_WRITE_SIZE);
122 let wrote = self.buffer.write(b"\n")?;
123 debug_assert_eq!(wrote, 1);
124 self.sink.write_all(self.buffer.as_slice())?;
125 self.buffer.clear();
126 Ok(())
127 }
128}
129
130impl<'a, S: Write> SerialWriter<'a, S> {
131 fn log(
132 log: &Data<Logs>,
133 denied_tags: &'a HashSet<String>,
134 sink: &'a mut S,
135 ) -> Result<(), Error> {
136 let mut this =
137 Self { buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE), sink, denied_tags };
138 write!(
139 &mut this,
140 "[{:05}.{:03}] {:05}:{:05}> [",
141 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
142 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
143 % 1000,
144 log.pid().unwrap_or(0),
145 log.tid().unwrap_or(0)
146 )?;
147
148 let empty_tags = log.tags().map(|tags| tags.is_empty()).unwrap_or(true);
149 if empty_tags {
150 write!(&mut this, "{}", log.component_name())?;
151 } else {
152 let tags = log.tags().unwrap();
154 for (i, tag) in tags.iter().enumerate() {
155 if this.denied_tags.contains(tag) {
156 return Ok(());
157 }
158 write!(&mut this, "{}", tag)?;
159 if i < tags.len() - 1 {
160 write!(&mut this, ", ")?;
161 }
162 }
163 }
164
165 write!(&mut this, "] {}: ", log.severity())?;
166 let mut pending_message_parts = [Cow::Borrowed(log.msg().unwrap_or(""))]
167 .into_iter()
168 .chain(log.payload_keys_strings().map(|s| Cow::Owned(format!(" {}", s))));
169 let mut pending_str = None;
170
171 loop {
172 let (data, offset) = match pending_str.take() {
173 Some((s, offset)) => (s, offset),
174 None => match pending_message_parts.next() {
175 Some(s) => (s, 0),
176 None => break,
177 },
178 };
179 let count = this.write(&data.as_bytes()[offset..])?;
180 if offset + count < data.len() {
181 pending_str = Some((data, offset + count));
182 }
183 }
184 if !this.buffer.is_empty() {
185 this.flush()?;
186 }
187 Ok(())
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::ComponentIdentity;
    use crate::logs::testing::make_message;
    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
    use fuchsia_async as fasync;
    use futures::channel::mpsc;
    use moniker::ExtendedMoniker;
    use zx::BootInstant;

    /// Sink that forwards every buffer it receives, as a `String`, over an unbounded channel
    /// so that a test can await what was written.
    struct TestSink {
        snd: mpsc::UnboundedSender<String>,
    }

    impl TestSink {
        /// Returns the sink together with the receiving end of its channel.
        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
            let (snd, rcv) = mpsc::unbounded();
            (Self { snd }, rcv)
        }
    }

    impl Write for TestSink {
        /// Sends `buf` as one channel item; panics if it is not valid UTF-8 or if the
        /// receiver is gone.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let string = String::from_utf8(buf.to_vec()).expect("wrote valid utf8");
            self.snd.unbounded_send(string).expect("sent item");
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// A log whose only tag is denied must produce no output at all.
    #[fuchsia::test]
    fn write_to_serial_handles_denied_tags() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(1),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("denied-tag")
        .build();
        let denied_tags = HashSet::from_iter(["denied-tag".to_string()]);
        let mut sink = Vec::new();
        SerialWriter::log(&log, &denied_tags, &mut sink).expect("write succeeded");
        assert!(sink.is_empty());
    }

    /// A log longer than one serial line is split, and the payload keys follow the message.
    #[fuchsia::test]
    fn write_to_serial_splits_lines() {
        let message = concat!(
            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
        );
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("bar")
        .set_message(message)
        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let mut sink = Vec::new();
        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
        // The prefix "[00000.123] 01234:05678> [bar] INFO: " is 37 bytes long, so the first
        // line holds 218 message bytes: 37 + 218 + 1 (newline) = MAX_SERIAL_WRITE_SIZE.
        assert_eq!(
            String::from_utf8(sink).unwrap(),
            format!(
                "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
                &message[..218],
                &message[218..]
            )
        );
    }

    /// Without tags, the component name ("foo" for moniker "core/foo") fills the tag slot.
    #[fuchsia::test]
    fn when_no_tags_are_present_the_component_name_is_used() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .set_message("my msg")
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let mut sink = Vec::new();
        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
        assert_eq!(
            String::from_utf8(sink).unwrap(),
            "[00000.123] 01234:05678> [foo] INFO: my msg\n"
        );
    }

    /// End-to-end check of `SerialConfig::write_logs`: only logs from selected components
    /// that carry no denied tag reach the sink, whether they were ingested before or after
    /// the writer task started.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        // Components under "bootstrap" and the component "core/foo" are selected; the tag
        // "foo" is denied.
        let serial_config = SerialConfig::new(vec!["bootstrap/**", "/core/foo"], vec!["foo"]);
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // Ingested before the writer task exists: "a" comes from a selected component,
        // while "c" comes from core/baz, which no selector names.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, rcv) = TestSink::new();
        let _serial_task = fasync::Task::spawn(serial_config.write_logs(Arc::clone(&repo), sink));
        // Ingested after the task was spawned: "b" carries the denied tag "foo", while "c"
        // from core/foo is selected and untagged.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("foo"),
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

        let received = rcv.take(2).collect::<Vec<_>>().await;

        // Only "a" (bootstrap/foo) and "c" (core/foo) are expected. Both show "[foo]"
        // because they have no tags and their component name is "foo".
        // NOTE(review): the pid/tid (1, 2) and the DEBUG severity presumably come from
        // `make_message` — confirm against that helper.
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }
}