1use super::config::StreamSink;
6use async_trait::async_trait;
7use cm_logger::scoped::ScopedLogger;
8use cm_types::NamespacePath;
9use fidl::prelude::*;
10use fuchsia_component::client::connect::connect_to_named_protocol_at_dir_root;
11use fuchsia_runtime::{HandleInfo, HandleType};
12use futures::StreamExt;
13use lazy_static::lazy_static;
14use log::warn;
15use namespace::Namespace;
16use once_cell::unsync::OnceCell;
17use socket_parsing::{NewlineChunker, NewlineChunkerError};
18use std::sync::Arc;
19use zx::HandleBased;
20use {fidl_fuchsia_logger as flogger, fidl_fuchsia_process as fproc, fuchsia_async as fasync};
21
/// File descriptor number used when binding the stdout stream.
const STDOUT_FD: i32 = 1;
/// File descriptor number used when binding the stderr stream.
const STDERR_FD: i32 = 2;
24
25lazy_static! {
26 static ref SVC_DIRECTORY_PATH: NamespacePath = "/svc".parse().unwrap();
27}
28
/// Maximum size, in bytes, handed to the `NewlineChunker` that drains an output stream
/// (30 KiB = 30720 bytes).
const MAX_MESSAGE_SIZE: usize = 30 * 1024;
32
33pub fn bind_streams_to_syslog(
44 ns: &Namespace,
45 stdout_sink: StreamSink,
46 stderr_sink: StreamSink,
47) -> (Vec<fasync::Task<()>>, Vec<fproc::HandleInfo>) {
48 let mut tasks: Vec<fasync::Task<()>> = Vec::new();
49 let mut handles: Vec<fproc::HandleInfo> = Vec::new();
50
51 let logger = OnceCell::new();
54 let mut forward_stream = |sink, fd, level| {
55 if matches!(sink, StreamSink::Log) {
56 let (socket, handle_info) =
59 new_socket_bound_to_fd(fd).expect("failed to create socket");
60 handles.push(handle_info);
61
62 if let Some(l) = logger.get_or_init(|| create_namespace_logger(ns).map(Arc::new)) {
63 tasks.push(forward_socket_to_syslog(l.clone(), socket, level));
64 } else {
65 warn!("Tried forwarding file descriptor {fd} but didn't have a LogSink available.");
66 }
67 }
68 };
69
70 forward_stream(stdout_sink, STDOUT_FD, OutputLevel::Info);
71 forward_stream(stderr_sink, STDERR_FD, OutputLevel::Warn);
72
73 (tasks, handles)
74}
75
76fn create_namespace_logger(ns: &Namespace) -> Option<ScopedLogger> {
77 let svc_dir = ns.get(&SVC_DIRECTORY_PATH)?;
78 let logsink =
79 connect_to_named_protocol_at_dir_root(svc_dir, flogger::LogSinkMarker::PROTOCOL_NAME)
80 .ok()?;
81 ScopedLogger::create(logsink).ok()
82}
83
84fn forward_socket_to_syslog(
85 logger: Arc<ScopedLogger>,
86 socket: fasync::Socket,
87 level: OutputLevel,
88) -> fasync::Task<()> {
89 let mut writer = SyslogWriter::new(logger, level);
90 let task = fasync::Task::spawn(async move {
91 if let Err(error) = drain_lines(socket, &mut writer).await {
92 warn!(error:%; "Draining output stream failed");
93 }
94 });
95
96 task
97}
98
99fn new_socket_bound_to_fd(fd: i32) -> Result<(fasync::Socket, fproc::HandleInfo), zx::Status> {
100 let (tx, rx) = zx::Socket::create_stream();
101 let rx = fasync::Socket::from_socket(rx);
102 Ok((
103 rx,
104 fproc::HandleInfo {
105 handle: tx.into_handle(),
106 id: HandleInfo::new(HandleType::FileDescriptor, fd as u16).as_raw(),
107 },
108 ))
109}
110
111async fn drain_lines(
115 socket: fasync::Socket,
116 writer: &mut dyn LogWriter,
117) -> Result<(), NewlineChunkerError> {
118 let chunker = NewlineChunker::new(socket, MAX_MESSAGE_SIZE);
119 futures::pin_mut!(chunker);
120
121 while let Some(chunk_or_line) = chunker.next().await {
122 writer.write(&chunk_or_line?).await;
123 }
124
125 Ok(())
126}
127
128#[async_trait]
130trait LogWriter: Send {
131 async fn write(&mut self, bytes: &[u8]);
132}
133
134struct SyslogWriter {
135 logger: Arc<dyn log::Log + Send + Sync>,
136 level: OutputLevel,
137}
138
/// Severity used for messages forwarded from an output stream.
#[derive(Copy, Clone)]
enum OutputLevel {
    /// Converted to `log::Level::Info`.
    Info,
    /// Converted to `log::Level::Warn`.
    Warn,
}
144
145impl From<OutputLevel> for log::Level {
146 fn from(level: OutputLevel) -> log::Level {
147 match level {
148 OutputLevel::Info => log::Level::Info,
149 OutputLevel::Warn => log::Level::Warn,
150 }
151 }
152}
153
154impl SyslogWriter {
155 fn new(logger: Arc<dyn log::Log + Send + Sync>, level: OutputLevel) -> Self {
156 Self { logger, level }
157 }
158}
159
160#[async_trait]
161impl LogWriter for SyslogWriter {
162 async fn write(&mut self, bytes: &[u8]) {
163 let msg = String::from_utf8_lossy(&bytes);
164 self.logger.log(
165 &log::Record::builder().level(self.level.into()).args(format_args!("{msg}")).build(),
166 );
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173 use crate::tests::{create_fs_with_mock_logsink, MockServiceFs, MockServiceRequest};
174 use anyhow::{anyhow, format_err, Context, Error};
175 use async_trait::async_trait;
176 use diagnostics_message::MonikerWithUrl;
177 use fidl_fuchsia_component_runner as fcrunner;
178 use fidl_fuchsia_logger::LogSinkRequest;
179 use fuchsia_async::Task;
180 use futures::channel::mpsc;
181 use futures::{try_join, FutureExt, SinkExt};
182 use rand::distributions::{Alphanumeric, DistString as _};
183 use rand::thread_rng;
184 use std::sync::Mutex;
185
186 #[async_trait]
187 impl LogWriter for mpsc::Sender<String> {
188 async fn write(&mut self, bytes: &[u8]) {
189 let message =
190 std::str::from_utf8(&bytes).expect("Failed to decode bytes to utf8.").to_owned();
191 let () =
192 self.send(message).await.expect("Failed to send message to other end of mpsc.");
193 }
194 }
195
196 #[fuchsia::test]
197 async fn syslog_writer_decodes_valid_utf8_message() -> Result<(), Error> {
198 let (dir, ns_entries) = create_fs_with_mock_logsink()?;
199
200 let ((), actual) = try_join!(
201 write_to_syslog_or_panic(ns_entries, b"Hello World!"),
202 read_message_from_syslog(dir)
203 )?;
204
205 assert_eq!(actual, Some("Hello World!".to_owned()));
206 Ok(())
207 }
208
209 #[fuchsia::test]
210 async fn syslog_writer_decodes_non_utf8_message() -> Result<(), Error> {
211 let (dir, ns_entries) = create_fs_with_mock_logsink()?;
212
213 let ((), actual) = try_join!(
214 write_to_syslog_or_panic(ns_entries, b"Hello \xF0\x90\x80World!"),
215 read_message_from_syslog(dir)
216 )?;
217
218 assert_eq!(actual, Some("Hello �World!".to_owned()));
219 Ok(())
220 }
221
222 #[fuchsia::test]
223 async fn drain_lines_splits_into_max_size_chunks() -> Result<(), Error> {
224 let (tx, rx) = zx::Socket::create_stream();
225 let rx = fasync::Socket::from_socket(rx);
226 let (mut sender, recv) = create_mock_logger();
227 let msg = get_random_string(MAX_MESSAGE_SIZE * 4);
228
229 let () = take_and_write_to_socket(tx, &msg)?;
230 let (actual, ()) =
231 try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
232 drain_lines(rx, &mut sender).await.map_err(Into::into)
233 })?;
234
235 assert_eq!(
236 actual,
237 msg.as_bytes()
238 .chunks(MAX_MESSAGE_SIZE)
239 .map(|bytes| std::str::from_utf8(bytes).expect("Bytes are not utf8.").to_owned())
240 .collect::<Vec<String>>()
241 );
242
243 Ok(())
244 }
245
246 #[fuchsia::test]
247 async fn drain_lines_splits_at_newline() -> Result<(), Error> {
248 let (tx, rx) = zx::Socket::create_stream();
249 let rx = fasync::Socket::from_socket(rx);
250 let (mut sender, recv) = create_mock_logger();
251 let msg = std::iter::repeat_with(|| {
252 Alphanumeric.sample_string(&mut thread_rng(), MAX_MESSAGE_SIZE - 1)
253 })
254 .take(3)
255 .collect::<Vec<_>>()
256 .join("\n");
257
258 let () = take_and_write_to_socket(tx, &msg)?;
259 let (actual, ()) =
260 try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
261 drain_lines(rx, &mut sender).await.map_err(Into::into)
262 })?;
263
264 assert_eq!(actual, msg.split("\n").map(str::to_owned).collect::<Vec<String>>());
265 Ok(())
266 }
267
268 #[fuchsia::test]
269 async fn drain_lines_writes_when_message_is_received() -> Result<(), Error> {
270 let (tx, rx) = zx::Socket::create_stream();
271 let rx = fasync::Socket::from_socket(rx);
272 let (mut sender, mut recv) = create_mock_logger();
273 let messages: Vec<String> = vec!["Hello!\n".to_owned(), "World!\n".to_owned()];
274
275 let ((), ()) = try_join!(
276 async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
277 async move {
278 for mut message in messages.into_iter() {
279 let () = write_to_socket(&tx, &message)?;
280 let logged_messaged =
281 recv.next().await.context("Receiver channel closed. Got no message.")?;
282 message.pop();
284 assert_eq!(logged_messaged, message);
285 }
286
287 Ok(())
288 }
289 )?;
290
291 Ok(())
292 }
293
294 #[fuchsia::test]
295 async fn drain_lines_waits_for_entire_lines() -> Result<(), Error> {
296 let (tx, rx) = zx::Socket::create_stream();
297 let rx = fasync::Socket::from_socket(rx);
298 let (mut sender, mut recv) = create_mock_logger();
299
300 let ((), ()) = try_join!(
301 async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
302 async move {
303 let () = write_to_socket(&tx, "Hello\nWorld")?;
304 let logged_messaged =
305 recv.next().await.context("Receiver channel closed. Got no message.")?;
306 assert_eq!(logged_messaged, "Hello");
307 let () = write_to_socket(&tx, "Hello\nAgain")?;
308 std::mem::drop(tx);
309 let logged_messaged =
310 recv.next().await.context("Receiver channel closed. Got no message.")?;
311 assert_eq!(logged_messaged, "WorldHello");
312 let logged_messaged =
313 recv.next().await.context("Receiver channel closed. Got no message.")?;
314 assert_eq!(logged_messaged, "Again");
315 Ok(())
316 }
317 )?;
318
319 Ok(())
320 }
321
322 #[fuchsia::test]
323 async fn drain_lines_collapses_repeated_newlines() -> Result<(), Error> {
324 let (tx, rx) = zx::Socket::create_stream();
325 let rx = fasync::Socket::from_socket(rx);
326 let (mut sender, mut recv) = create_mock_logger();
327
328 let drainer = Task::spawn(async move { drain_lines(rx, &mut sender).await });
329
330 write_to_socket(&tx, "Hello\n\nWorld\n")?;
331 assert_eq!(recv.next().await.unwrap(), "Hello");
332 assert_eq!(recv.next().await.unwrap(), "World");
333
334 drop(tx);
335 drainer.await?;
336 assert_eq!(recv.next().await, None);
337
338 Ok(())
339 }
340
341 async fn write_to_syslog_or_panic(
342 ns_entries: Vec<fcrunner::ComponentNamespaceEntry>,
343 message: &[u8],
344 ) -> Result<(), Error> {
345 let ns = Namespace::try_from(ns_entries).context("Failed to create Namespace")?;
346 let logger = create_namespace_logger(&ns).context("Failed to create ScopedLogger")?;
347 let mut writer = SyslogWriter::new(Arc::new(logger), OutputLevel::Info);
348 writer.write(message).await;
349
350 Ok(())
351 }
352
353 pub fn get_message_logged_to_socket(socket: zx::Socket) -> Option<String> {
356 let mut buffer: [u8; 1024] = [0; 1024];
357 match socket.read(&mut buffer) {
358 Ok(read_len) => {
359 let msg = diagnostics_message::from_structured(
360 MonikerWithUrl {
361 url: "fuchsia-pkg://fuchsia.com/test-pkg#meta/test-component.cm".into(),
362 moniker: "test-pkg/test-component".try_into().unwrap(),
363 },
364 &buffer[..read_len],
365 )
366 .expect("must be able to decode a valid message from buffer");
367
368 msg.msg().map(String::from)
369 }
370 Err(_) => None,
371 }
372 }
373
374 async fn read_message_from_syslog(
375 dir: MockServiceFs<'static>,
376 ) -> Result<Option<String>, Error> {
377 let message_logged = Arc::new(Mutex::new(Option::<String>::None));
378 dir.for_each_concurrent(None, |request: MockServiceRequest| match request {
379 MockServiceRequest::LogSink(mut r) => {
380 let message_logged_copy = Arc::clone(&message_logged);
381 async move {
382 match r.next().await.expect("stream error").expect("fidl error") {
383 LogSinkRequest::Connect { .. } => {
384 panic!("Unexpected call to `Connect`");
385 }
386 LogSinkRequest::ConnectStructured { socket, .. } => {
387 *message_logged_copy.lock().unwrap() =
388 get_message_logged_to_socket(socket);
389 }
390 LogSinkRequest::WaitForInterestChange { .. } => {
391 }
393 LogSinkRequest::_UnknownMethod { .. } => {
394 panic!("Unexpected unknown method")
395 }
396 }
397 }
398 }
399 })
400 .await;
401
402 let message_logged =
403 message_logged.lock().map_err(|_| anyhow!("Failed to lock mutex"))?.clone();
404 Ok(message_logged)
405 }
406
407 fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
408 write_to_socket(&socket, &message)
409 }
410
411 fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
412 let bytes_written =
413 socket.write(message.as_bytes()).context("Failed to write to socket")?;
414 match bytes_written == message.len() {
415 true => Ok(()),
416 false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
417 }
418 }
419
420 fn create_mock_logger() -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
421 mpsc::channel::<String>(20)
422 }
423
424 fn get_random_string(size: usize) -> String {
425 Alphanumeric.sample_string(&mut thread_rng(), size)
426 }
427}