Skip to main content

archivist_lib/logs/servers/
log_stream.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4use crate::identity::ComponentIdentity;
5use crate::logs::error::LogsError;
6use crate::logs::repository::LogsRepository;
7use crate::logs::shared_buffer::FxtMessage;
8use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
9use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
10use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker, RequestStream};
11use fidl_fuchsia_diagnostics::StreamMode;
12use fidl_fuchsia_diagnostics_types::Severity;
13use futures::{AsyncWriteExt, Stream, StreamExt};
14use log::warn;
15use std::collections::HashMap;
16use std::io::Cursor;
17use std::pin::pin;
18use std::sync::Arc;
19use zerocopy::{FromBytes, IntoBytes};
20use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync};
21
22#[derive(thiserror::Error, Debug)]
23enum StreamError {
24    #[error(transparent)]
25    Io(#[from] std::io::Error),
26}
27
28pub struct LogStreamServer {
29    /// The repository holding the logs.
30    logs_repo: Arc<LogsRepository>,
31
32    /// Scope in which we spawn all of the server tasks.
33    scope: fasync::Scope,
34}
35
36impl LogStreamServer {
37    pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
38        Self { logs_repo, scope }
39    }
40
41    /// Spawn a task to handle requests from components reading the shared log.
42    pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
43        let logs_repo = Arc::clone(&self.logs_repo);
44        let scope = self.scope.to_handle();
45        self.scope.spawn(async move {
46            if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
47                warn!("error handling Log requests: {}", e);
48            }
49        });
50    }
51
52    /// Handle requests to `fuchsia.diagnostics.LogStream`. All request types read the
53    /// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
54    async fn handle_requests(
55        logs_repo: Arc<LogsRepository>,
56        mut stream: fdiagnostics::LogStreamRequestStream,
57        scope: fasync::ScopeHandle,
58    ) -> Result<(), LogsError> {
59        while let Some(request) = stream.next().await {
60            let request = request.map_err(|source| LogsError::HandlingRequests {
61                protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
62                source,
63            })?;
64
65            match request {
66                fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
67                    let logs = logs_repo.logs_cursor_raw(
68                        opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
69                        Vec::new(),
70                    );
71                    let opts = ExtendRecordOpts::from(opts);
72                    if opts.subscribe_to_manifest {
73                        if opts.moniker || opts.component_url || opts.rolled_out {
74                            stream.control_handle().shutdown_with_epitaph(zx::Status::INVALID_ARGS);
75                            return Ok(());
76                        }
77
78                        scope.spawn(async move {
79                            let _ = Self::stream_logs_with_manifest(
80                                fasync::Socket::from_socket(socket),
81                                logs,
82                            )
83                            .await;
84                        });
85                    } else {
86                        scope.spawn(Self::stream_logs(
87                            fasync::Socket::from_socket(socket),
88                            logs,
89                            opts,
90                        ));
91                    }
92                }
93                fdiagnostics::LogStreamRequest::_UnknownMethod {
94                    ordinal,
95                    method_type,
96                    control_handle,
97                    ..
98                } => {
99                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
100                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
101                }
102            }
103        }
104        Ok(())
105    }
106
107    async fn stream_logs(
108        mut socket: fasync::Socket,
109        logs: impl Stream<Item = FxtMessage>,
110        opts: ExtendRecordOpts,
111    ) {
112        let mut logs = pin!(logs);
113        let mut buffer = Vec::new();
114        while let Some(message) = logs.next().await {
115            buffer.clear();
116            buffer.extend_from_slice(message.data());
117            extend_fxt_record(message.component_identity(), message.dropped(), &opts, &mut buffer);
118            let result = socket.write_all(&buffer).await;
119            if result.is_err() {
120                // Assume an error means the peer closed for now.
121                break;
122            }
123        }
124    }
125
126    async fn stream_logs_with_manifest(
127        mut socket: fasync::Socket,
128        logs: impl Stream<Item = FxtMessage>,
129    ) -> Result<(), StreamError> {
130        let mut logs = pin!(logs);
131        let mut sent_tags = HashMap::new();
132        while let Some(message) = logs.next().await {
133            let tag = message.tag();
134            match sent_tags.entry(tag) {
135                std::collections::hash_map::Entry::Vacant(e) => {
136                    let identity = message.component_identity();
137                    Self::send_component_change(&mut socket, tag, identity).await?;
138                    e.insert(Arc::clone(message.component_identity()));
139                }
140                std::collections::hash_map::Entry::Occupied(mut e) => {
141                    let identity = message.component_identity();
142                    if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
143                        Self::send_component_change(&mut socket, tag, identity).await?;
144                        e.insert(Arc::clone(message.component_identity()));
145                    }
146                }
147            }
148            socket.write_all(message.data()).await?;
149        }
150        Ok(())
151    }
152
153    async fn send_component_change(
154        socket: &mut fasync::Socket,
155        id: u64,
156        identity: &ComponentIdentity,
157    ) -> Result<(), std::io::Error> {
158        let mut encoder =
159            Encoder::new(Cursor::new(ResizableBuffer::from(Vec::new())), EncoderOpts::default());
160        let record = Record {
161            timestamp: zx::BootInstant::from_nanos(0),
162            severity: Severity::Info.into_primitive(),
163            arguments: vec![
164                Argument::other("moniker", identity.moniker.to_string()),
165                Argument::other("url", identity.url.as_str()),
166            ],
167        };
168        encoder.write_record(record).map_err(std::io::Error::other)?;
169
170        let mut buffer = encoder.take().into_inner().into_inner();
171        if buffer.len() >= 8 {
172            let mut header = Header::read_from_bytes(&buffer[0..8]).unwrap();
173            header.set_tag((id as u32) | LOG_CONTROL_BIT);
174            buffer[0..8].copy_from_slice(header.as_bytes());
175        }
176        socket.write_all(&buffer).await?;
177        Ok(())
178    }
179}
180
181#[derive(Default)]
182pub struct ExtendRecordOpts {
183    pub moniker: bool,
184    pub component_url: bool,
185    pub rolled_out: bool,
186    pub subscribe_to_manifest: bool,
187}
188
189impl ExtendRecordOpts {
190    fn should_extend(&self) -> bool {
191        let Self { moniker, component_url, rolled_out, subscribe_to_manifest: _ } = self;
192        *moniker || *component_url || *rolled_out
193    }
194}
195
196impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
197    fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
198        let fdiagnostics::LogStreamOptions {
199            include_moniker,
200            include_component_url,
201            include_rolled_out,
202            mode: _,
203            __source_breaking: _,
204            subscribe_to_manifest,
205        } = opts;
206        Self {
207            moniker: include_moniker.unwrap_or(false),
208            component_url: include_component_url.unwrap_or(false),
209            rolled_out: include_rolled_out.unwrap_or(false),
210            subscribe_to_manifest: subscribe_to_manifest.unwrap_or(false),
211        }
212    }
213}
214
/// Returns the zero bytes needed to pad `len` up to the next multiple of 8.
///
/// The result is empty when `len` is already a multiple of 8 and never longer than 7 bytes.
fn padding(len: usize) -> &'static [u8] {
    static ZEROES: [u8; 8] = [0; 8];
    // Written without `len + 7` so that lengths near `usize::MAX` cannot overflow.
    &ZEROES[..(8 - len % 8) % 8]
}
219
220pub fn extend_fxt_record(
221    identity: &ComponentIdentity,
222    rolled_out: u64,
223    opts: &ExtendRecordOpts,
224    buffer: &mut Vec<u8>,
225) {
226    if !opts.should_extend() {
227        return;
228    }
229
230    let moniker = if opts.moniker { identity.moniker.as_ref() } else { "" };
231    let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
232    let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
233
234    let moniker_len = moniker.len() as u32;
235    let component_url_len = component_url.len() as u32;
236
237    buffer.extend_from_slice(&moniker_len.to_le_bytes());
238    buffer.extend_from_slice(&component_url_len.to_le_bytes());
239    buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
240
241    buffer.extend_from_slice(moniker.as_bytes());
242    buffer.extend_from_slice(padding(moniker.len()));
243
244    buffer.extend_from_slice(component_url.as_bytes());
245    buffer.extend_from_slice(padding(component_url.len()));
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use crate::logs::testing::make_message;
252    use diagnostics_log_encoding::Value;
253    use diagnostics_log_encoding::parse::parse_record;
254    use futures::AsyncReadExt;
255    use moniker::ExtendedMoniker;
256    use test_case::test_case;
257    use zx;
258
259    #[fuchsia::test]
260    async fn log_stream_with_manifest() {
261        let repo = LogsRepository::for_test(fasync::Scope::new());
262        let identity = Arc::new(ComponentIdentity::new(
263            ExtendedMoniker::parse_str("./foo").unwrap(),
264            "fuchsia-pkg://foo",
265        ));
266        let container = repo.get_log_container(Arc::clone(&identity));
267        let container_tag = container.buffer().iob_tag() as u32;
268
269        let scope = fasync::Scope::new();
270        let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
271        let (proxy, stream) =
272            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
273        server.spawn(stream);
274
275        let (client_socket, server_socket) = zx::Socket::create_stream();
276        let mut client_socket = fasync::Socket::from_socket(client_socket);
277
278        let opts = fdiagnostics::LogStreamOptions {
279            subscribe_to_manifest: Some(true),
280            mode: Some(StreamMode::SnapshotThenSubscribe),
281            ..Default::default()
282        };
283        proxy.connect(server_socket, &opts).expect("connect");
284
285        // Wait for connection to be established/handled?
286        // We can just ingest. The cursor should pick it up.
287        container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
288
289        let mut buf = vec![0u8; 4096];
290        let mut offset = 0;
291
292        // 1. Read Manifest
293        let (manifest_record, manifest_len) = loop {
294            if offset > 0
295                && let Ok((record, rest)) = parse_record(&buf[..offset])
296            {
297                let len = offset - rest.len();
298                break (record, len);
299            }
300            let n = client_socket.read(&mut buf[offset..]).await.expect("read");
301            assert!(n > 0, "socket closed before receiving manifest");
302            offset += n;
303        };
304
305        // Check manifest arguments
306        assert_eq!(manifest_record.arguments[0].name(), "moniker");
307        assert_eq!(manifest_record.arguments[0].value(), Value::Text("foo".into()));
308        assert_eq!(manifest_record.arguments[1].name(), "url");
309        assert_eq!(manifest_record.arguments[1].value(), Value::Text("fuchsia-pkg://foo".into()));
310
311        // 2. Read Log Record
312        let (log_record, _log_len) = loop {
313            if offset > manifest_len
314                && let Ok((record, rest)) = parse_record(&buf[manifest_len..offset])
315            {
316                let len = offset - manifest_len - rest.len();
317                break (record, len);
318            }
319            let n = client_socket.read(&mut buf[offset..]).await.expect("read");
320            assert!(n > 0, "socket closed before receiving log record");
321            offset += n;
322        };
323
324        // Verify header tag of first record (Manifest)
325        let header1 = Header::read_from_bytes(&buf[0..8]).unwrap();
326        let tag_id = header1.tag();
327        assert_ne!(tag_id & LOG_CONTROL_BIT, 0, "Manifest should have LOG_CONTROL_BIT set");
328
329        // Verify header tag of second record (Log)
330        let header2 = Header::read_from_bytes(&buf[manifest_len..manifest_len + 8]).unwrap();
331        assert_eq!(
332            header2.tag() & LOG_CONTROL_BIT,
333            0,
334            "Log record should NOT have LOG_CONTROL_BIT set"
335        );
336        assert_eq!(
337            header2.tag(),
338            tag_id & !LOG_CONTROL_BIT,
339            "Log record tag should match Manifest tag ID"
340        );
341        assert_eq!(header2.tag(), container_tag, "Log record tag ID should equal IOB tag ID");
342
343        assert_eq!(log_record.arguments[2].value(), Value::Text("a".into()));
344    }
345
346    #[fuchsia::test]
347    async fn log_stream_with_manifest_reused_tag() {
348        use crate::logs::shared_buffer::create_ring_buffer;
349        // Use a small buffer to facilitate rolling out logs.
350        let repo = LogsRepository::new(
351            create_ring_buffer(65536),
352            std::iter::empty(),
353            &Default::default(),
354            fasync::Scope::new(),
355        );
356
357        let scope = fasync::Scope::new();
358        let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
359        let (proxy, stream) =
360            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
361        server.spawn(stream);
362
363        let (client_socket, server_socket) = zx::Socket::create_stream();
364        let mut client_socket = fasync::Socket::from_socket(client_socket);
365
366        let opts = fdiagnostics::LogStreamOptions {
367            subscribe_to_manifest: Some(true),
368            mode: Some(StreamMode::SnapshotThenSubscribe),
369            ..Default::default()
370        };
371        proxy.connect(server_socket, &opts).expect("connect");
372
373        // 1. Setup Identity A
374        let identity_a = Arc::new(ComponentIdentity::new(
375            ExtendedMoniker::parse_str("./foo").unwrap(),
376            "fuchsia-pkg://foo",
377        ));
378        let container_a = repo.get_log_container(Arc::clone(&identity_a));
379        let tag_a = container_a.buffer().iob_tag();
380
381        // 2. Ingest A
382        container_a.ingest_message(make_message("msg_a", None, zx::BootInstant::from_nanos(1)));
383
384        let mut buf = vec![0u8; 65536];
385        let mut offset = 0;
386
387        // Helper to read one record from the socket
388        async fn read_one_record(
389            socket: &mut fasync::Socket,
390            buf: &mut [u8],
391            offset: &mut usize,
392        ) -> (diagnostics_log_encoding::Record<'static>, usize) {
393            loop {
394                if *offset > 0
395                    && let Ok((record, rest)) = parse_record(&buf[..*offset])
396                {
397                    let len = *offset - rest.len();
398                    let owned_record = record.into_owned();
399                    // Shift buffer
400                    buf.copy_within(len..*offset, 0);
401                    *offset -= len;
402                    // Return owned record to avoid lifetime issues
403                    return (owned_record, len);
404                }
405                let n = socket.read(&mut buf[*offset..]).await.expect("read");
406                assert!(n > 0, "socket closed unexpectedly");
407                *offset += n;
408            }
409        }
410
411        // 3. Read Manifest A and Log A
412        let (manifest_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
413        assert_eq!(manifest_a.arguments[0].value(), Value::Text("foo".into()));
414
415        let (log_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
416        assert_eq!(log_a.arguments[2].value(), Value::Text("msg_a".into()));
417
418        // 4. Mark A inactive and release
419        container_a.mark_stopped();
420        drop(container_a);
421
422        // 5. Force rollout A by ingesting filler logs
423        let identity_filler = Arc::new(ComponentIdentity::new(
424            ExtendedMoniker::parse_str("./filler").unwrap(),
425            "fuchsia-pkg://filler",
426        ));
427        let container_filler = repo.get_log_container(Arc::clone(&identity_filler));
428
429        let identity_b = Arc::new(ComponentIdentity::new(
430            ExtendedMoniker::parse_str("./bar").unwrap(),
431            "fuchsia-pkg://bar",
432        ));
433
434        let mut container_b;
435        loop {
436            // Ingest filler
437            container_filler.ingest_message(make_message(
438                "fill",
439                None,
440                zx::BootInstant::from_nanos(1),
441            ));
442
443            // Drain socket to ensure flow
444
445            // Read available data
446            loop {
447                let mut temp_buf = [0u8; 1024];
448                // Use poll_read to not block
449                let read_fut = client_socket.read(&mut temp_buf);
450                match futures::poll!(read_fut) {
451                    std::task::Poll::Ready(Ok(n)) if n > 0 => {
452                        // We just discard filler data for now, but we need to watch for B?
453                        // No, B hasn't been created/ingested yet.
454                        // We are just draining filler.
455                    }
456                    _ => break, // No more data or pending
457                }
458            }
459
460            // Try to allocate B
461            container_b = repo.get_log_container(Arc::clone(&identity_b));
462            if container_b.buffer().iob_tag() == tag_a {
463                break;
464            }
465
466            // Failed to reuse, clean up B
467            container_b.mark_stopped();
468            drop(container_b);
469
470            // Yield to let cleanup tasks run
471            fasync::Timer::new(std::time::Duration::from_millis(10)).await;
472        }
473
474        // 6. Ingest B
475        container_b.ingest_message(make_message("msg_b", None, zx::BootInstant::from_nanos(2)));
476
477        // 7. Read Manifest B + Log B
478        // Note: Our buffer `buf` might contain leftover filler data or partial records.
479        // But we discarded filler data in the loop above (into temp_buf).
480        // `buf` and `offset` state from `read_one_record` was preserved.
481        // Wait, `read_one_record` modifies `buf` and `offset` (shifts data).
482        // So `buf` should be clean or contain partial data.
483
484        // We need to read until we find Manifest B.
485        loop {
486            let (record, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
487
488            // Is it Manifest B?
489            if !record.arguments.is_empty()
490                && record.arguments[0].name() == "moniker"
491                && record.arguments[0].value() == Value::Text("bar".into())
492            {
493                // Found Manifest B!
494                break;
495            }
496            // Otherwise it's filler log or Manifest Filler
497        }
498
499        // Next should be Log B
500        let (log_b, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
501        assert_eq!(log_b.arguments[2].value(), Value::Text("msg_b".into()));
502    }
503
504    #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
505    #[test_case(
506        ExtendRecordOpts { moniker: true, ..Default::default() },
507        "UNKNOWN",
508        "",
509        0
510        ; "with_moniker")]
511    #[test_case(
512        ExtendRecordOpts { component_url: true, ..Default::default() },
513        "",
514        "fuchsia-pkg://UNKNOWN",
515        0
516        ; "with_url")]
517    #[test_case(
518        ExtendRecordOpts { rolled_out: true, ..Default::default() },
519        "",
520        "",
521        42
522        ; "with_rolled_out")]
523    #[test_case(
524        ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true, subscribe_to_manifest: false },
525        "UNKNOWN",
526        "fuchsia-pkg://UNKNOWN",
527        42
528        ; "with_all")]
529    #[fuchsia::test]
530    fn extend_record_with_metadata(
531        opts: ExtendRecordOpts,
532        expected_moniker: &str,
533        expected_url: &str,
534        expected_rolled_out: u64,
535    ) {
536        let mut buffer = Vec::new();
537        extend_fxt_record(&ComponentIdentity::unknown(), 42, &opts, &mut buffer);
538
539        if !opts.should_extend() {
540            assert!(buffer.is_empty());
541            return;
542        }
543
544        let moniker_len = u32::from_le_bytes(buffer[0..4].try_into().unwrap()) as usize;
545        let component_url_len = u32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;
546
547        let rolled_out = u64::from_le_bytes(buffer[8..16].try_into().unwrap());
548        if opts.rolled_out {
549            assert_eq!(rolled_out, expected_rolled_out);
550        } else {
551            assert_eq!(rolled_out, 0);
552        }
553
554        let mut offset = 16;
555        let moniker = std::str::from_utf8(&buffer[offset..offset + moniker_len]).unwrap();
556        assert_eq!(moniker, expected_moniker);
557        let moniker_padded_len = (moniker_len + 7) & !7;
558        offset += moniker_padded_len;
559
560        let url = std::str::from_utf8(&buffer[offset..offset + component_url_len]).unwrap();
561        assert_eq!(url, expected_url);
562        let component_url_padded_len = (component_url_len + 7) & !7;
563        offset += component_url_padded_len;
564
565        assert_eq!(offset, buffer.len());
566    }
567}