elf_runner/
stdout.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::config::StreamSink;
6use async_trait::async_trait;
7use cm_logger::scoped::ScopedLogger;
8use cm_types::NamespacePath;
9use fidl::prelude::*;
10use fuchsia_component::client::connect_to_named_protocol_at_dir_root;
11use fuchsia_runtime::{HandleInfo, HandleType};
12use futures::StreamExt;
13use lazy_static::lazy_static;
14use log::warn;
15use namespace::Namespace;
16use once_cell::unsync::OnceCell;
17use socket_parsing::{NewlineChunker, NewlineChunkerError};
18use std::sync::Arc;
19use zx::HandleBased;
20use {fidl_fuchsia_logger as flogger, fidl_fuchsia_process as fproc, fuchsia_async as fasync};
21
/// File descriptor number under which the stdout socket is handed to the component.
const STDOUT_FD: i32 = 1;
/// File descriptor number under which the stderr socket is handed to the component.
const STDERR_FD: i32 = 2;

// Namespace path whose directory is searched for the `LogSink` protocol. Parsed once, on first
// use; the `unwrap` panics only if the literal "/svc" fails to parse as a `NamespacePath`.
lazy_static! {
    static ref SVC_DIRECTORY_PATH: NamespacePath = "/svc".parse().unwrap();
}

