archivist_lib/logs/servers/
log_stream.rs1use crate::identity::ComponentIdentity;
6use crate::logs::container::CursorItem;
7use crate::logs::error::LogsError;
8use crate::logs::repository::LogsRepository;
9use diagnostics_log_encoding::encode::{Encoder, MutableBuffer, ResizableBuffer};
10use diagnostics_log_encoding::{Header, FXT_HEADER_SIZE};
11use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
12use fidl_fuchsia_diagnostics::StreamMode;
13use futures::{AsyncWriteExt, Stream, StreamExt};
14use log::warn;
15use std::borrow::Cow;
16use std::io::Cursor;
17use std::sync::Arc;
18use zerocopy::FromBytes;
19use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync, fuchsia_trace as ftrace};
20
21pub struct LogStreamServer {
22 logs_repo: Arc<LogsRepository>,
24
25 scope: fasync::Scope,
27}
28
29impl LogStreamServer {
30 pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
31 Self { logs_repo, scope }
32 }
33
34 pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
36 let logs_repo = Arc::clone(&self.logs_repo);
37 let scope = self.scope.to_handle();
38 self.scope.spawn(async move {
39 if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
40 warn!("error handling Log requests: {}", e);
41 }
42 });
43 }
44
45 async fn handle_requests(
48 logs_repo: Arc<LogsRepository>,
49 mut stream: fdiagnostics::LogStreamRequestStream,
50 scope: fasync::ScopeHandle,
51 ) -> Result<(), LogsError> {
52 while let Some(request) = stream.next().await {
53 let request = request.map_err(|source| LogsError::HandlingRequests {
54 protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
55 source,
56 })?;
57
58 match request {
59 fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
60 let logs = logs_repo.logs_cursor_raw(
61 opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
62 None,
63 ftrace::Id::random(),
64 );
65 let opts = ExtendRecordOpts::from(opts);
66 scope.spawn(Self::stream_logs(fasync::Socket::from_socket(socket), logs, opts));
67 }
68 fdiagnostics::LogStreamRequest::_UnknownMethod {
69 ordinal,
70 method_type,
71 control_handle,
72 ..
73 } => {
74 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
75 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
76 }
77 }
78 }
79 Ok(())
80 }
81
82 async fn stream_logs(
83 mut socket: fasync::Socket,
84 mut logs: impl Stream<Item = CursorItem> + Unpin,
85 opts: ExtendRecordOpts,
86 ) {
87 while let Some(CursorItem { rolled_out, message, identity }) = logs.next().await {
88 let response = extend_fxt_record(message.bytes(), identity.as_ref(), rolled_out, &opts);
89 let result = socket.write_all(&response).await;
90 if result.is_err() {
91 break;
93 }
94 }
95 }
96}
97
/// Selects which pieces of metadata get appended to a record by `extend_fxt_record`.
///
/// All flags default to `false`.
#[derive(Default)]
pub struct ExtendRecordOpts {
    /// Append the component moniker.
    pub moniker: bool,
    /// Append the component URL.
    pub component_url: bool,
    /// Append the rolled-out count.
    pub rolled_out: bool,
}

