elf_runner/
stdout.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::config::StreamSink;
6use async_trait::async_trait;
7use cm_logger::scoped::ScopedLogger;
8use cm_types::NamespacePath;
9use fidl::prelude::*;
10use fuchsia_component::client::connect::connect_to_named_protocol_at_dir_root;
11use fuchsia_runtime::{HandleInfo, HandleType};
12use futures::StreamExt;
13use lazy_static::lazy_static;
14use log::warn;
15use namespace::Namespace;
16use once_cell::unsync::OnceCell;
17use socket_parsing::{NewlineChunker, NewlineChunkerError};
18use std::sync::Arc;
19use zx::HandleBased;
20use {fidl_fuchsia_logger as flogger, fidl_fuchsia_process as fproc, fuchsia_async as fasync};
21
22const STDOUT_FD: i32 = 1;
23const STDERR_FD: i32 = 2;
24
25lazy_static! {
26    static ref SVC_DIRECTORY_PATH: NamespacePath = "/svc".parse().unwrap();
27}
28
29/// Max size for message when draining input stream socket. This number is
30/// slightly smaller than size allowed by Archivist (LogSink service implementation).
31const MAX_MESSAGE_SIZE: usize = 30720;
32
33/// Bind stdout or stderr streams to syslog. This function binds either or both
34/// output streams to syslog depending on value provided for each streams'
35/// StreamSink. If the value for an output stream is set to StreamSink::Log,
36/// that stream's file descriptor will be bound to syslog. All writes on that
37// fd will be forwarded to syslog and will register as log entries. For stdout,
38// the messages will be tagged with severity INFO. For stderr, the messages
39// will be tagged with severity WARN. A task is created to listen to writes on
40// the appropriate file descriptor and forward the message to syslog. This
41// function returns both the task for each file descriptor and its
42// corresponding HandleInfo.
43pub fn bind_streams_to_syslog(
44    ns: &Namespace,
45    stdout_sink: StreamSink,
46    stderr_sink: StreamSink,
47) -> (Vec<fasync::Task<()>>, Vec<fproc::HandleInfo>) {
48    let mut tasks: Vec<fasync::Task<()>> = Vec::new();
49    let mut handles: Vec<fproc::HandleInfo> = Vec::new();
50
51    // connect to the namespace's logger if we'll need it, wrap in OnceCell so we only do it once
52    // (can't use Lazy here because we need to capture `ns`)
53    let logger = OnceCell::new();
54    let mut forward_stream = |sink, fd, level| {
55        if matches!(sink, StreamSink::Log) {
56            // create the handle before dealing with the logger so components still receive an inert
57            // handle if connecting to LogSink fails
58            let (socket, handle_info) =
59                new_socket_bound_to_fd(fd).expect("failed to create socket");
60            handles.push(handle_info);
61
62            if let Some(l) = logger.get_or_init(|| create_namespace_logger(ns).map(Arc::new)) {
63                tasks.push(forward_socket_to_syslog(l.clone(), socket, level));
64            } else {
65                warn!("Tried forwarding file descriptor {fd} but didn't have a LogSink available.");
66            }
67        }
68    };
69
70    forward_stream(stdout_sink, STDOUT_FD, OutputLevel::Info);
71    forward_stream(stderr_sink, STDERR_FD, OutputLevel::Warn);
72
73    (tasks, handles)
74}
75
76fn create_namespace_logger(ns: &Namespace) -> Option<ScopedLogger> {
77    let svc_dir = ns.get(&SVC_DIRECTORY_PATH)?;
78    let logsink =
79        connect_to_named_protocol_at_dir_root(svc_dir, flogger::LogSinkMarker::PROTOCOL_NAME)
80            .ok()?;
81    ScopedLogger::create(logsink).ok()
82}
83
84fn forward_socket_to_syslog(
85    logger: Arc<ScopedLogger>,
86    socket: fasync::Socket,
87    level: OutputLevel,
88) -> fasync::Task<()> {
89    let mut writer = SyslogWriter::new(logger, level);
90    let task = fasync::Task::spawn(async move {
91        if let Err(error) = drain_lines(socket, &mut writer).await {
92            warn!(error:%; "Draining output stream failed");
93        }
94    });
95
96    task
97}
98
99fn new_socket_bound_to_fd(fd: i32) -> Result<(fasync::Socket, fproc::HandleInfo), zx::Status> {
100    let (tx, rx) = zx::Socket::create_stream();
101    let rx = fasync::Socket::from_socket(rx);
102    Ok((
103        rx,
104        fproc::HandleInfo {
105            handle: tx.into_handle(),
106            id: HandleInfo::new(HandleType::FileDescriptor, fd as u16).as_raw(),
107        },
108    ))
109}
110
111/// Drains all bytes from socket and writes messages to writer. Bytes read
112/// are split into lines and separated into chunks no greater than
113/// MAX_MESSAGE_SIZE.
114async fn drain_lines(
115    socket: fasync::Socket,
116    writer: &mut dyn LogWriter,
117) -> Result<(), NewlineChunkerError> {
118    let chunker = NewlineChunker::new(socket, MAX_MESSAGE_SIZE);
119    futures::pin_mut!(chunker);
120
121    while let Some(chunk_or_line) = chunker.next().await {
122        writer.write(&chunk_or_line?).await;
123    }
124
125    Ok(())
126}
127
128/// Object capable of writing a stream of bytes.
129#[async_trait]
130trait LogWriter: Send {
131    async fn write(&mut self, bytes: &[u8]);
132}
133
134struct SyslogWriter {
135    logger: Arc<dyn log::Log + Send + Sync>,
136    level: OutputLevel,
137}
138
139#[derive(Copy, Clone)]
140enum OutputLevel {
141    Info,
142    Warn,
143}
144
145impl From<OutputLevel> for log::Level {
146    fn from(level: OutputLevel) -> log::Level {
147        match level {
148            OutputLevel::Info => log::Level::Info,
149            OutputLevel::Warn => log::Level::Warn,
150        }
151    }
152}
153
154impl SyslogWriter {
155    fn new(logger: Arc<dyn log::Log + Send + Sync>, level: OutputLevel) -> Self {
156        Self { logger, level }
157    }
158}
159
160#[async_trait]
161impl LogWriter for SyslogWriter {
162    async fn write(&mut self, bytes: &[u8]) {
163        let msg = String::from_utf8_lossy(&bytes);
164        self.logger.log(
165            &log::Record::builder().level(self.level.into()).args(format_args!("{msg}")).build(),
166        );
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173    use crate::tests::{create_fs_with_mock_logsink, MockServiceFs, MockServiceRequest};
174    use anyhow::{anyhow, format_err, Context, Error};
175    use async_trait::async_trait;
176    use diagnostics_message::MonikerWithUrl;
177    use fidl_fuchsia_component_runner as fcrunner;
178    use fidl_fuchsia_logger::LogSinkRequest;
179    use fuchsia_async::Task;
180    use futures::channel::mpsc;
181    use futures::{try_join, FutureExt, SinkExt};
182    use rand::distributions::{Alphanumeric, DistString as _};
183    use rand::thread_rng;
184    use std::sync::Mutex;
185
186    #[async_trait]
187    impl LogWriter for mpsc::Sender<String> {
188        async fn write(&mut self, bytes: &[u8]) {
189            let message =
190                std::str::from_utf8(&bytes).expect("Failed to decode bytes to utf8.").to_owned();
191            let () =
192                self.send(message).await.expect("Failed to send message to other end of mpsc.");
193        }
194    }
195
196    #[fuchsia::test]
197    async fn syslog_writer_decodes_valid_utf8_message() -> Result<(), Error> {
198        let (dir, ns_entries) = create_fs_with_mock_logsink()?;
199
200        let ((), actual) = try_join!(
201            write_to_syslog_or_panic(ns_entries, b"Hello World!"),
202            read_message_from_syslog(dir)
203        )?;
204
205        assert_eq!(actual, Some("Hello World!".to_owned()));
206        Ok(())
207    }
208
209    #[fuchsia::test]
210    async fn syslog_writer_decodes_non_utf8_message() -> Result<(), Error> {
211        let (dir, ns_entries) = create_fs_with_mock_logsink()?;
212
213        let ((), actual) = try_join!(
214            write_to_syslog_or_panic(ns_entries, b"Hello \xF0\x90\x80World!"),
215            read_message_from_syslog(dir)
216        )?;
217
218        assert_eq!(actual, Some("Hello �World!".to_owned()));
219        Ok(())
220    }
221
222    #[fuchsia::test]
223    async fn drain_lines_splits_into_max_size_chunks() -> Result<(), Error> {
224        let (tx, rx) = zx::Socket::create_stream();
225        let rx = fasync::Socket::from_socket(rx);
226        let (mut sender, recv) = create_mock_logger();
227        let msg = get_random_string(MAX_MESSAGE_SIZE * 4);
228
229        let () = take_and_write_to_socket(tx, &msg)?;
230        let (actual, ()) =
231            try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
232                drain_lines(rx, &mut sender).await.map_err(Into::into)
233            })?;
234
235        assert_eq!(
236            actual,
237            msg.as_bytes()
238                .chunks(MAX_MESSAGE_SIZE)
239                .map(|bytes| std::str::from_utf8(bytes).expect("Bytes are not utf8.").to_owned())
240                .collect::<Vec<String>>()
241        );
242
243        Ok(())
244    }
245
246    #[fuchsia::test]
247    async fn drain_lines_splits_at_newline() -> Result<(), Error> {
248        let (tx, rx) = zx::Socket::create_stream();
249        let rx = fasync::Socket::from_socket(rx);
250        let (mut sender, recv) = create_mock_logger();
251        let msg = std::iter::repeat_with(|| {
252            Alphanumeric.sample_string(&mut thread_rng(), MAX_MESSAGE_SIZE - 1)
253        })
254        .take(3)
255        .collect::<Vec<_>>()
256        .join("\n");
257
258        let () = take_and_write_to_socket(tx, &msg)?;
259        let (actual, ()) =
260            try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
261                drain_lines(rx, &mut sender).await.map_err(Into::into)
262            })?;
263
264        assert_eq!(actual, msg.split("\n").map(str::to_owned).collect::<Vec<String>>());
265        Ok(())
266    }
267
268    #[fuchsia::test]
269    async fn drain_lines_writes_when_message_is_received() -> Result<(), Error> {
270        let (tx, rx) = zx::Socket::create_stream();
271        let rx = fasync::Socket::from_socket(rx);
272        let (mut sender, mut recv) = create_mock_logger();
273        let messages: Vec<String> = vec!["Hello!\n".to_owned(), "World!\n".to_owned()];
274
275        let ((), ()) = try_join!(
276            async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
277            async move {
278                for mut message in messages.into_iter() {
279                    let () = write_to_socket(&tx, &message)?;
280                    let logged_messaged =
281                        recv.next().await.context("Receiver channel closed. Got no message.")?;
282                    // Logged message should strip '\n' so we need to do the same before assertion.
283                    message.pop();
284                    assert_eq!(logged_messaged, message);
285                }
286
287                Ok(())
288            }
289        )?;
290
291        Ok(())
292    }
293
294    #[fuchsia::test]
295    async fn drain_lines_waits_for_entire_lines() -> Result<(), Error> {
296        let (tx, rx) = zx::Socket::create_stream();
297        let rx = fasync::Socket::from_socket(rx);
298        let (mut sender, mut recv) = create_mock_logger();
299
300        let ((), ()) = try_join!(
301            async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
302            async move {
303                let () = write_to_socket(&tx, "Hello\nWorld")?;
304                let logged_messaged =
305                    recv.next().await.context("Receiver channel closed. Got no message.")?;
306                assert_eq!(logged_messaged, "Hello");
307                let () = write_to_socket(&tx, "Hello\nAgain")?;
308                std::mem::drop(tx);
309                let logged_messaged =
310                    recv.next().await.context("Receiver channel closed. Got no message.")?;
311                assert_eq!(logged_messaged, "WorldHello");
312                let logged_messaged =
313                    recv.next().await.context("Receiver channel closed. Got no message.")?;
314                assert_eq!(logged_messaged, "Again");
315                Ok(())
316            }
317        )?;
318
319        Ok(())
320    }
321
322    #[fuchsia::test]
323    async fn drain_lines_collapses_repeated_newlines() -> Result<(), Error> {
324        let (tx, rx) = zx::Socket::create_stream();
325        let rx = fasync::Socket::from_socket(rx);
326        let (mut sender, mut recv) = create_mock_logger();
327
328        let drainer = Task::spawn(async move { drain_lines(rx, &mut sender).await });
329
330        write_to_socket(&tx, "Hello\n\nWorld\n")?;
331        assert_eq!(recv.next().await.unwrap(), "Hello");
332        assert_eq!(recv.next().await.unwrap(), "World");
333
334        drop(tx);
335        drainer.await?;
336        assert_eq!(recv.next().await, None);
337
338        Ok(())
339    }
340
341    async fn write_to_syslog_or_panic(
342        ns_entries: Vec<fcrunner::ComponentNamespaceEntry>,
343        message: &[u8],
344    ) -> Result<(), Error> {
345        let ns = Namespace::try_from(ns_entries).context("Failed to create Namespace")?;
346        let logger = create_namespace_logger(&ns).context("Failed to create ScopedLogger")?;
347        let mut writer = SyslogWriter::new(Arc::new(logger), OutputLevel::Info);
348        writer.write(message).await;
349
350        Ok(())
351    }
352
353    /// Retrieve message logged to socket. The wire format is expected to
354    /// match with the LogSink protocol format.
355    pub fn get_message_logged_to_socket(socket: zx::Socket) -> Option<String> {
356        let mut buffer: [u8; 1024] = [0; 1024];
357        match socket.read(&mut buffer) {
358            Ok(read_len) => {
359                let msg = diagnostics_message::from_structured(
360                    MonikerWithUrl {
361                        url: "fuchsia-pkg://fuchsia.com/test-pkg#meta/test-component.cm".into(),
362                        moniker: "test-pkg/test-component".try_into().unwrap(),
363                    },
364                    &buffer[..read_len],
365                )
366                .expect("must be able to decode a valid message from buffer");
367
368                msg.msg().map(String::from)
369            }
370            Err(_) => None,
371        }
372    }
373
374    async fn read_message_from_syslog(
375        dir: MockServiceFs<'static>,
376    ) -> Result<Option<String>, Error> {
377        let message_logged = Arc::new(Mutex::new(Option::<String>::None));
378        dir.for_each_concurrent(None, |request: MockServiceRequest| match request {
379            MockServiceRequest::LogSink(mut r) => {
380                let message_logged_copy = Arc::clone(&message_logged);
381                async move {
382                    match r.next().await.expect("stream error").expect("fidl error") {
383                        LogSinkRequest::Connect { .. } => {
384                            panic!("Unexpected call to `Connect`");
385                        }
386                        LogSinkRequest::ConnectStructured { socket, .. } => {
387                            *message_logged_copy.lock().unwrap() =
388                                get_message_logged_to_socket(socket);
389                        }
390                        LogSinkRequest::WaitForInterestChange { .. } => {
391                            // we expect this request to come but asserting on it is flakey
392                        }
393                        LogSinkRequest::_UnknownMethod { .. } => {
394                            panic!("Unexpected unknown method")
395                        }
396                    }
397                }
398            }
399        })
400        .await;
401
402        let message_logged =
403            message_logged.lock().map_err(|_| anyhow!("Failed to lock mutex"))?.clone();
404        Ok(message_logged)
405    }
406
407    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
408        write_to_socket(&socket, &message)
409    }
410
411    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
412        let bytes_written =
413            socket.write(message.as_bytes()).context("Failed to write to socket")?;
414        match bytes_written == message.len() {
415            true => Ok(()),
416            false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
417        }
418    }
419
420    fn create_mock_logger() -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
421        mpsc::channel::<String>(20)
422    }
423
424    fn get_random_string(size: usize) -> String {
425        Alphanumeric.sample_string(&mut thread_rng(), size)
426    }
427}