archivist_lib/logs/servers/
log_stream.rs1use crate::identity::ComponentIdentity;
6use crate::logs::container::CursorItem;
7use crate::logs::error::LogsError;
8use crate::logs::repository::LogsRepository;
9use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
10use fidl_fuchsia_diagnostics::StreamMode;
11use futures::{AsyncWriteExt, Stream, StreamExt};
12use log::warn;
13use std::borrow::Cow;
14use std::sync::Arc;
15use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync, fuchsia_trace as ftrace};
16
17pub struct LogStreamServer {
18 logs_repo: Arc<LogsRepository>,
20
21 scope: fasync::Scope,
23}
24
25impl LogStreamServer {
26 pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
27 Self { logs_repo, scope }
28 }
29
30 pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
32 let logs_repo = Arc::clone(&self.logs_repo);
33 let scope = self.scope.to_handle();
34 self.scope.spawn(async move {
35 if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
36 warn!("error handling Log requests: {}", e);
37 }
38 });
39 }
40
41 async fn handle_requests(
44 logs_repo: Arc<LogsRepository>,
45 mut stream: fdiagnostics::LogStreamRequestStream,
46 scope: fasync::ScopeHandle,
47 ) -> Result<(), LogsError> {
48 while let Some(request) = stream.next().await {
49 let request = request.map_err(|source| LogsError::HandlingRequests {
50 protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
51 source,
52 })?;
53
54 match request {
55 fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
56 let logs = logs_repo.logs_cursor_raw(
57 opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
58 None,
59 ftrace::Id::random(),
60 );
61 let opts = ExtendRecordOpts::from(opts);
62 scope.spawn(Self::stream_logs(fasync::Socket::from_socket(socket), logs, opts));
63 }
64 fdiagnostics::LogStreamRequest::_UnknownMethod {
65 ordinal,
66 method_type,
67 control_handle,
68 ..
69 } => {
70 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
71 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
72 }
73 }
74 }
75 Ok(())
76 }
77
78 async fn stream_logs(
79 mut socket: fasync::Socket,
80 mut logs: impl Stream<Item = CursorItem> + Unpin,
81 opts: ExtendRecordOpts,
82 ) {
83 while let Some(CursorItem { rolled_out, message, identity }) = logs.next().await {
84 let response = extend_fxt_record(message.bytes(), identity.as_ref(), rolled_out, &opts);
85 let result = socket.write_all(&response).await;
86 if result.is_err() {
87 break;
89 }
90 }
91 }
92}
93
94#[derive(Default)]
95pub struct ExtendRecordOpts {
96 pub moniker: bool,
97 pub component_url: bool,
98 pub rolled_out: bool,
99}
100
101impl ExtendRecordOpts {
102 fn should_extend(&self) -> bool {
103 let Self { moniker, component_url, rolled_out } = self;
104 *moniker || *component_url || *rolled_out
105 }
106}
107
108impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
109 fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
110 let fdiagnostics::LogStreamOptions {
111 include_moniker,
112 include_component_url,
113 include_rolled_out,
114 mode: _,
115 __source_breaking: _,
116 } = opts;
117 Self {
118 moniker: include_moniker.unwrap_or(false),
119 component_url: include_component_url.unwrap_or(false),
120 rolled_out: include_rolled_out.unwrap_or(false),
121 }
122 }
123}
124
125fn pad_to_8(len: usize) -> usize {
127 (len + 7) & !7
128}
129
130pub fn extend_fxt_record<'a>(
131 fxt_record: &'a [u8],
132 identity: &ComponentIdentity,
133 rolled_out: u64,
134 opts: &ExtendRecordOpts,
135) -> Cow<'a, [u8]> {
136 if !opts.should_extend() {
137 return Cow::Borrowed(fxt_record);
138 }
139
140 let moniker_str = if opts.moniker { Some(identity.moniker.to_string()) } else { None };
141 let moniker = moniker_str.as_deref().unwrap_or("");
142 let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
143 let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
144
145 let moniker_len = moniker.len() as u32;
146 let component_url_len = component_url.len() as u32;
147
148 let moniker_padded_len = pad_to_8(moniker_len as usize);
149 let component_url_padded_len = pad_to_8(component_url_len as usize);
150
151 let mut extended_buffer =
152 Vec::with_capacity(fxt_record.len() + 16 + moniker_padded_len + component_url_padded_len);
153 extended_buffer.extend_from_slice(fxt_record);
154
155 extended_buffer.extend_from_slice(&moniker_len.to_le_bytes());
156 extended_buffer.extend_from_slice(&component_url_len.to_le_bytes());
157 extended_buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
158
159 extended_buffer.extend_from_slice(moniker.as_bytes());
160
161 extended_buffer.resize(extended_buffer.len() + moniker_padded_len - moniker_len as usize, 0);
164
165 extended_buffer.extend_from_slice(component_url.as_bytes());
166 extended_buffer
167 .resize(extended_buffer.len() + component_url_padded_len - component_url_len as usize, 0);
168
169 Cow::Owned(extended_buffer)
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175 use diagnostics_log_encoding::Argument;
176 use diagnostics_log_encoding::encode::{
177 Encoder, EncoderOpts, MutableBuffer, TestRecord, WriteEventParams,
178 };
179 use diagnostics_log_encoding::parse::parse_record;
180 use diagnostics_log_types::Severity;
181 use std::io::Cursor;
182 use test_case::test_case;
183
184 #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
185 #[test_case(
186 ExtendRecordOpts { moniker: true, ..Default::default() },
187 "UNKNOWN",
188 "",
189 0
190 ; "with_moniker")]
191 #[test_case(
192 ExtendRecordOpts { component_url: true, ..Default::default() },
193 "",
194 "fuchsia-pkg://UNKNOWN",
195 0
196 ; "with_url")]
197 #[test_case(
198 ExtendRecordOpts { rolled_out: true, ..Default::default() },
199 "",
200 "",
201 42
202 ; "with_rolled_out")]
203 #[test_case(
204 ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true },
205 "UNKNOWN",
206 "fuchsia-pkg://UNKNOWN",
207 42
208 ; "with_all")]
209 #[fuchsia::test]
210 fn extend_record_with_metadata(
211 opts: ExtendRecordOpts,
212 expected_moniker: &str,
213 expected_url: &str,
214 expected_rolled_out: u64,
215 ) {
216 let mut encoder = Encoder::new(Cursor::new([0u8; 4096]), EncoderOpts::default());
217 encoder
218 .write_event(WriteEventParams::<_, &str, _> {
219 event: TestRecord {
220 severity: Severity::Warn as u8,
221 timestamp: zx::BootInstant::from_nanos(1234567890),
222 file: Some("foo.rs"),
223 line: Some(123),
224 record_arguments: vec![Argument::tag("hello"), Argument::message("testing")],
225 },
226 tags: &[],
227 metatags: std::iter::empty(),
228 pid: zx::Koid::from_raw(1),
229 tid: zx::Koid::from_raw(2),
230 dropped: 10,
231 })
232 .expect("wrote event");
233
234 let length = encoder.inner().cursor();
235 let original_record_bytes = &encoder.inner().get_ref()[..length];
236 let (expected_record, _) = parse_record(original_record_bytes).unwrap();
237
238 let extended_record_bytes =
239 extend_fxt_record(original_record_bytes, &ComponentIdentity::unknown(), 42, &opts);
240
241 if !opts.should_extend() {
242 assert_eq!(extended_record_bytes, original_record_bytes);
243 return;
244 }
245
246 let (record, rest) = parse_record(&extended_record_bytes).unwrap();
247 assert_eq!(record, expected_record);
248
249 let moniker_len = u32::from_le_bytes(rest[0..4].try_into().unwrap()) as usize;
250 let component_url_len = u32::from_le_bytes(rest[4..8].try_into().unwrap()) as usize;
251
252 let rolled_out = u64::from_le_bytes(rest[8..16].try_into().unwrap());
253 if opts.rolled_out {
254 assert_eq!(rolled_out, expected_rolled_out);
255 } else {
256 assert_eq!(rolled_out, 0);
257 }
258
259 let mut offset = 16;
260 let moniker = std::str::from_utf8(&rest[offset..offset + moniker_len]).unwrap();
261 assert_eq!(moniker, expected_moniker);
262 let moniker_padded_len = (moniker_len + 7) & !7;
263 offset += moniker_padded_len;
264
265 let url = std::str::from_utf8(&rest[offset..offset + component_url_len]).unwrap();
266 assert_eq!(url, expected_url);
267 let component_url_padded_len = (component_url_len + 7) & !7;
268 offset += component_url_padded_len;
269
270 assert_eq!(offset, rest.len());
271 }
272}