/// Max size for message when draining input stream socket. This number is
/// slightly smaller than size allowed by Archivist (LogSink service implementation).
/// The value is 30 KiB (30 * 1024 bytes).
const MAX_MESSAGE_SIZE: usize = 30720;
32
33/// Bind stdout or stderr streams to syslog. This function binds either or both
34/// output streams to syslog depending on value provided for each streams'
35/// StreamSink. If the value for an output stream is set to StreamSink::Log,
36/// that stream's file descriptor will be bound to syslog. All writes on that
37// fd will be forwarded to syslog and will register as log entries. For stdout,
38// the messages will be tagged with severity INFO. For stderr, the messages
39// will be tagged with severity WARN. A task is created to listen to writes on
40// the appropriate file descriptor and forward the message to syslog. This
41// function returns both the task for each file descriptor and its
42// corresponding HandleInfo.
43pub fn bind_streams_to_syslog(
44    ns: &Namespace,
45    stdout_sink: StreamSink,
46    stderr_sink: StreamSink,
47) -> (Vec<fasync::Task<()>>, Vec<fproc::HandleInfo>) {
48    let mut tasks: Vec<fasync::Task<()>> = Vec::new();
49    let mut handles: Vec<fproc::HandleInfo> = Vec::new();
50
51    // connect to the namespace's logger if we'll need it, wrap in OnceCell so we only do it once
52    // (can't use Lazy here because we need to capture `ns`)
53    let logger = OnceCell::new();
54    let mut forward_stream = |sink, fd, level| {
55        if matches!(sink, StreamSink::Log) {
56            // create the handle before dealing with the logger so components still receive an inert
57            // handle if connecting to LogSink fails
58            let (socket, handle_info) =
59                new_socket_bound_to_fd(fd).expect("failed to create socket");
60            handles.push(handle_info);
61
62            if let Some(l) = logger.get_or_init(|| create_namespace_logger(ns).map(Arc::new)) {
63                tasks.push(forward_socket_to_syslog(l.clone(), socket, level));
64            } else {
65                warn!("Tried forwarding file descriptor {fd} but didn't have a LogSink available.");
66            }
67        }
68    };
69
70    forward_stream(stdout_sink, STDOUT_FD, OutputLevel::Info);
71    forward_stream(stderr_sink, STDERR_FD, OutputLevel::Warn);
72
73    (tasks, handles)
74}
75
76fn create_namespace_logger(ns: &Namespace) -> Option<ScopedLogger> {
77    let svc_dir = ns.get(&SVC_DIRECTORY_PATH)?;
78    let logsink = connect_to_named_protocol_at_dir_root::<flogger::LogSinkMarker>(
79        svc_dir,
80        flogger::LogSinkMarker::PROTOCOL_NAME,
81    )
82    .ok()?;
83    ScopedLogger::create(logsink).ok()
84}
85
86fn forward_socket_to_syslog(
87    logger: Arc<ScopedLogger>,
88    socket: fasync::Socket,
89    level: OutputLevel,
90) -> fasync::Task<()> {
91    let mut writer = SyslogWriter::new(logger, level);
92    let task = fasync::Task::spawn(async move {
93        if let Err(error) = drain_lines(socket, &mut writer).await {
94            warn!(error:%; "Draining output stream failed");
95        }
96    });
97
98    task
99}
100
101fn new_socket_bound_to_fd(fd: i32) -> Result<(fasync::Socket, fproc::HandleInfo), zx::Status> {
102    let (tx, rx) = zx::Socket::create_stream();
103    let rx = fasync::Socket::from_socket(rx);
104    Ok((
105        rx,
106        fproc::HandleInfo {
107            handle: tx.into_handle(),
108            id: HandleInfo::new(HandleType::FileDescriptor, fd as u16).as_raw(),
109        },
110    ))
111}
112
113/// Drains all bytes from socket and writes messages to writer. Bytes read
114/// are split into lines and separated into chunks no greater than
115/// MAX_MESSAGE_SIZE.
116async fn drain_lines(
117    socket: fasync::Socket,
118    writer: &mut dyn LogWriter,
119) -> Result<(), NewlineChunkerError> {
120    let chunker = NewlineChunker::new(socket, MAX_MESSAGE_SIZE);
121    futures::pin_mut!(chunker);
122
123    while let Some(chunk_or_line) = chunker.next().await {
124        writer.write(&chunk_or_line?).await;
125    }
126
127    Ok(())
128}
129
/// Object capable of writing a stream of bytes.
///
/// `drain_lines` calls [`LogWriter::write`] once for every line or chunk it reads.
#[async_trait]
trait LogWriter: Send {
    /// Writes `bytes` as a single message. The method returns nothing, so an implementation
    /// cannot report a failure to its caller.
    async fn write(&mut self, bytes: &[u8]);
}
135
/// `LogWriter` that emits every message as a `log::Record` through `logger`.
struct SyslogWriter {
    /// Destination of the records.
    logger: Arc<dyn log::Log + Send + Sync>,
    /// Severity attached to every record written by this writer.
    level: OutputLevel,
}
140
/// Severity with which forwarded output is logged. Converts into `log::Level`.
#[derive(Copy, Clone)]
enum OutputLevel {
    /// Used for stdout by `bind_streams_to_syslog`.
    Info,
    /// Used for stderr by `bind_streams_to_syslog`.
    Warn,
}
146
147impl From<OutputLevel> for log::Level {
148    fn from(level: OutputLevel) -> log::Level {
149        match level {
150            OutputLevel::Info => log::Level::Info,
151            OutputLevel::Warn => log::Level::Warn,
152        }
153    }
154}
155
156impl SyslogWriter {
157    fn new(logger: Arc<dyn log::Log + Send + Sync>, level: OutputLevel) -> Self {
158        Self { logger, level }
159    }
160}
161
162#[async_trait]
163impl LogWriter for SyslogWriter {
164    async fn write(&mut self, bytes: &[u8]) {
165        let msg = String::from_utf8_lossy(&bytes);
166        self.logger.log(
167            &log::Record::builder().level(self.level.into()).args(format_args!("{msg}")).build(),
168        );
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use super::*;
175    use crate::tests::{create_fs_with_mock_logsink, MockServiceFs, MockServiceRequest};
176    use anyhow::{anyhow, format_err, Context, Error};
177    use async_trait::async_trait;
178    use diagnostics_message::MonikerWithUrl;
179    use fidl_fuchsia_component_runner as fcrunner;
180    use fidl_fuchsia_logger::LogSinkRequest;
181    use fuchsia_async::Task;
182    use futures::channel::mpsc;
183    use futures::{try_join, FutureExt, SinkExt};
184    use rand::distributions::{Alphanumeric, DistString as _};
185    use rand::thread_rng;
186    use std::sync::Mutex;
187
188    #[async_trait]
189    impl LogWriter for mpsc::Sender<String> {
190        async fn write(&mut self, bytes: &[u8]) {
191            let message =
192                std::str::from_utf8(&bytes).expect("Failed to decode bytes to utf8.").to_owned();
193            let () =
194                self.send(message).await.expect("Failed to send message to other end of mpsc.");
195        }
196    }
197
198    #[fuchsia::test]
199    async fn syslog_writer_decodes_valid_utf8_message() -> Result<(), Error> {
200        let (dir, ns_entries) = create_fs_with_mock_logsink()?;
201
202        let ((), actual) = try_join!(
203            write_to_syslog_or_panic(ns_entries, b"Hello World!"),
204            read_message_from_syslog(dir)
205        )?;
206
207        assert_eq!(actual, Some("Hello World!".to_owned()));
208        Ok(())
209    }
210
211    #[fuchsia::test]
212    async fn syslog_writer_decodes_non_utf8_message() -> Result<(), Error> {
213        let (dir, ns_entries) = create_fs_with_mock_logsink()?;
214
215        let ((), actual) = try_join!(
216            write_to_syslog_or_panic(ns_entries, b"Hello \xF0\x90\x80World!"),
217            read_message_from_syslog(dir)
218        )?;
219
220        assert_eq!(actual, Some("Hello �World!".to_owned()));
221        Ok(())
222    }
223
224    #[fuchsia::test]
225    async fn drain_lines_splits_into_max_size_chunks() -> Result<(), Error> {
226        let (tx, rx) = zx::Socket::create_stream();
227        let rx = fasync::Socket::from_socket(rx);
228        let (mut sender, recv) = create_mock_logger();
229        let msg = get_random_string(MAX_MESSAGE_SIZE * 4);
230
231        let () = take_and_write_to_socket(tx, &msg)?;
232        let (actual, ()) =
233            try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
234                drain_lines(rx, &mut sender).await.map_err(Into::into)
235            })?;
236
237        assert_eq!(
238            actual,
239            msg.as_bytes()
240                .chunks(MAX_MESSAGE_SIZE)
241                .map(|bytes| std::str::from_utf8(bytes).expect("Bytes are not utf8.").to_owned())
242                .collect::<Vec<String>>()
243        );
244
245        Ok(())
246    }
247
248    #[fuchsia::test]
249    async fn drain_lines_splits_at_newline() -> Result<(), Error> {
250        let (tx, rx) = zx::Socket::create_stream();
251        let rx = fasync::Socket::from_socket(rx);
252        let (mut sender, recv) = create_mock_logger();
253        let msg = std::iter::repeat_with(|| {
254            Alphanumeric.sample_string(&mut thread_rng(), MAX_MESSAGE_SIZE - 1)
255        })
256        .take(3)
257        .collect::<Vec<_>>()
258        .join("\n");
259
260        let () = take_and_write_to_socket(tx, &msg)?;
261        let (actual, ()) =
262            try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
263                drain_lines(rx, &mut sender).await.map_err(Into::into)
264            })?;
265
266        assert_eq!(actual, msg.split("\n").map(str::to_owned).collect::<Vec<String>>());
267        Ok(())
268    }
269
    /// Checks that `drain_lines` forwards each line while the socket is still open: every
    /// message is awaited on the receiver right after it is written, before the next write.
    #[fuchsia::test]
    async fn drain_lines_writes_when_message_is_received() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut recv) = create_mock_logger();
        let messages: Vec<String> = vec!["Hello!\n".to_owned(), "World!\n".to_owned()];

        let ((), ()) = try_join!(
            async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
            // `tx` is moved into this block and is therefore dropped once the block completes.
            // NOTE(review): presumably that closure of the write end is what lets the drain
            // future above finish - confirm against NewlineChunker.
            async move {
                for mut message in messages.into_iter() {
                    let () = write_to_socket(&tx, &message)?;
                    let logged_messaged =
                        recv.next().await.context("Receiver channel closed. Got no message.")?;
                    // Logged message should strip '\n' so we need to do the same before assertion.
                    message.pop();
                    assert_eq!(logged_messaged, message);
                }

                Ok(())
            }
        )?;

        Ok(())
    }
