archivist_lib/logs/servers/
log_stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::container::CursorItem;
7use crate::logs::error::LogsError;
8use crate::logs::repository::LogsRepository;
9use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
10use fidl_fuchsia_diagnostics::StreamMode;
11use futures::{AsyncWriteExt, Stream, StreamExt};
12use log::warn;
13use std::borrow::Cow;
14use std::sync::Arc;
15use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync, fuchsia_trace as ftrace};
16
17pub struct LogStreamServer {
18    /// The repository holding the logs.
19    logs_repo: Arc<LogsRepository>,
20
21    /// Scope in which we spawn all of the server tasks.
22    scope: fasync::Scope,
23}
24
25impl LogStreamServer {
26    pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
27        Self { logs_repo, scope }
28    }
29
30    /// Spawn a task to handle requests from components reading the shared log.
31    pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
32        let logs_repo = Arc::clone(&self.logs_repo);
33        let scope = self.scope.to_handle();
34        self.scope.spawn(async move {
35            if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
36                warn!("error handling Log requests: {}", e);
37            }
38        });
39    }
40
41    /// Handle requests to `fuchsia.diagnostics.LogStream`. All request types read the
42    /// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
43    async fn handle_requests(
44        logs_repo: Arc<LogsRepository>,
45        mut stream: fdiagnostics::LogStreamRequestStream,
46        scope: fasync::ScopeHandle,
47    ) -> Result<(), LogsError> {
48        while let Some(request) = stream.next().await {
49            let request = request.map_err(|source| LogsError::HandlingRequests {
50                protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
51                source,
52            })?;
53
54            match request {
55                fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
56                    let logs = logs_repo.logs_cursor_raw(
57                        opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
58                        None,
59                        ftrace::Id::random(),
60                    );
61                    let opts = ExtendRecordOpts::from(opts);
62                    scope.spawn(Self::stream_logs(fasync::Socket::from_socket(socket), logs, opts));
63                }
64                fdiagnostics::LogStreamRequest::_UnknownMethod {
65                    ordinal,
66                    method_type,
67                    control_handle,
68                    ..
69                } => {
70                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
71                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
72                }
73            }
74        }
75        Ok(())
76    }
77
78    async fn stream_logs(
79        mut socket: fasync::Socket,
80        mut logs: impl Stream<Item = CursorItem> + Unpin,
81        opts: ExtendRecordOpts,
82    ) {
83        while let Some(CursorItem { rolled_out, message, identity }) = logs.next().await {
84            let response = extend_fxt_record(message.bytes(), identity.as_ref(), rolled_out, &opts);
85            let result = socket.write_all(&response).await;
86            if result.is_err() {
87                // Assume an error means the peer closed for now.
88                break;
89            }
90        }
91    }
92}
93
/// Selects which pieces of component metadata are appended to each log record.
/// The default selects nothing.
#[derive(Default)]
pub struct ExtendRecordOpts {
    /// Append the moniker of the emitting component.
    pub moniker: bool,
    /// Append the URL of the emitting component.
    pub component_url: bool,
    /// Append the rolled-out counter associated with the record.
    pub rolled_out: bool,
}

