1use crate::identity::ComponentIdentity;
5use crate::logs::error::LogsError;
6use crate::logs::repository::LogsRepository;
7use crate::logs::shared_buffer::FxtMessage;
8use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
9use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
10use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker, RequestStream};
11use fidl_fuchsia_diagnostics::StreamMode;
12use fidl_fuchsia_diagnostics_types::Severity;
13use futures::{AsyncWriteExt, Stream, StreamExt};
14use log::warn;
15use std::collections::HashMap;
16use std::io::Cursor;
17use std::pin::pin;
18use std::sync::Arc;
19use zerocopy::{FromBytes, IntoBytes};
20use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync};
21
22#[derive(thiserror::Error, Debug)]
23enum StreamError {
24 #[error(transparent)]
25 Io(#[from] std::io::Error),
26}
27
28pub struct LogStreamServer {
29 logs_repo: Arc<LogsRepository>,
31
32 scope: fasync::Scope,
34}
35
36impl LogStreamServer {
37 pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
38 Self { logs_repo, scope }
39 }
40
41 pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
43 let logs_repo = Arc::clone(&self.logs_repo);
44 let scope = self.scope.to_handle();
45 self.scope.spawn(async move {
46 if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
47 warn!("error handling Log requests: {}", e);
48 }
49 });
50 }
51
52 async fn handle_requests(
55 logs_repo: Arc<LogsRepository>,
56 mut stream: fdiagnostics::LogStreamRequestStream,
57 scope: fasync::ScopeHandle,
58 ) -> Result<(), LogsError> {
59 while let Some(request) = stream.next().await {
60 let request = request.map_err(|source| LogsError::HandlingRequests {
61 protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
62 source,
63 })?;
64
65 match request {
66 fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
67 let logs = logs_repo.logs_cursor_raw(
68 opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
69 Vec::new(),
70 );
71 let opts = ExtendRecordOpts::from(opts);
72 if opts.subscribe_to_manifest {
73 if opts.moniker || opts.component_url || opts.rolled_out {
74 stream.control_handle().shutdown_with_epitaph(zx::Status::INVALID_ARGS);
75 return Ok(());
76 }
77
78 scope.spawn(async move {
79 let _ = Self::stream_logs_with_manifest(
80 fasync::Socket::from_socket(socket),
81 logs,
82 )
83 .await;
84 });
85 } else {
86 scope.spawn(Self::stream_logs(
87 fasync::Socket::from_socket(socket),
88 logs,
89 opts,
90 ));
91 }
92 }
93 fdiagnostics::LogStreamRequest::_UnknownMethod {
94 ordinal,
95 method_type,
96 control_handle,
97 ..
98 } => {
99 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
100 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
101 }
102 }
103 }
104 Ok(())
105 }
106
107 async fn stream_logs(
108 mut socket: fasync::Socket,
109 logs: impl Stream<Item = FxtMessage>,
110 opts: ExtendRecordOpts,
111 ) {
112 let mut logs = pin!(logs);
113 let mut buffer = Vec::new();
114 while let Some(message) = logs.next().await {
115 buffer.clear();
116 buffer.extend_from_slice(message.data());
117 extend_fxt_record(message.component_identity(), message.dropped(), &opts, &mut buffer);
118 let result = socket.write_all(&buffer).await;
119 if result.is_err() {
120 break;
122 }
123 }
124 }
125
126 async fn stream_logs_with_manifest(
127 mut socket: fasync::Socket,
128 logs: impl Stream<Item = FxtMessage>,
129 ) -> Result<(), StreamError> {
130 let mut logs = pin!(logs);
131 let mut sent_tags = HashMap::new();
132 while let Some(message) = logs.next().await {
133 let tag = message.tag();
134 match sent_tags.entry(tag) {
135 std::collections::hash_map::Entry::Vacant(e) => {
136 let identity = message.component_identity();
137 Self::send_component_change(&mut socket, tag, identity).await?;
138 e.insert(Arc::clone(message.component_identity()));
139 }
140 std::collections::hash_map::Entry::Occupied(mut e) => {
141 let identity = message.component_identity();
142 if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
143 Self::send_component_change(&mut socket, tag, identity).await?;
144 e.insert(Arc::clone(message.component_identity()));
145 }
146 }
147 }
148 socket.write_all(message.data()).await?;
149 }
150 Ok(())
151 }
152
153 async fn send_component_change(
154 socket: &mut fasync::Socket,
155 id: u64,
156 identity: &ComponentIdentity,
157 ) -> Result<(), std::io::Error> {
158 let mut encoder =
159 Encoder::new(Cursor::new(ResizableBuffer::from(Vec::new())), EncoderOpts::default());
160 let record = Record {
161 timestamp: zx::BootInstant::from_nanos(0),
162 severity: Severity::Info.into_primitive(),
163 arguments: vec![
164 Argument::other("moniker", identity.moniker.to_string()),
165 Argument::other("url", identity.url.as_str()),
166 ],
167 };
168 encoder.write_record(record).map_err(std::io::Error::other)?;
169
170 let mut buffer = encoder.take().into_inner().into_inner();
171 if buffer.len() >= 8 {
172 let mut header = Header::read_from_bytes(&buffer[0..8]).unwrap();
173 header.set_tag((id as u32) | LOG_CONTROL_BIT);
174 buffer[0..8].copy_from_slice(header.as_bytes());
175 }
176 socket.write_all(&buffer).await?;
177 Ok(())
178 }
179}
180
181#[derive(Default)]
182pub struct ExtendRecordOpts {
183 pub moniker: bool,
184 pub component_url: bool,
185 pub rolled_out: bool,
186 pub subscribe_to_manifest: bool,
187}
188
189impl ExtendRecordOpts {
190 fn should_extend(&self) -> bool {
191 let Self { moniker, component_url, rolled_out, subscribe_to_manifest: _ } = self;
192 *moniker || *component_url || *rolled_out
193 }
194}
195
196impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
197 fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
198 let fdiagnostics::LogStreamOptions {
199 include_moniker,
200 include_component_url,
201 include_rolled_out,
202 mode: _,
203 __source_breaking: _,
204 subscribe_to_manifest,
205 } = opts;
206 Self {
207 moniker: include_moniker.unwrap_or(false),
208 component_url: include_component_url.unwrap_or(false),
209 rolled_out: include_rolled_out.unwrap_or(false),
210 subscribe_to_manifest: subscribe_to_manifest.unwrap_or(false),
211 }
212 }
213}
214
215fn padding(len: usize) -> &'static [u8] {
217 &[0; 8][(len + 7) % 8 + 1..]
218}
219
220pub fn extend_fxt_record(
221 identity: &ComponentIdentity,
222 rolled_out: u64,
223 opts: &ExtendRecordOpts,
224 buffer: &mut Vec<u8>,
225) {
226 if !opts.should_extend() {
227 return;
228 }
229
230 let moniker = if opts.moniker { identity.moniker.as_ref() } else { "" };
231 let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
232 let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
233
234 let moniker_len = moniker.len() as u32;
235 let component_url_len = component_url.len() as u32;
236
237 buffer.extend_from_slice(&moniker_len.to_le_bytes());
238 buffer.extend_from_slice(&component_url_len.to_le_bytes());
239 buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
240
241 buffer.extend_from_slice(moniker.as_bytes());
242 buffer.extend_from_slice(padding(moniker.len()));
243
244 buffer.extend_from_slice(component_url.as_bytes());
245 buffer.extend_from_slice(padding(component_url.len()));
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use crate::logs::testing::make_message;
252 use diagnostics_log_encoding::Value;
253 use diagnostics_log_encoding::parse::parse_record;
254 use futures::AsyncReadExt;
255 use moniker::ExtendedMoniker;
256 use test_case::test_case;
257 use zx;
258
259 #[fuchsia::test]
260 async fn log_stream_with_manifest() {
261 let repo = LogsRepository::for_test(fasync::Scope::new());
262 let identity = Arc::new(ComponentIdentity::new(
263 ExtendedMoniker::parse_str("./foo").unwrap(),
264 "fuchsia-pkg://foo",
265 ));
266 let container = repo.get_log_container(Arc::clone(&identity));
267 let container_tag = container.buffer().iob_tag() as u32;
268
269 let scope = fasync::Scope::new();
270 let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
271 let (proxy, stream) =
272 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
273 server.spawn(stream);
274
275 let (client_socket, server_socket) = zx::Socket::create_stream();
276 let mut client_socket = fasync::Socket::from_socket(client_socket);
277
278 let opts = fdiagnostics::LogStreamOptions {
279 subscribe_to_manifest: Some(true),
280 mode: Some(StreamMode::SnapshotThenSubscribe),
281 ..Default::default()
282 };
283 proxy.connect(server_socket, &opts).expect("connect");
284
285 container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
288
289 let mut buf = vec![0u8; 4096];
290 let mut offset = 0;
291
292 let (manifest_record, manifest_len) = loop {
294 if offset > 0
295 && let Ok((record, rest)) = parse_record(&buf[..offset])
296 {
297 let len = offset - rest.len();
298 break (record, len);
299 }
300 let n = client_socket.read(&mut buf[offset..]).await.expect("read");
301 assert!(n > 0, "socket closed before receiving manifest");
302 offset += n;
303 };
304
305 assert_eq!(manifest_record.arguments[0].name(), "moniker");
307 assert_eq!(manifest_record.arguments[0].value(), Value::Text("foo".into()));
308 assert_eq!(manifest_record.arguments[1].name(), "url");
309 assert_eq!(manifest_record.arguments[1].value(), Value::Text("fuchsia-pkg://foo".into()));
310
311 let (log_record, _log_len) = loop {
313 if offset > manifest_len
314 && let Ok((record, rest)) = parse_record(&buf[manifest_len..offset])
315 {
316 let len = offset - manifest_len - rest.len();
317 break (record, len);
318 }
319 let n = client_socket.read(&mut buf[offset..]).await.expect("read");
320 assert!(n > 0, "socket closed before receiving log record");
321 offset += n;
322 };
323
324 let header1 = Header::read_from_bytes(&buf[0..8]).unwrap();
326 let tag_id = header1.tag();
327 assert_ne!(tag_id & LOG_CONTROL_BIT, 0, "Manifest should have LOG_CONTROL_BIT set");
328
329 let header2 = Header::read_from_bytes(&buf[manifest_len..manifest_len + 8]).unwrap();
331 assert_eq!(
332 header2.tag() & LOG_CONTROL_BIT,
333 0,
334 "Log record should NOT have LOG_CONTROL_BIT set"
335 );
336 assert_eq!(
337 header2.tag(),
338 tag_id & !LOG_CONTROL_BIT,
339 "Log record tag should match Manifest tag ID"
340 );
341 assert_eq!(header2.tag(), container_tag, "Log record tag ID should equal IOB tag ID");
342
343 assert_eq!(log_record.arguments[2].value(), Value::Text("a".into()));
344 }
345
346 #[fuchsia::test]
347 async fn log_stream_with_manifest_reused_tag() {
348 use crate::logs::shared_buffer::create_ring_buffer;
349 let repo = LogsRepository::new(
351 create_ring_buffer(65536),
352 std::iter::empty(),
353 &Default::default(),
354 fasync::Scope::new(),
355 );
356
357 let scope = fasync::Scope::new();
358 let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
359 let (proxy, stream) =
360 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
361 server.spawn(stream);
362
363 let (client_socket, server_socket) = zx::Socket::create_stream();
364 let mut client_socket = fasync::Socket::from_socket(client_socket);
365
366 let opts = fdiagnostics::LogStreamOptions {
367 subscribe_to_manifest: Some(true),
368 mode: Some(StreamMode::SnapshotThenSubscribe),
369 ..Default::default()
370 };
371 proxy.connect(server_socket, &opts).expect("connect");
372
373 let identity_a = Arc::new(ComponentIdentity::new(
375 ExtendedMoniker::parse_str("./foo").unwrap(),
376 "fuchsia-pkg://foo",
377 ));
378 let container_a = repo.get_log_container(Arc::clone(&identity_a));
379 let tag_a = container_a.buffer().iob_tag();
380
381 container_a.ingest_message(make_message("msg_a", None, zx::BootInstant::from_nanos(1)));
383
384 let mut buf = vec![0u8; 65536];
385 let mut offset = 0;
386
387 async fn read_one_record(
389 socket: &mut fasync::Socket,
390 buf: &mut [u8],
391 offset: &mut usize,
392 ) -> (diagnostics_log_encoding::Record<'static>, usize) {
393 loop {
394 if *offset > 0
395 && let Ok((record, rest)) = parse_record(&buf[..*offset])
396 {
397 let len = *offset - rest.len();
398 let owned_record = record.into_owned();
399 buf.copy_within(len..*offset, 0);
401 *offset -= len;
402 return (owned_record, len);
404 }
405 let n = socket.read(&mut buf[*offset..]).await.expect("read");
406 assert!(n > 0, "socket closed unexpectedly");
407 *offset += n;
408 }
409 }
410
411 let (manifest_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
413 assert_eq!(manifest_a.arguments[0].value(), Value::Text("foo".into()));
414
415 let (log_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
416 assert_eq!(log_a.arguments[2].value(), Value::Text("msg_a".into()));
417
418 container_a.mark_stopped();
420 drop(container_a);
421
422 let identity_filler = Arc::new(ComponentIdentity::new(
424 ExtendedMoniker::parse_str("./filler").unwrap(),
425 "fuchsia-pkg://filler",
426 ));
427 let container_filler = repo.get_log_container(Arc::clone(&identity_filler));
428
429 let identity_b = Arc::new(ComponentIdentity::new(
430 ExtendedMoniker::parse_str("./bar").unwrap(),
431 "fuchsia-pkg://bar",
432 ));
433
434 let mut container_b;
435 loop {
436 container_filler.ingest_message(make_message(
438 "fill",
439 None,
440 zx::BootInstant::from_nanos(1),
441 ));
442
443 loop {
447 let mut temp_buf = [0u8; 1024];
448 let read_fut = client_socket.read(&mut temp_buf);
450 match futures::poll!(read_fut) {
451 std::task::Poll::Ready(Ok(n)) if n > 0 => {
452 }
456 _ => break, }
458 }
459
460 container_b = repo.get_log_container(Arc::clone(&identity_b));
462 if container_b.buffer().iob_tag() == tag_a {
463 break;
464 }
465
466 container_b.mark_stopped();
468 drop(container_b);
469
470 fasync::Timer::new(std::time::Duration::from_millis(10)).await;
472 }
473
474 container_b.ingest_message(make_message("msg_b", None, zx::BootInstant::from_nanos(2)));
476
477 loop {
486 let (record, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
487
488 if !record.arguments.is_empty()
490 && record.arguments[0].name() == "moniker"
491 && record.arguments[0].value() == Value::Text("bar".into())
492 {
493 break;
495 }
496 }
498
499 let (log_b, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
501 assert_eq!(log_b.arguments[2].value(), Value::Text("msg_b".into()));
502 }
503
504 #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
505 #[test_case(
506 ExtendRecordOpts { moniker: true, ..Default::default() },
507 "UNKNOWN",
508 "",
509 0
510 ; "with_moniker")]
511 #[test_case(
512 ExtendRecordOpts { component_url: true, ..Default::default() },
513 "",
514 "fuchsia-pkg://UNKNOWN",
515 0
516 ; "with_url")]
517 #[test_case(
518 ExtendRecordOpts { rolled_out: true, ..Default::default() },
519 "",
520 "",
521 42
522 ; "with_rolled_out")]
523 #[test_case(
524 ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true, subscribe_to_manifest: false },
525 "UNKNOWN",
526 "fuchsia-pkg://UNKNOWN",
527 42
528 ; "with_all")]
529 #[fuchsia::test]
530 fn extend_record_with_metadata(
531 opts: ExtendRecordOpts,
532 expected_moniker: &str,
533 expected_url: &str,
534 expected_rolled_out: u64,
535 ) {
536 let mut buffer = Vec::new();
537 extend_fxt_record(&ComponentIdentity::unknown(), 42, &opts, &mut buffer);
538
539 if !opts.should_extend() {
540 assert!(buffer.is_empty());
541 return;
542 }
543
544 let moniker_len = u32::from_le_bytes(buffer[0..4].try_into().unwrap()) as usize;
545 let component_url_len = u32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;
546
547 let rolled_out = u64::from_le_bytes(buffer[8..16].try_into().unwrap());
548 if opts.rolled_out {
549 assert_eq!(rolled_out, expected_rolled_out);
550 } else {
551 assert_eq!(rolled_out, 0);
552 }
553
554 let mut offset = 16;
555 let moniker = std::str::from_utf8(&buffer[offset..offset + moniker_len]).unwrap();
556 assert_eq!(moniker, expected_moniker);
557 let moniker_padded_len = (moniker_len + 7) & !7;
558 offset += moniker_padded_len;
559
560 let url = std::str::from_utf8(&buffer[offset..offset + component_url_len]).unwrap();
561 assert_eq!(url, expected_url);
562 let component_url_padded_len = (component_url_len + 7) & !7;
563 offset += component_url_padded_len;
564
565 assert_eq!(offset, buffer.len());
566 }
567}