295
    /// Checks that text after the last newline is held back until the line is completed by a
    /// later write, and that the final unterminated text is emitted after the socket is closed.
    #[fuchsia::test]
    async fn drain_lines_waits_for_entire_lines() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut recv) = create_mock_logger();

        let ((), ()) = try_join!(
            async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
            async move {
                // Only "Hello" is terminated by a newline; "World" is an incomplete line.
                let () = write_to_socket(&tx, "Hello\nWorld")?;
                let logged_messaged =
                    recv.next().await.context("Receiver channel closed. Got no message.")?;
                assert_eq!(logged_messaged, "Hello");
                // The second write completes the pending line ("World" + "Hello") and leaves
                // "Again" unterminated.
                let () = write_to_socket(&tx, "Hello\nAgain")?;
                // Close the write end so the remaining unterminated text can be emitted.
                std::mem::drop(tx);
                let logged_messaged =
                    recv.next().await.context("Receiver channel closed. Got no message.")?;
                assert_eq!(logged_messaged, "WorldHello");
                let logged_messaged =
                    recv.next().await.context("Receiver channel closed. Got no message.")?;
                assert_eq!(logged_messaged, "Again");
                Ok(())
            }
        )?;

        Ok(())
    }
323
    /// Checks that consecutive newlines do not produce an empty message: "Hello\n\nWorld\n"
    /// yields exactly "Hello" and "World".
    #[fuchsia::test]
    async fn drain_lines_collapses_repeated_newlines() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut recv) = create_mock_logger();

        // `sender` is moved into the task, so the channel closes once the task has finished.
        let drainer = Task::spawn(async move { drain_lines(rx, &mut sender).await });

        write_to_socket(&tx, "Hello\n\nWorld\n")?;
        assert_eq!(recv.next().await.unwrap(), "Hello");
        assert_eq!(recv.next().await.unwrap(), "World");

        // Close the write end, wait for the drain task to end, then verify that nothing else
        // (in particular no empty message) was sent.
        drop(tx);
        drainer.await?;
        assert_eq!(recv.next().await, None);

        Ok(())
    }