impl ExtendRecordOpts {
    /// Returns `true` when at least one flag is set.
    fn should_extend(&self) -> bool {
        // The pattern names every field, so adding a field without updating this check
        // fails to compile.
        !matches!(self, Self { moniker: false, component_url: false, rolled_out: false })
    }
}
111
112impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
113 fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
114 let fdiagnostics::LogStreamOptions {
115 include_moniker,
116 include_component_url,
117 include_rolled_out,
118 mode: _,
119 __source_breaking: _,
120 } = opts;
121 Self {
122 moniker: include_moniker.unwrap_or(false),
123 component_url: include_component_url.unwrap_or(false),
124 rolled_out: include_rolled_out.unwrap_or(false),
125 }
126 }
127}
128
129pub fn extend_fxt_record<'a>(
130 fxt_record: &'a [u8],
131 identity: &ComponentIdentity,
132 rolled_out: u64,
133 opts: &ExtendRecordOpts,
134) -> Cow<'a, [u8]> {
135 if !opts.should_extend() {
136 return Cow::Borrowed(fxt_record);
137 }
138
139 let mut cursor = Cursor::new(ResizableBuffer::from(vec![0; fxt_record.len()]));
140 cursor.put_slice(fxt_record).expect("must fit");
141
142 let mut metadata_arguments = Encoder::new(cursor, Default::default());
143 if opts.moniker {
144 metadata_arguments
145 .write_raw_argument(fdiagnostics::MONIKER_ARG_NAME, identity.moniker.to_string())
146 .expect("infallible");
147 }
148 if opts.component_url {
149 metadata_arguments
150 .write_raw_argument(fdiagnostics::COMPONENT_URL_ARG_NAME, identity.url.as_ref())
151 .expect("infallible");
152 }
153 if opts.rolled_out && rolled_out > 0 {
154 metadata_arguments
155 .write_raw_argument(fdiagnostics::ROLLED_OUT_ARG_NAME, rolled_out)
156 .expect("infallible");
157 }
158
159 let buffer = metadata_arguments.take();
160 let length = buffer.cursor();
161 let mut buffer = buffer.into_inner().into_inner();
162
163 let (header, _) = Header::mut_from_prefix(&mut buffer[..FXT_HEADER_SIZE])
164 .expect("we validate the header when ingesting");
165 header.set_len(length);
166 buffer.resize(length, 0);
167 Cow::Owned(buffer)
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_log_encoding::encode::{EncoderOpts, TestRecord, WriteEventParams};
    use diagnostics_log_encoding::parse::parse_record;
    use diagnostics_log_encoding::Argument;
    use diagnostics_log_types::Severity;
    use test_case::test_case;

    /// Encodes a record, extends it with `opts`, and checks that the result parses to the
    /// original record plus exactly the expected extra `arguments`.
    #[test_case(ExtendRecordOpts::default(), vec![] ; "no_additional_metadata")]
    #[test_case(
        ExtendRecordOpts { moniker: true, ..Default::default() },
        vec![Argument::other(fdiagnostics::MONIKER_ARG_NAME, "UNKNOWN")]
        ; "with_moniker")]
    #[test_case(
        ExtendRecordOpts { component_url: true, ..Default::default() },
        vec![Argument::other(fdiagnostics::COMPONENT_URL_ARG_NAME, "fuchsia-pkg://UNKNOWN")]
        ; "with_url")]
    #[test_case(
        ExtendRecordOpts { rolled_out: true, ..Default::default() },
        vec![Argument::other(fdiagnostics::ROLLED_OUT_ARG_NAME, 42u64)]
        ; "with_rolled_out")]
    #[fuchsia::test]
    fn extend_record_with_metadata(opts: ExtendRecordOpts, arguments: Vec<Argument<'static>>) {
        let event = TestRecord {
            severity: Severity::Warn as u8,
            timestamp: zx::BootInstant::from_nanos(1234567890),
            file: Some("foo.rs"),
            line: Some(123),
            record_arguments: vec![Argument::tag("hello"), Argument::message("testing")],
        };

        let mut encoder = Encoder::new(Cursor::new([0u8; 4096]), EncoderOpts::default());
        encoder
            .write_event(WriteEventParams::<_, &str, _> {
                event,
                tags: &[],
                metatags: std::iter::empty(),
                pid: zx::Koid::from_raw(1),
                tid: zx::Koid::from_raw(2),
                dropped: 10,
            })
            .expect("wrote event");

        // Only the bytes up to the encoder's cursor belong to the record.
        let written = encoder.inner().cursor();
        let original = &encoder.inner().get_ref()[..written];
        let (mut expected, _) = parse_record(original).unwrap();

        let extended = extend_fxt_record(original, &ComponentIdentity::unknown(), 42, &opts);
        let (actual, _) = parse_record(&extended).unwrap();

        // The extended record is the original record followed by the requested metadata.
        expected.arguments.extend(arguments);
        assert_eq!(actual, expected);
    }
}