1use super::config::StreamSink;
6use async_trait::async_trait;
7use cm_logger::scoped::ScopedLogger;
8use cm_types::NamespacePath;
9use fidl::prelude::*;
10use fuchsia_component::client::connect_to_named_protocol_at_dir_root;
11use fuchsia_runtime::{HandleInfo, HandleType};
12use futures::StreamExt;
13use lazy_static::lazy_static;
14use log::warn;
15use namespace::Namespace;
16use once_cell::unsync::OnceCell;
17use socket_parsing::{NewlineChunker, NewlineChunkerError};
18use std::sync::Arc;
19use zx::HandleBased;
20use {fidl_fuchsia_logger as flogger, fidl_fuchsia_process as fproc, fuchsia_async as fasync};
21
22const STDOUT_FD: i32 = 1;
23const STDERR_FD: i32 = 2;
24
25lazy_static! {
26 static ref SVC_DIRECTORY_PATH: NamespacePath = "/svc".parse().unwrap();
27}
28
29const MAX_MESSAGE_SIZE: usize = 30720;
32
33pub fn bind_streams_to_syslog(
44 ns: &Namespace,
45 stdout_sink: StreamSink,
46 stderr_sink: StreamSink,
47) -> (Vec<fasync::Task<()>>, Vec<fproc::HandleInfo>) {
48 let mut tasks: Vec<fasync::Task<()>> = Vec::new();
49 let mut handles: Vec<fproc::HandleInfo> = Vec::new();
50
51 let logger = OnceCell::new();
54 let mut forward_stream = |sink, fd, level| {
55 if matches!(sink, StreamSink::Log) {
56 let (socket, handle_info) =
59 new_socket_bound_to_fd(fd).expect("failed to create socket");
60 handles.push(handle_info);
61
62 if let Some(l) = logger.get_or_init(|| create_namespace_logger(ns).map(Arc::new)) {
63 tasks.push(forward_socket_to_syslog(l.clone(), socket, level));
64 } else {
65 warn!("Tried forwarding file descriptor {fd} but didn't have a LogSink available.");
66 }
67 }
68 };
69
70 forward_stream(stdout_sink, STDOUT_FD, OutputLevel::Info);
71 forward_stream(stderr_sink, STDERR_FD, OutputLevel::Warn);
72
73 (tasks, handles)
74}
75
76fn create_namespace_logger(ns: &Namespace) -> Option<ScopedLogger> {
77 let svc_dir = ns.get(&SVC_DIRECTORY_PATH)?;
78 let logsink = connect_to_named_protocol_at_dir_root::<flogger::LogSinkMarker>(
79 svc_dir,
80 flogger::LogSinkMarker::PROTOCOL_NAME,
81 )
82 .ok()?;
83 ScopedLogger::create(logsink).ok()
84}
85
86fn forward_socket_to_syslog(
87 logger: Arc<ScopedLogger>,
88 socket: fasync::Socket,
89 level: OutputLevel,
90) -> fasync::Task<()> {
91 let mut writer = SyslogWriter::new(logger, level);
92 let task = fasync::Task::spawn(async move {
93 if let Err(error) = drain_lines(socket, &mut writer).await {
94 warn!(error:%; "Draining output stream failed");
95 }
96 });
97
98 task
99}
100
101fn new_socket_bound_to_fd(fd: i32) -> Result<(fasync::Socket, fproc::HandleInfo), zx::Status> {
102 let (tx, rx) = zx::Socket::create_stream();
103 let rx = fasync::Socket::from_socket(rx);
104 Ok((
105 rx,
106 fproc::HandleInfo {
107 handle: tx.into_handle(),
108 id: HandleInfo::new(HandleType::FileDescriptor, fd as u16).as_raw(),
109 },
110 ))
111}
112
113async fn drain_lines(
117 socket: fasync::Socket,
118 writer: &mut dyn LogWriter,
119) -> Result<(), NewlineChunkerError> {
120 let chunker = NewlineChunker::new(socket, MAX_MESSAGE_SIZE);
121 futures::pin_mut!(chunker);
122
123 while let Some(chunk_or_line) = chunker.next().await {
124 writer.write(&chunk_or_line?).await;
125 }
126
127 Ok(())
128}
129
130#[async_trait]
132trait LogWriter: Send {
133 async fn write(&mut self, bytes: &[u8]);
134}
135
136struct SyslogWriter {
137 logger: Arc<dyn log::Log + Send + Sync>,
138 level: OutputLevel,
139}
140
141#[derive(Copy, Clone)]
142enum OutputLevel {
143 Info,
144 Warn,
145}
146
147impl From<OutputLevel> for log::Level {
148 fn from(level: OutputLevel) -> log::Level {
149 match level {
150 OutputLevel::Info => log::Level::Info,
151 OutputLevel::Warn => log::Level::Warn,
152 }
153 }
154}
155
156impl SyslogWriter {
157 fn new(logger: Arc<dyn log::Log + Send + Sync>, level: OutputLevel) -> Self {
158 Self { logger, level }
159 }
160}
161
162#[async_trait]
163impl LogWriter for SyslogWriter {
164 async fn write(&mut self, bytes: &[u8]) {
165 let msg = String::from_utf8_lossy(&bytes);
166 self.logger.log(
167 &log::Record::builder().level(self.level.into()).args(format_args!("{msg}")).build(),
168 );
169 }
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175 use crate::tests::{create_fs_with_mock_logsink, MockServiceFs, MockServiceRequest};
176 use anyhow::{anyhow, format_err, Context, Error};
177 use async_trait::async_trait;
178 use diagnostics_message::MonikerWithUrl;
179 use fidl_fuchsia_component_runner as fcrunner;
180 use fidl_fuchsia_logger::LogSinkRequest;
181 use fuchsia_async::Task;
182 use futures::channel::mpsc;
183 use futures::{try_join, FutureExt, SinkExt};
184 use rand::distributions::{Alphanumeric, DistString as _};
185 use rand::thread_rng;
186 use std::sync::Mutex;
187
188 #[async_trait]
189 impl LogWriter for mpsc::Sender<String> {
190 async fn write(&mut self, bytes: &[u8]) {
191 let message =
192 std::str::from_utf8(&bytes).expect("Failed to decode bytes to utf8.").to_owned();
193 let () =
194 self.send(message).await.expect("Failed to send message to other end of mpsc.");
195 }
196 }
197
198 #[fuchsia::test]
199 async fn syslog_writer_decodes_valid_utf8_message() -> Result<(), Error> {
200 let (dir, ns_entries) = create_fs_with_mock_logsink()?;
201
202 let ((), actual) = try_join!(
203 write_to_syslog_or_panic(ns_entries, b"Hello World!"),
204 read_message_from_syslog(dir)
205 )?;
206
207 assert_eq!(actual, Some("Hello World!".to_owned()));
208 Ok(())
209 }
210
211 #[fuchsia::test]
212 async fn syslog_writer_decodes_non_utf8_message() -> Result<(), Error> {
213 let (dir, ns_entries) = create_fs_with_mock_logsink()?;
214
215 let ((), actual) = try_join!(
216 write_to_syslog_or_panic(ns_entries, b"Hello \xF0\x90\x80World!"),
217 read_message_from_syslog(dir)
218 )?;
219
220 assert_eq!(actual, Some("Hello �World!".to_owned()));
221 Ok(())
222 }
223
224 #[fuchsia::test]
225 async fn drain_lines_splits_into_max_size_chunks() -> Result<(), Error> {
226 let (tx, rx) = zx::Socket::create_stream();
227 let rx = fasync::Socket::from_socket(rx);
228 let (mut sender, recv) = create_mock_logger();
229 let msg = get_random_string(MAX_MESSAGE_SIZE * 4);
230
231 let () = take_and_write_to_socket(tx, &msg)?;
232 let (actual, ()) =
233 try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
234 drain_lines(rx, &mut sender).await.map_err(Into::into)
235 })?;
236
237 assert_eq!(
238 actual,
239 msg.as_bytes()
240 .chunks(MAX_MESSAGE_SIZE)
241 .map(|bytes| std::str::from_utf8(bytes).expect("Bytes are not utf8.").to_owned())
242 .collect::<Vec<String>>()
243 );
244
245 Ok(())
246 }
247
248 #[fuchsia::test]
249 async fn drain_lines_splits_at_newline() -> Result<(), Error> {
250 let (tx, rx) = zx::Socket::create_stream();
251 let rx = fasync::Socket::from_socket(rx);
252 let (mut sender, recv) = create_mock_logger();
253 let msg = std::iter::repeat_with(|| {
254 Alphanumeric.sample_string(&mut thread_rng(), MAX_MESSAGE_SIZE - 1)
255 })
256 .take(3)
257 .collect::<Vec<_>>()
258 .join("\n");
259
260 let () = take_and_write_to_socket(tx, &msg)?;
261 let (actual, ()) =
262 try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
263 drain_lines(rx, &mut sender).await.map_err(Into::into)
264 })?;
265
266 assert_eq!(actual, msg.split("\n").map(str::to_owned).collect::<Vec<String>>());
267 Ok(())
268 }
269
270 #[fuchsia::test]
271 async fn drain_lines_writes_when_message_is_received() -> Result<(), Error> {
272 let (tx, rx) = zx::Socket::create_stream();
273 let rx = fasync::Socket::from_socket(rx);
274 let (mut sender, mut recv) = create_mock_logger();
275 let messages: Vec<String> = vec!["Hello!\n".to_owned(), "World!\n".to_owned()];
276
277 let ((), ()) = try_join!(
278 async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
279 async move {
280 for mut message in messages.into_iter() {
281 let () = write_to_socket(&tx, &message)?;
282 let logged_messaged =
283 recv.next().await.context("Receiver channel closed. Got no message.")?;
284 message.pop();
286 assert_eq!(logged_messaged, message);
287 }
288
289 Ok(())
290 }
291 )?;
292
293 Ok(())
294 }
295
296 #[fuchsia::test]
297 async fn drain_lines_waits_for_entire_lines() -> Result<(), Error> {
298 let (tx, rx) = zx::Socket::create_stream();
299 let rx = fasync::Socket::from_socket(rx);
300 let (mut sender, mut recv) = create_mock_logger();
301
302 let ((), ()) = try_join!(
303 async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
304 async move {
305 let () = write_to_socket(&tx, "Hello\nWorld")?;
306 let logged_messaged =
307 recv.next().await.context("Receiver channel closed. Got no message.")?;
308 assert_eq!(logged_messaged, "Hello");
309 let () = write_to_socket(&tx, "Hello\nAgain")?;
310 std::mem::drop(tx);
311 let logged_messaged =
312 recv.next().await.context("Receiver channel closed. Got no message.")?;
313 assert_eq!(logged_messaged, "WorldHello");
314 let logged_messaged =
315 recv.next().await.context("Receiver channel closed. Got no message.")?;
316 assert_eq!(logged_messaged, "Again");
317 Ok(())
318 }
319 )?;
320
321 Ok(())
322 }
323
324 #[fuchsia::test]
325 async fn drain_lines_collapses_repeated_newlines() -> Result<(), Error> {
326 let (tx, rx) = zx::Socket::create_stream();
327 let rx = fasync::Socket::from_socket(rx);
328 let (mut sender, mut recv) = create_mock_logger();
329
330 let drainer = Task::spawn(async move { drain_lines(rx, &mut sender).await });
331
332 write_to_socket(&tx, "Hello\n\nWorld\n")?;
333 assert_eq!(recv.next().await.unwrap(), "Hello");
334 assert_eq!(recv.next().await.unwrap(), "World");
335
336 drop(tx);
337 drainer.await?;
338 assert_eq!(recv.next().await, None);
339
340 Ok(())
341 }
342
343 async fn write_to_syslog_or_panic(
344 ns_entries: Vec<fcrunner::ComponentNamespaceEntry>,
345 message: &[u8],
346 ) -> Result<(), Error> {
347 let ns = Namespace::try_from(ns_entries).context("Failed to create Namespace")?;
348 let logger = create_namespace_logger(&ns).context("Failed to create ScopedLogger")?;
349 let mut writer = SyslogWriter::new(Arc::new(logger), OutputLevel::Info);
350 writer.write(message).await;
351
352 Ok(())
353 }
354
355 pub fn get_message_logged_to_socket(socket: zx::Socket) -> Option<String> {
358 let mut buffer: [u8; 1024] = [0; 1024];
359 match socket.read(&mut buffer) {
360 Ok(read_len) => {
361 let msg = diagnostics_message::from_structured(
362 MonikerWithUrl {
363 url: "fuchsia-pkg://fuchsia.com/test-pkg#meta/test-component.cm".into(),
364 moniker: "test-pkg/test-component".try_into().unwrap(),
365 },
366 &buffer[..read_len],
367 )
368 .expect("must be able to decode a valid message from buffer");
369
370 msg.msg().map(String::from)
371 }
372 Err(_) => None,
373 }
374 }
375
376 async fn read_message_from_syslog(
377 dir: MockServiceFs<'static>,
378 ) -> Result<Option<String>, Error> {
379 let message_logged = Arc::new(Mutex::new(Option::<String>::None));
380 dir.for_each_concurrent(None, |request: MockServiceRequest| match request {
381 MockServiceRequest::LogSink(mut r) => {
382 let message_logged_copy = Arc::clone(&message_logged);
383 async move {
384 match r.next().await.expect("stream error").expect("fidl error") {
385 LogSinkRequest::Connect { .. } => {
386 panic!("Unexpected call to `Connect`");
387 }
388 LogSinkRequest::ConnectStructured { socket, .. } => {
389 *message_logged_copy.lock().unwrap() =
390 get_message_logged_to_socket(socket);
391 }
392 LogSinkRequest::WaitForInterestChange { .. } => {
393 }
395 LogSinkRequest::_UnknownMethod { .. } => {
396 panic!("Unexpected unknown method")
397 }
398 }
399 }
400 }
401 })
402 .await;
403
404 let message_logged =
405 message_logged.lock().map_err(|_| anyhow!("Failed to lock mutex"))?.clone();
406 Ok(message_logged)
407 }
408
409 fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
410 write_to_socket(&socket, &message)
411 }
412
413 fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
414 let bytes_written =
415 socket.write(message.as_bytes()).context("Failed to write to socket")?;
416 match bytes_written == message.len() {
417 true => Ok(()),
418 false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
419 }
420 }
421
422 fn create_mock_logger() -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
423 mpsc::channel::<String>(20)
424 }
425
426 fn get_random_string(size: usize) -> String {
427 Alphanumeric.sample_string(&mut thread_rng(), size)
428 }
429}