archivist_lib/logs/servers/
log_stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::container::CursorItem;
7use crate::logs::error::LogsError;
8use crate::logs::repository::LogsRepository;
9use diagnostics_log_encoding::encode::{Encoder, MutableBuffer, ResizableBuffer};
10use diagnostics_log_encoding::{Header, FXT_HEADER_SIZE};
11use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
12use fidl_fuchsia_diagnostics::StreamMode;
13use futures::{AsyncWriteExt, Stream, StreamExt};
14use log::warn;
15use std::borrow::Cow;
16use std::io::Cursor;
17use std::sync::Arc;
18use zerocopy::FromBytes;
19use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync, fuchsia_trace as ftrace};
20
21pub struct LogStreamServer {
22    /// The repository holding the logs.
23    logs_repo: Arc<LogsRepository>,
24
25    /// Scope in which we spawn all of the server tasks.
26    scope: fasync::Scope,
27}
28
29impl LogStreamServer {
30    pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
31        Self { logs_repo, scope }
32    }
33
34    /// Spawn a task to handle requests from components reading the shared log.
35    pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
36        let logs_repo = Arc::clone(&self.logs_repo);
37        let scope = self.scope.to_handle();
38        self.scope.spawn(async move {
39            if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
40                warn!("error handling Log requests: {}", e);
41            }
42        });
43    }
44
45    /// Handle requests to `fuchsia.diagnostics.LogStream`. All request types read the
46    /// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
47    async fn handle_requests(
48        logs_repo: Arc<LogsRepository>,
49        mut stream: fdiagnostics::LogStreamRequestStream,
50        scope: fasync::ScopeHandle,
51    ) -> Result<(), LogsError> {
52        while let Some(request) = stream.next().await {
53            let request = request.map_err(|source| LogsError::HandlingRequests {
54                protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
55                source,
56            })?;
57
58            match request {
59                fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
60                    let logs = logs_repo.logs_cursor_raw(
61                        opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
62                        None,
63                        ftrace::Id::random(),
64                    );
65                    let opts = ExtendRecordOpts::from(opts);
66                    scope.spawn(Self::stream_logs(fasync::Socket::from_socket(socket), logs, opts));
67                }
68                fdiagnostics::LogStreamRequest::_UnknownMethod {
69                    ordinal,
70                    method_type,
71                    control_handle,
72                    ..
73                } => {
74                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
75                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
76                }
77            }
78        }
79        Ok(())
80    }
81
82    async fn stream_logs(
83        mut socket: fasync::Socket,
84        mut logs: impl Stream<Item = CursorItem> + Unpin,
85        opts: ExtendRecordOpts,
86    ) {
87        while let Some(CursorItem { rolled_out, message, identity }) = logs.next().await {
88            let response = extend_fxt_record(message.bytes(), identity.as_ref(), rolled_out, &opts);
89            let result = socket.write_all(&response).await;
90            if result.is_err() {
91                // Assume an error means the peer closed for now.
92                break;
93            }
94        }
95    }
96}
97
98#[derive(Default)]
99pub struct ExtendRecordOpts {
100    pub moniker: bool,
101    pub component_url: bool,
102    pub rolled_out: bool,
103}
104
105impl ExtendRecordOpts {
106    fn should_extend(&self) -> bool {
107        let Self { moniker, component_url, rolled_out } = self;
108        *moniker || *component_url || *rolled_out
109    }
110}
111
112impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
113    fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
114        let fdiagnostics::LogStreamOptions {
115            include_moniker,
116            include_component_url,
117            include_rolled_out,
118            mode: _,
119            __source_breaking: _,
120        } = opts;
121        Self {
122            moniker: include_moniker.unwrap_or(false),
123            component_url: include_component_url.unwrap_or(false),
124            rolled_out: include_rolled_out.unwrap_or(false),
125        }
126    }
127}
128
129pub fn extend_fxt_record<'a>(
130    fxt_record: &'a [u8],
131    identity: &ComponentIdentity,
132    rolled_out: u64,
133    opts: &ExtendRecordOpts,
134) -> Cow<'a, [u8]> {
135    if !opts.should_extend() {
136        return Cow::Borrowed(fxt_record);
137    }
138
139    let mut cursor = Cursor::new(ResizableBuffer::from(vec![0; fxt_record.len()]));
140    cursor.put_slice(fxt_record).expect("must fit");
141
142    let mut metadata_arguments = Encoder::new(cursor, Default::default());
143    if opts.moniker {
144        metadata_arguments
145            .write_raw_argument(fdiagnostics::MONIKER_ARG_NAME, identity.moniker.to_string())
146            .expect("infallible");
147    }
148    if opts.component_url {
149        metadata_arguments
150            .write_raw_argument(fdiagnostics::COMPONENT_URL_ARG_NAME, identity.url.as_ref())
151            .expect("infallible");
152    }
153    if opts.rolled_out && rolled_out > 0 {
154        metadata_arguments
155            .write_raw_argument(fdiagnostics::ROLLED_OUT_ARG_NAME, rolled_out)
156            .expect("infallible");
157    }
158
159    let buffer = metadata_arguments.take();
160    let length = buffer.cursor();
161    let mut buffer = buffer.into_inner().into_inner();
162
163    let (header, _) = Header::mut_from_prefix(&mut buffer[..FXT_HEADER_SIZE])
164        .expect("we validate the header when ingesting");
165    header.set_len(length);
166    buffer.resize(length, 0);
167    Cow::Owned(buffer)
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173    use diagnostics_log_encoding::encode::{EncoderOpts, TestRecord, WriteEventParams};
174    use diagnostics_log_encoding::parse::parse_record;
175    use diagnostics_log_encoding::Argument;
176    use diagnostics_log_types::Severity;
177    use test_case::test_case;
178
179    #[test_case(ExtendRecordOpts::default(), vec![] ; "no_additional_metadata")]
180    #[test_case(
181        ExtendRecordOpts { moniker: true, ..Default::default() },
182        vec![Argument::other(fdiagnostics::MONIKER_ARG_NAME, "UNKNOWN")]
183        ; "with_moniker")]
184    #[test_case(
185        ExtendRecordOpts { component_url: true, ..Default::default() },
186        vec![Argument::other(fdiagnostics::COMPONENT_URL_ARG_NAME, "fuchsia-pkg://UNKNOWN")]
187        ; "with_url")]
188    #[test_case(
189        ExtendRecordOpts { rolled_out: true, ..Default::default() },
190        vec![Argument::other(fdiagnostics::ROLLED_OUT_ARG_NAME, 42u64)]
191        ; "with_rolled_out")]
192    #[fuchsia::test]
193    fn extend_record_with_metadata(opts: ExtendRecordOpts, arguments: Vec<Argument<'static>>) {
194        let mut encoder = Encoder::new(Cursor::new([0u8; 4096]), EncoderOpts::default());
195        encoder
196            .write_event(WriteEventParams::<_, &str, _> {
197                event: TestRecord {
198                    severity: Severity::Warn as u8,
199                    timestamp: zx::BootInstant::from_nanos(1234567890),
200                    file: Some("foo.rs"),
201                    line: Some(123),
202                    record_arguments: vec![Argument::tag("hello"), Argument::message("testing")],
203                },
204                tags: &[],
205                metatags: std::iter::empty(),
206                pid: zx::Koid::from_raw(1),
207                tid: zx::Koid::from_raw(2),
208                dropped: 10,
209            })
210            .expect("wrote event");
211
212        let length = encoder.inner().cursor();
213        let original_record_bytes = &encoder.inner().get_ref()[..length];
214        let (mut expected_record, _) = parse_record(original_record_bytes).unwrap();
215
216        let extended_record_bytes =
217            extend_fxt_record(original_record_bytes, &ComponentIdentity::unknown(), 42, &opts);
218        let (extended_record, _) = parse_record(&extended_record_bytes).unwrap();
219
220        // The expected record is the original record plus the additional arguments that were
221        // requested.
222        expected_record.arguments.extend(arguments);
223        assert_eq!(extended_record, expected_record);
224    }
225}