342
343    async fn write_to_syslog_or_panic(
344        ns_entries: Vec<fcrunner::ComponentNamespaceEntry>,
345        message: &[u8],
346    ) -> Result<(), Error> {
347        let ns = Namespace::try_from(ns_entries).context("Failed to create Namespace")?;
348        let logger = create_namespace_logger(&ns).context("Failed to create ScopedLogger")?;
349        let mut writer = SyslogWriter::new(Arc::new(logger), OutputLevel::Info);
350        writer.write(message).await;
351
352        Ok(())
353    }
354
355    /// Retrieve message logged to socket. The wire format is expected to
356    /// match with the LogSink protocol format.
357    pub fn get_message_logged_to_socket(socket: zx::Socket) -> Option<String> {
358        let mut buffer: [u8; 1024] = [0; 1024];
359        match socket.read(&mut buffer) {
360            Ok(read_len) => {
361                let msg = diagnostics_message::from_structured(
362                    MonikerWithUrl {
363                        url: "fuchsia-pkg://fuchsia.com/test-pkg#meta/test-component.cm".into(),
364                        moniker: "test-pkg/test-component".try_into().unwrap(),
365                    },
366                    &buffer[..read_len],
367                )
368                .expect("must be able to decode a valid message from buffer");
369
370                msg.msg().map(String::from)
371            }
372            Err(_) => None,
373        }
374    }
375
    /// Serves the mock LogSink in `dir` until its request stream ends and returns the message
    /// captured from a `ConnectStructured` socket, if any.
    ///
    /// Only the first request of each LogSink connection is inspected. `Connect` and unknown
    /// methods make the test panic; `WaitForInterestChange` is ignored.
    async fn read_message_from_syslog(
        dir: MockServiceFs<'static>,
    ) -> Result<Option<String>, Error> {
        // Shared slot written by the per-connection handlers and read once serving is done.
        let message_logged = Arc::new(Mutex::new(Option::<String>::None));
        // `None` places no limit on how many connections are handled concurrently.
        dir.for_each_concurrent(None, |request: MockServiceRequest| match request {
            MockServiceRequest::LogSink(mut r) => {
                let message_logged_copy = Arc::clone(&message_logged);
                async move {
                    match r.next().await.expect("stream error").expect("fidl error") {
                        LogSinkRequest::Connect { .. } => {
                            panic!("Unexpected call to `Connect`");
                        }
                        LogSinkRequest::ConnectStructured { socket, .. } => {
                            // The lock is taken and released within this one statement.
                            *message_logged_copy.lock().unwrap() =
                                get_message_logged_to_socket(socket);
                        }
                        LogSinkRequest::WaitForInterestChange { .. } => {
                            // we expect this request to come but asserting on it is flakey
                        }
                        LogSinkRequest::_UnknownMethod { .. } => {
                            panic!("Unexpected unknown method")
                        }
                    }
                }
            }
        })
        .await;

        // A poisoned mutex (a handler panicked while holding it) is reported as an error.
        let message_logged =
            message_logged.lock().map_err(|_| anyhow!("Failed to lock mutex"))?.clone();
        Ok(message_logged)
    }
408
409    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
410        write_to_socket(&socket, &message)
411    }
412
413    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
414        let bytes_written =
415            socket.write(message.as_bytes()).context("Failed to write to socket")?;
416        match bytes_written == message.len() {
417            true => Ok(()),
418            false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
419        }
420    }
421
422    fn create_mock_logger() -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
423        mpsc::channel::<String>(20)
424    }
425
426    fn get_random_string(size: usize) -> String {
427        Alphanumeric.sample_string(&mut thread_rng(), size)
428    }
429}