impl ExtendRecordOpts {
    /// Returns `true` when at least one option is enabled.
    fn should_extend(&self) -> bool {
        // The pattern names every field (no `..`), so adding a field to the struct
        // forces this check to be revisited.
        !matches!(self, Self { moniker: false, component_url: false, rolled_out: false })
    }
}
107
108impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
109    fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
110        let fdiagnostics::LogStreamOptions {
111            include_moniker,
112            include_component_url,
113            include_rolled_out,
114            mode: _,
115            __source_breaking: _,
116        } = opts;
117        Self {
118            moniker: include_moniker.unwrap_or(false),
119            component_url: include_component_url.unwrap_or(false),
120            rolled_out: include_rolled_out.unwrap_or(false),
121        }
122    }
123}
124
/// Rounds `len` up to the nearest multiple of 8; values that are already a
/// multiple of 8 are returned unchanged.
fn pad_to_8(len: usize) -> usize {
    match len % 8 {
        0 => len,
        remainder => len + (8 - remainder),
    }
}
129
130pub fn extend_fxt_record<'a>(
131    fxt_record: &'a [u8],
132    identity: &ComponentIdentity,
133    rolled_out: u64,
134    opts: &ExtendRecordOpts,
135) -> Cow<'a, [u8]> {
136    if !opts.should_extend() {
137        return Cow::Borrowed(fxt_record);
138    }
139
140    let moniker_str = if opts.moniker { Some(identity.moniker.to_string()) } else { None };
141    let moniker = moniker_str.as_deref().unwrap_or("");
142    let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
143    let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
144
145    let moniker_len = moniker.len() as u32;
146    let component_url_len = component_url.len() as u32;
147
148    let moniker_padded_len = pad_to_8(moniker_len as usize);
149    let component_url_padded_len = pad_to_8(component_url_len as usize);
150
151    let mut extended_buffer =
152        Vec::with_capacity(fxt_record.len() + 16 + moniker_padded_len + component_url_padded_len);
153    extended_buffer.extend_from_slice(fxt_record);
154
155    extended_buffer.extend_from_slice(&moniker_len.to_le_bytes());
156    extended_buffer.extend_from_slice(&component_url_len.to_le_bytes());
157    extended_buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
158
159    extended_buffer.extend_from_slice(moniker.as_bytes());
160
161    // These resize operations are needed because the bytes in component_url and
162    // moniker do not include padding, so we need to pad the end with zeroes.
163    extended_buffer.resize(extended_buffer.len() + moniker_padded_len - moniker_len as usize, 0);
164
165    extended_buffer.extend_from_slice(component_url.as_bytes());
166    extended_buffer
167        .resize(extended_buffer.len() + component_url_padded_len - component_url_len as usize, 0);
168
169    Cow::Owned(extended_buffer)
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_log_encoding::Argument;
    use diagnostics_log_encoding::encode::{
        Encoder, EncoderOpts, MutableBuffer, TestRecord, WriteEventParams,
    };
    use diagnostics_log_encoding::parse::parse_record;
    use diagnostics_log_types::Severity;
    use std::io::Cursor;
    use test_case::test_case;

    /// Encodes a record, extends it with the metadata of an unknown component and
    /// checks that the original record still parses and that the trailer holds
    /// exactly the expected fields.
    #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
    #[test_case(
        ExtendRecordOpts { moniker: true, ..Default::default() },
        "UNKNOWN",
        "",
        0
        ; "with_moniker")]
    #[test_case(
        ExtendRecordOpts { component_url: true, ..Default::default() },
        "",
        "fuchsia-pkg://UNKNOWN",
        0
        ; "with_url")]
    #[test_case(
        ExtendRecordOpts { rolled_out: true, ..Default::default() },
        "",
        "",
        42
        ; "with_rolled_out")]
    #[test_case(
        ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true },
        "UNKNOWN",
        "fuchsia-pkg://UNKNOWN",
        42
        ; "with_all")]
    #[fuchsia::test]
    fn extend_record_with_metadata(
        opts: ExtendRecordOpts,
        expected_moniker: &str,
        expected_url: &str,
        expected_rolled_out: u64,
    ) {
        // Build the reference record that will be extended.
        let event = TestRecord {
            severity: Severity::Warn as u8,
            timestamp: zx::BootInstant::from_nanos(1234567890),
            file: Some("foo.rs"),
            line: Some(123),
            record_arguments: vec![Argument::tag("hello"), Argument::message("testing")],
        };
        let mut encoder = Encoder::new(Cursor::new([0u8; 4096]), EncoderOpts::default());
        encoder
            .write_event(WriteEventParams::<_, &str, _> {
                event,
                tags: &[],
                metatags: std::iter::empty(),
                pid: zx::Koid::from_raw(1),
                tid: zx::Koid::from_raw(2),
                dropped: 10,
            })
            .expect("wrote event");

        let encoded_len = encoder.inner().cursor();
        let original = &encoder.inner().get_ref()[..encoded_len];
        let (expected_record, _) = parse_record(original).unwrap();

        let extended = extend_fxt_record(original, &ComponentIdentity::unknown(), 42, &opts);

        // Without any option the record must come back unchanged.
        if !opts.should_extend() {
            assert_eq!(extended, original);
            return;
        }

        // The leading record is intact; whatever follows is the metadata trailer.
        let (record, trailer) = parse_record(&extended).unwrap();
        assert_eq!(record, expected_record);

        // Fixed 16-byte header: moniker length, URL length, rolled-out value.
        let moniker_len = u32::from_le_bytes(trailer[0..4].try_into().unwrap()) as usize;
        let url_len = u32::from_le_bytes(trailer[4..8].try_into().unwrap()) as usize;
        let rolled_out = u64::from_le_bytes(trailer[8..16].try_into().unwrap());
        assert_eq!(rolled_out, if opts.rolled_out { expected_rolled_out } else { 0 });

        // Computed independently of the implementation under test.
        let padded = |len: usize| (len + 7) & !7;

        let moniker_start = 16;
        let moniker_end = moniker_start + moniker_len;
        let moniker = std::str::from_utf8(&trailer[moniker_start..moniker_end]).unwrap();
        assert_eq!(moniker, expected_moniker);

        let url_start = moniker_start + padded(moniker_len);
        let url_end = url_start + url_len;
        let url = std::str::from_utf8(&trailer[url_start..url_end]).unwrap();
        assert_eq!(url, expected_url);

        // Nothing may follow the padded URL.
        assert_eq!(url_start + padded(url_len), trailer.len());
    }
}