1use crate::identity::ComponentIdentity;
6use crate::logs::stats::LogStreamStats;
7use crate::logs::stored_message::StoredMessage;
8use derivative::Derivative;
9use diagnostics_log_encoding::{Header, FXT_HEADER_SIZE, TRACING_FORMAT_LOG_RECORD_TYPE};
10use fidl_fuchsia_diagnostics::StreamMode;
11use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
12use fuchsia_async as fasync;
13use fuchsia_async::condition::{Condition, WakerEntry};
14use fuchsia_sync::Mutex;
15use futures::Stream;
16use log::debug;
17use pin_project::{pin_project, pinned_drop};
18use std::ops::{Deref, DerefMut, Range};
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use zerocopy::FromBytes;
23use zx::AsHandleRef as _;
24
25const MIN_BUFFER_SIZE: usize = (MAX_DATAGRAM_LEN_BYTES * 2) as usize;
28
29pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
30
31pub struct SharedBuffer {
32 inner: Condition<InnerAndSockets>,
33
34 on_inactive: OnInactive,
36
37 port: zx::Port,
39
40 event: zx::Event,
42
43 socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
45}
46
47struct InnerAndSockets(Inner, Slab<Socket>);
50
51impl Deref for InnerAndSockets {
52 type Target = Inner;
53 fn deref(&self) -> &Inner {
54 &self.0
55 }
56}
57
58impl DerefMut for InnerAndSockets {
59 fn deref_mut(&mut self) -> &mut Inner {
60 &mut self.0
61 }
62}
63
64struct Inner {
65 buffer: Buffer,
67
68 head: u64,
71
72 tail: u64,
74
75 containers: Containers,
77}
78
79impl SharedBuffer {
80 pub fn new(capacity: usize, on_inactive: OnInactive) -> Arc<Self> {
81 let this = Arc::new(Self {
82 inner: Condition::new(InnerAndSockets(
83 Inner {
84 buffer: Buffer::new(capacity),
85 head: 0,
86 tail: 0,
87 containers: Containers::default(),
88 },
89 Slab::default(),
90 )),
91 on_inactive,
92 port: zx::Port::create(),
93 event: zx::Event::create(),
94 socket_thread: Mutex::default(),
95 });
96 *this.socket_thread.lock() = Some({
97 let this = Arc::clone(&this);
98 let ehandle = fasync::EHandle::local();
99 std::thread::spawn(move || this.socket_thread(ehandle))
100 });
101 this
102 }
103
104 pub fn new_container_buffer(
105 self: &Arc<Self>,
106 identity: Arc<ComponentIdentity>,
107 stats: Arc<LogStreamStats>,
108 ) -> ContainerBuffer {
109 ContainerBuffer {
110 shared_buffer: Arc::clone(self),
111 container_id: self.inner.lock().containers.new_container(identity, stats),
112 }
113 }
114
115 #[cfg(test)]
117 pub fn container_count(&self) -> usize {
118 self.inner.lock().0.containers.len()
119 }
120
121 pub async fn terminate(&self) {
123 self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
124 let join_handle = self.socket_thread.lock().take().unwrap();
125 fasync::unblock(|| {
126 let _ = join_handle.join();
127 })
128 .await;
129 }
130
131 fn socket_thread(&self, ehandle: fasync::EHandle) {
132 let mut sockets_ready = Vec::new();
133
134 const TERMINATE_KEY: u64 = u64::MAX;
136 self.event
137 .wait_async_handle(
138 &self.port,
139 TERMINATE_KEY,
140 zx::Signals::USER_0,
141 zx::WaitAsyncOpts::empty(),
142 )
143 .unwrap();
144
145 let mut terminate = false;
146 let mut finished = false;
147 let mut on_inactive = OnInactiveNotifier::new(self);
148
149 while !finished {
150 let mut deadline = if terminate {
151 finished = true;
153 zx::MonotonicInstant::INFINITE_PAST
154 } else {
155 #[cfg(not(debug_assertions))]
158 let _ = self.event.wait_handle(
159 zx::Signals::USER_0,
160 zx::MonotonicInstant::after(zx::Duration::from_millis(200)),
161 );
162
163 zx::MonotonicInstant::INFINITE
164 };
165
166 loop {
168 match self.port.wait(deadline) {
169 Ok(packet) => {
170 if packet.key() == TERMINATE_KEY {
171 terminate = true;
172 } else {
173 sockets_ready.push(SocketId(packet.key() as u32))
174 }
175 }
176 Err(zx::Status::TIMED_OUT) => break,
177 Err(status) => panic!("port wait error: {status:?}"),
178 }
179 deadline = zx::MonotonicInstant::INFINITE_PAST;
180 }
181
182 let mut inner = self.inner.lock();
183
184 {
185 let InnerAndSockets(inner, sockets) = &mut *inner;
186
187 for socket_id in sockets_ready.drain(..) {
188 let Some(socket) = sockets.get(socket_id.0) else {
189 continue;
191 };
192
193 if inner.read_socket(socket, &mut on_inactive) {
194 let container_id = socket.container_id;
195 if let Some(container) = inner.containers.get_mut(container_id) {
196 container.remove_socket(socket_id, sockets);
197 if !container.is_active() {
198 if container.should_free() {
199 inner.containers.free(container_id);
200 } else {
201 on_inactive.push(&container.identity);
202 }
203 }
204 }
205 } else {
206 socket
207 .socket
208 .wait_async_handle(
209 &self.port,
210 socket_id.0 as u64,
211 zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
212 zx::WaitAsyncOpts::empty(),
213 )
214 .unwrap();
215 }
216 }
217 }
218
219 let wake_tasks = || {
220 for waker in inner.drain_wakers() {
221 waker.wake();
222 }
223
224 std::mem::drop(inner);
225
226 on_inactive.notify();
227 };
228
229 if cfg!(test) {
230 wake_tasks()
232 } else {
233 ehandle.poll_tasks(wake_tasks);
236 }
237 }
238 }
239}
240
241impl Inner {
242 fn ingest(
244 &mut self,
245 msg: &[u8],
246 container_id: ContainerId,
247 on_inactive: &mut OnInactiveNotifier<'_>,
248 ) {
249 if msg.len() < std::mem::size_of::<Header>() {
250 debug!("message too short ({})", msg.len());
251 if let Some(container) = self.containers.get(container_id) {
252 container.stats.increment_invalid(msg.len());
253 }
254 return;
255 }
256
257 let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
258
259 let msg_len = header.size_words() as usize * 8;
262
263 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
265 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
266 if let Some(container) = self.containers.get(container_id) {
267 container.stats.increment_invalid(msg.len());
268 }
269 return;
270 }
271
272 let mut space = self.space();
274 let total_len = msg_len + FXT_HEADER_SIZE;
275
276 while space < total_len {
277 space += self.pop(on_inactive).expect("No messages in buffer!");
278 }
279
280 let Some(container_info) = self.containers.get_mut(container_id) else {
281 return;
282 };
283
284 if container_info.msg_ids.end == container_info.msg_ids.start {
285 container_info.first_index = self.head;
286 }
287
288 let msg_id = container_info.msg_ids.end;
289 container_info.msg_ids.end += 1;
290
291 let (container_msg_id, remaining) =
292 u64::mut_from_prefix(self.buffer.slice_from_index_mut(self.head)).unwrap();
293 *container_msg_id = ((container_id.0 as u64) << 32) | (msg_id & 0xffff_ffff);
294 remaining[..msg_len].copy_from_slice(&msg[..msg_len]);
295
296 self.head += total_len as u64;
297 }
298
299 fn space(&self) -> usize {
300 self.buffer.len() - (self.head - self.tail) as usize
301 }
302
303 fn ensure_space(&mut self, required: usize, on_inactive: &mut OnInactiveNotifier<'_>) {
304 let mut space = self.space();
305 while space < required {
306 space += self.pop(on_inactive).expect("No messages in buffer!");
307 }
308 }
309
310 fn pop(&mut self, on_inactive: &mut OnInactiveNotifier<'_>) -> Option<usize> {
317 if self.head == self.tail {
318 return None;
319 }
320 let (container_id, msg_id, record, timestamp) = self.parse_record(self.tail);
321 let total_len = record.len() + FXT_HEADER_SIZE;
322 self.tail += total_len as u64;
323 let container = self.containers.get_mut(container_id).unwrap();
324 container.stats.increment_rolled_out(total_len);
325 assert_eq!(container.msg_ids.start as u32, msg_id);
326 container.msg_ids.start += 1;
327 if let Some(timestamp) = timestamp {
328 container.last_rolled_out_timestamp = timestamp;
329 }
330 if !container.is_active() {
331 if container.should_free() {
332 self.containers.free(container_id);
333 } else {
334 on_inactive.push(&container.identity);
335 }
336 }
337 Some(total_len)
338 }
339
340 fn parse_record(&self, index: u64) -> (ContainerId, u32, &[u8], Option<zx::BootInstant>) {
342 let buf = self.buffer.slice_from_index(index);
343 let (container_msg_id, msg) = u64::read_from_prefix(buf).unwrap();
344 let (header, remainder) = u64::read_from_prefix(msg).unwrap();
345 let header = Header(header);
346 let record_len = header.size_words() as usize * 8;
347 (
348 ContainerId((container_msg_id >> 32) as u32),
349 container_msg_id as u32,
350 &msg[..record_len],
351 i64::read_from_prefix(remainder).map(|(t, _)| zx::BootInstant::from_nanos(t)).ok(),
352 )
353 }
354
355 fn read_socket(&mut self, socket: &Socket, on_inactive: &mut OnInactiveNotifier<'_>) -> bool {
357 loop {
358 self.ensure_space(FXT_HEADER_SIZE + MAX_DATAGRAM_LEN_BYTES as usize, on_inactive);
359
360 let dest = self.buffer.slice_from_index_mut(self.head);
361
362 let amount_read = match socket
364 .socket
365 .read(&mut dest[FXT_HEADER_SIZE..FXT_HEADER_SIZE + MAX_DATAGRAM_LEN_BYTES as usize])
366 {
367 Ok(a) => a,
368 Err(zx::Status::SHOULD_WAIT) => return false,
369 Err(_) => return true,
370 };
371
372 let Some(container) = self.containers.get_mut(socket.container_id) else {
373 return true;
375 };
376
377 if amount_read < 8 {
378 container.stats.increment_invalid(amount_read);
379 continue;
380 }
381 let header = Header::read_from_bytes(
382 &dest[FXT_HEADER_SIZE..FXT_HEADER_SIZE + std::mem::size_of::<Header>()],
383 )
384 .unwrap();
385 let msg_len = header.size_words() as usize * 8;
386 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != amount_read {
387 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, amount_read);
388 container.stats.increment_invalid(amount_read);
389 continue;
390 }
391
392 container.stats.ingest_message(amount_read, header.severity().into());
393
394 if container.msg_ids.end == container.msg_ids.start {
395 container.first_index = self.head;
396 }
397
398 let msg_id = container.msg_ids.end;
399 container.msg_ids.end += 1;
400
401 let (container_msg_id, _) = u64::mut_from_prefix(dest).unwrap();
405 *container_msg_id = ((socket.container_id.0 as u64) << 32) | (msg_id & 0xffff_ffff);
406
407 self.head += (FXT_HEADER_SIZE + amount_read) as u64;
408 }
409 }
410}
411
412#[derive(Default)]
413struct Containers {
414 slab: Slab<ContainerInfo>,
415}
416
417#[derive(Clone, Copy, Debug)]
418struct ContainerId(u32);
419
420impl Containers {
421 #[cfg(test)]
422 fn len(&self) -> usize {
423 self.slab.len()
424 }
425
426 fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
427 self.slab.get(id.0)
428 }
429
430 fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
431 self.slab.get_mut(id.0)
432 }
433
434 fn new_container(
435 &mut self,
436 identity: Arc<ComponentIdentity>,
437 stats: Arc<LogStreamStats>,
438 ) -> ContainerId {
439 ContainerId(self.slab.insert(|_| ContainerInfo::new(identity, stats)))
440 }
441
442 fn free(&mut self, id: ContainerId) {
443 self.slab.free(id.0)
444 }
445}
446
447#[derive(Derivative)]
448#[derivative(Debug)]
449struct ContainerInfo {
450 refs: usize,
452
453 identity: Arc<ComponentIdentity>,
455
456 first_index: u64,
460
461 msg_ids: Range<u64>,
463
464 terminated: bool,
466
467 #[derivative(Debug = "ignore")]
469 stats: Arc<LogStreamStats>,
470
471 last_rolled_out_timestamp: zx::BootInstant,
473
474 first_socket_id: SocketId,
476}
477
478impl ContainerInfo {
479 fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>) -> Self {
480 Self {
481 refs: 0,
482 identity,
483 first_index: 0,
484 msg_ids: 0..0,
485 terminated: false,
486 stats,
487 last_rolled_out_timestamp: zx::BootInstant::ZERO,
488 first_socket_id: SocketId::NULL,
489 }
490 }
491
492 fn should_free(&self) -> bool {
493 self.terminated && self.refs == 0 && !self.is_active()
494 }
495
496 fn is_active(&self) -> bool {
497 self.msg_ids.end != self.msg_ids.start || self.first_socket_id != SocketId::NULL
498 }
499
500 fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
504 let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
505 if prev == SocketId::NULL {
506 self.first_socket_id = next;
507 } else {
508 sockets.get_mut(prev.0).unwrap().next = next;
509 }
510 if next != SocketId::NULL {
511 sockets
512 .get_mut(next.0)
513 .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
514 .prev = prev;
515 }
516 sockets.free(socket_id.0);
517 debug!(identity:% = self.identity; "Socket closed.");
518 }
519}
520
521pub struct ContainerBuffer {
522 shared_buffer: Arc<SharedBuffer>,
523 container_id: ContainerId,
524}
525
526impl ContainerBuffer {
527 pub fn push_back(&self, msg: &[u8]) {
531 let mut on_inactive = OnInactiveNotifier::new(&self.shared_buffer);
532 let mut inner = self.shared_buffer.inner.lock();
533
534 inner.ingest(msg, self.container_id, &mut on_inactive);
535
536 for waker in inner.drain_wakers() {
537 waker.wake();
538 }
539 }
540
541 pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
543 let mut inner = self.shared_buffer.inner.lock();
544 let head = inner.head;
545 let Some(container) = inner.containers.get_mut(self.container_id) else {
546 return None;
548 };
549
550 container.refs += 1;
551 let stats = Arc::clone(&container.stats);
552
553 let (index, next_id, end) = match mode {
554 StreamMode::Snapshot => {
555 (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
556 }
557 StreamMode::Subscribe => (head, container.msg_ids.end, CursorEnd::Stream),
558 StreamMode::SnapshotThenSubscribe => {
559 (container.first_index, container.msg_ids.start, CursorEnd::Stream)
560 }
561 };
562
563 Some(Cursor {
564 index,
565 container_id: self.container_id,
566 next_id,
567 end,
568 buffer: Arc::clone(&self.shared_buffer),
569 waker_entry: WakerEntry::new(),
570 stats,
571 })
572 }
573
574 pub fn terminate(&self) {
578 let mut guard = self.shared_buffer.inner.lock();
579 let InnerAndSockets(inner, sockets) = &mut *guard;
580 if let Some(container) = inner.containers.get_mut(self.container_id) {
581 container.terminated = true;
582 if container.first_socket_id != SocketId::NULL {
583 loop {
584 container.remove_socket(container.first_socket_id, sockets);
585 if container.first_socket_id == SocketId::NULL {
586 break;
587 }
588 }
589 }
590 if container.should_free() {
591 inner.containers.free(self.container_id);
592 }
593 for waker in guard.drain_wakers() {
594 waker.wake();
595 }
596 }
597 }
598
599 pub fn is_active(&self) -> bool {
601 self.shared_buffer.inner.lock().containers.get(self.container_id).is_some_and(|c| {
602 c.msg_ids.start != c.msg_ids.end || c.first_socket_id != SocketId::NULL
603 })
604 }
605
606 pub fn add_socket(&self, socket: zx::Socket) {
608 let mut guard = self.shared_buffer.inner.lock();
609 let InnerAndSockets(inner, sockets) = &mut *guard;
610 let Some(container) = inner.containers.get_mut(self.container_id) else { return };
611 let next = container.first_socket_id;
612 let socket_id = SocketId(sockets.insert(|socket_id| {
613 socket
614 .wait_async_handle(
615 &self.shared_buffer.port,
616 socket_id as u64,
617 zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
618 zx::WaitAsyncOpts::empty(),
619 )
620 .unwrap();
621 Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
622 }));
623 if next != SocketId::NULL {
624 sockets.get_mut(next.0).unwrap().prev = socket_id;
625 }
626 container.first_socket_id = socket_id;
627 }
628}
629
630impl Drop for ContainerBuffer {
631 fn drop(&mut self) {
632 self.terminate();
633 }
634}
635
636#[pin_project(PinnedDrop)]
637#[derive(Derivative)]
638#[derivative(Debug)]
639pub struct Cursor {
640 index: u64,
642
643 container_id: ContainerId,
644
645 next_id: u64,
647
648 end: CursorEnd,
650
651 #[derivative(Debug = "ignore")]
652 buffer: Arc<SharedBuffer>,
653
654 #[pin]
656 #[derivative(Debug = "ignore")]
657 waker_entry: WakerEntry<InnerAndSockets>,
658
659 #[derivative(Debug = "ignore")]
660 stats: Arc<LogStreamStats>,
661}
662
663#[derive(Debug, PartialEq)]
665pub enum LazyItem<T> {
666 Next(Arc<T>),
668 ItemsRolledOut(u64, zx::BootInstant),
670}
671
672impl Stream for Cursor {
673 type Item = LazyItem<StoredMessage>;
674
675 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
676 let mut this = self.project();
677 let mut on_inactive = OnInactiveNotifier::new(this.buffer);
678 let mut guard = this.buffer.inner.lock();
679 let InnerAndSockets(inner, sockets) = &mut *guard;
680
681 let mut container = match inner.containers.get(*this.container_id) {
682 None => return Poll::Ready(None),
683 Some(container) => container,
684 };
685
686 let end_id = match &mut this.end {
687 CursorEnd::Snapshot(None) => {
688 let mut socket_id = container.first_socket_id;
691 while socket_id != SocketId::NULL {
692 let socket = sockets.get(socket_id.0).unwrap();
693 let next = socket.next;
694 if inner.read_socket(socket, &mut on_inactive) {
695 let container = inner.containers.get_mut(*this.container_id).unwrap();
696 container.remove_socket(socket_id, sockets);
697 if !container.is_active() {
698 on_inactive.push(&container.identity);
699 }
700 }
701 socket_id = next;
702 }
703
704 container = inner.containers.get(*this.container_id).unwrap();
705 *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
706 container.msg_ids.end
707 }
708 CursorEnd::Snapshot(Some(end)) => *end,
709 CursorEnd::Stream => u64::MAX,
710 };
711
712 if *this.next_id == end_id {
713 return Poll::Ready(None);
714 }
715
716 if container.msg_ids.start > *this.next_id {
718 let mut next_id = container.msg_ids.start;
719 if end_id < next_id {
720 next_id = end_id;
721 }
722 let rolled_out = next_id - *this.next_id;
723 *this.next_id = next_id;
724 return Poll::Ready(Some(LazyItem::ItemsRolledOut(
725 rolled_out,
726 container.last_rolled_out_timestamp,
727 )));
728 }
729
730 if inner.tail > *this.index {
731 *this.index = inner.tail;
732 }
733
734 if container.first_index > *this.index {
736 *this.index = container.first_index;
737 }
738 if *this.next_id == container.msg_ids.end {
739 *this.index = inner.head;
740 } else {
741 while *this.index < inner.head {
743 let (container_id, msg_id, record, _timestamp) = inner.parse_record(*this.index);
744 let record_len = record.len();
745
746 *this.index += record_len as u64 + 8;
748 assert!(*this.index <= inner.head);
749
750 if container_id.0 == this.container_id.0 {
751 assert_eq!(*this.next_id as u32, msg_id);
752 *this.next_id += 1;
753 if let Some(msg) = StoredMessage::new(record.into(), this.stats) {
754 return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
755 } else {
756 }
758 }
759 }
760 }
761
762 if container.terminated {
763 Poll::Ready(None)
764 } else {
765 guard.add_waker(this.waker_entry, cx.waker().clone());
766 Poll::Pending
767 }
768 }
769}
770
771#[pinned_drop]
772impl PinnedDrop for Cursor {
773 fn drop(self: Pin<&mut Self>) {
774 let mut inner = self.buffer.inner.lock();
775 if let Some(container) = inner.containers.get_mut(self.container_id) {
776 container.refs -= 1;
777 if container.should_free() {
778 inner.containers.free(self.container_id);
779 }
780 }
781 }
782}
783
784#[derive(Debug)]
785enum CursorEnd {
786 Snapshot(Option<u64>),
789 Stream,
790}
791
792struct Buffer {
794 _vmo: zx::Vmo,
795 _vmar: zx::Vmar,
796 base: usize,
797 len: usize,
798}
799
800impl Buffer {
801 fn new(capacity: usize) -> Self {
802 let capacity = std::cmp::max(
803 MIN_BUFFER_SIZE,
804 capacity.next_multiple_of(zx::system_get_page_size() as usize),
805 );
806
807 let vmo = zx::Vmo::create(capacity as u64).unwrap();
808 let name = zx::Name::new("LogBuffer").unwrap();
809 vmo.set_name(&name).unwrap();
810 let root_vmar = fuchsia_runtime::vmar_root_self();
811
812 let (vmar, base) = root_vmar
816 .allocate(
817 0,
818 capacity + 65536,
819 zx::VmarFlags::CAN_MAP_READ
820 | zx::VmarFlags::CAN_MAP_WRITE
821 | zx::VmarFlags::CAN_MAP_SPECIFIC,
822 )
823 .unwrap();
824 vmar.map(
825 0,
826 &vmo,
827 0,
828 capacity,
829 zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
830 )
831 .unwrap();
832 vmar.map(
833 capacity,
834 &vmo,
835 0,
836 65536,
837 zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
838 )
839 .unwrap();
840 Self { _vmo: vmo, _vmar: vmar, base, len: capacity }
841 }
842
843 fn len(&self) -> usize {
844 self.len
845 }
846
847 fn slice_from_index(&self, index: u64) -> &[u8] {
850 let index = index as usize % self.len;
851 unsafe { std::slice::from_raw_parts((self.base + index) as *const u8, 65536) }
853 }
854
855 fn slice_from_index_mut(&mut self, index: u64) -> &mut [u8] {
858 let index = index as usize % self.len;
859 unsafe { std::slice::from_raw_parts_mut((self.base + index) as *mut u8, 65536) }
861 }
862}
863
864struct Slab<T> {
866 slab: Vec<Slot<T>>,
867 free_index: usize,
868}
869
870impl<T> Default for Slab<T> {
871 fn default() -> Self {
872 Self { slab: Vec::new(), free_index: usize::MAX }
873 }
874}
875
876enum Slot<T> {
877 Used(T),
878 Free(usize),
879}
880
881impl<T> Slab<T> {
882 #[cfg(test)]
884 fn len(&self) -> usize {
885 self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
886 }
887
888 fn free(&mut self, index: u32) {
889 let index = index as usize;
890 assert!(matches!(
891 std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)),
892 Slot::Used(_)
893 ));
894 self.free_index = index;
895 }
896
897 fn get(&self, id: u32) -> Option<&T> {
898 self.slab.get(id as usize).and_then(|s| match s {
899 Slot::Used(s) => Some(s),
900 _ => None,
901 })
902 }
903
904 fn get_mut(&mut self, id: u32) -> Option<&mut T> {
905 self.slab.get_mut(id as usize).and_then(|s| match s {
906 Slot::Used(s) => Some(s),
907 _ => None,
908 })
909 }
910
911 fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
912 let free_index = self.free_index;
913 if free_index != usize::MAX {
914 self.free_index = match std::mem::replace(
915 &mut self.slab[free_index],
916 Slot::Used(value(free_index as u32)),
917 ) {
918 Slot::Free(next) => next,
919 _ => unreachable!(),
920 };
921 free_index as u32
922 } else {
923 assert!(self.slab.len() < u32::MAX as usize);
926 self.slab.push(Slot::Used(value(self.slab.len() as u32)));
927 (self.slab.len() - 1) as u32
928 }
929 }
930}
931
932#[derive(Clone, Copy, Debug, Eq, PartialEq)]
933struct SocketId(u32);
934
935impl SocketId {
936 const NULL: Self = SocketId(0xffff_ffff);
937}
938
939struct Socket {
940 socket: zx::Socket,
941 container_id: ContainerId,
942 prev: SocketId,
944 next: SocketId,
945}
946
947struct OnInactiveNotifier<'a>(&'a SharedBuffer, Vec<Arc<ComponentIdentity>>);
949
950impl<'a> OnInactiveNotifier<'a> {
951 fn new(buffer: &'a SharedBuffer) -> Self {
952 Self(buffer, Vec::new())
953 }
954
955 fn push(&mut self, identity: &Arc<ComponentIdentity>) {
956 self.1.push(Arc::clone(identity));
957 }
958
959 fn notify(&mut self) {
960 for identity in self.1.drain(..) {
961 (*self.0.on_inactive)(identity);
962 }
963 }
964}
965
966impl Drop for OnInactiveNotifier<'_> {
967 fn drop(&mut self) {
968 self.notify();
969 }
970}
971
972#[cfg(test)]
973mod tests {
974 use super::SharedBuffer;
975 use crate::logs::shared_buffer::LazyItem;
976 use crate::logs::stats::LogStreamStats;
977 use crate::logs::testing::make_message;
978 use assert_matches::assert_matches;
979 use diagnostics_assertions::{assert_data_tree, AnyProperty};
980 use fidl_fuchsia_diagnostics::StreamMode;
981 use fuchsia_async as fasync;
982 use fuchsia_inspect::{Inspector, InspectorConfig};
983 use fuchsia_inspect_derive::WithInspect;
984 use futures::channel::mpsc;
985 use futures::poll;
986 use futures::stream::{FuturesUnordered, StreamExt as _};
987 use std::pin::pin;
988 use std::sync::atomic::{AtomicU64, Ordering};
989 use std::sync::Arc;
990
991 #[fuchsia::test]
992 async fn push_one_message() {
993 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
994 let container_buffer =
995 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
996 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
997 container_buffer.push_back(msg.bytes());
998
999 let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
1001 assert_eq!(
1002 cursor
1003 .map(|item| {
1004 match item {
1005 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1006 _ => panic!("Unexpected item {item:?}"),
1007 }
1008 })
1009 .count()
1010 .await,
1011 1
1012 );
1013 }
1014
1015 #[fuchsia::test]
1016 async fn message_too_short() {
1017 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1018
1019 let container_buffer =
1020 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1021 container_buffer.push_back(&[0]);
1022
1023 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1024 }
1025
1026 #[fuchsia::test]
1027 async fn bad_type() {
1028 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1029 let container_buffer =
1030 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1031 container_buffer.push_back(&[0x77; 16]);
1032
1033 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1034 }
1035
1036 #[fuchsia::test]
1037 async fn message_truncated() {
1038 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1039 let container_buffer =
1040 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1041 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1042 container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1043
1044 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1045 }
1046
1047 #[fuchsia::test]
1048 async fn buffer_wrapping() {
1049 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1050 let container_buffer =
1051 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1052
1053 let msg1 = make_message("a", None, zx::BootInstant::from_nanos(1));
1055 container_buffer.push_back(msg1.bytes());
1056
1057 let delta = 32760 - msg1.bytes().len();
1060 let msg2 = make_message(
1061 std::str::from_utf8(&vec![66; 1 + delta]).unwrap(),
1062 None,
1063 zx::BootInstant::from_nanos(2),
1064 );
1065 container_buffer.push_back(msg2.bytes());
1066
1067 let delta = 65528 - 2 * (msg1.bytes().len() + 8) - (msg2.bytes().len() + 8);
1069 let msg3 = make_message(
1070 std::str::from_utf8(&vec![67; 1 + delta]).unwrap(),
1071 None,
1072 zx::BootInstant::from_nanos(3),
1073 );
1074 container_buffer.push_back(msg3.bytes());
1075
1076 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 3);
1077
1078 let msg4 = make_message("d", None, zx::BootInstant::from_nanos(4));
1080 container_buffer.push_back(msg4.bytes());
1081
1082 let mut expected = [msg2.bytes(), msg3.bytes(), msg4.bytes()].into_iter();
1083 assert_eq!(
1084 container_buffer
1085 .cursor(StreamMode::Snapshot)
1086 .unwrap()
1087 .map(|item| match item {
1088 LazyItem::Next(item) => assert_eq!(item.bytes(), expected.next().unwrap()),
1089 _ => panic!("Unexpected item {item:?}"),
1090 })
1091 .count()
1092 .await,
1093 3
1094 );
1095 }
1096
1097 #[fuchsia::test]
1098 async fn on_inactive() {
1099 let identity = Arc::new(vec!["a"].into());
1100 let on_inactive = Arc::new(AtomicU64::new(0));
1101 let buffer = {
1102 let on_inactive = Arc::clone(&on_inactive);
1103 let identity = Arc::clone(&identity);
1104 Arc::new(SharedBuffer::new(
1105 65536,
1106 Box::new(move |i| {
1107 assert_eq!(i, identity);
1108 on_inactive.fetch_add(1, Ordering::Relaxed);
1109 }),
1110 ))
1111 };
1112 let container_a = buffer.new_container_buffer(identity, Arc::default());
1113 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1114
1115 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1116 container_a.push_back(msg.bytes());
1117
1118 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1120 container_b.push_back(msg.bytes());
1121 }
1122
1123 assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
1124 }
1125
1126 #[fuchsia::test]
1127 async fn terminate_drops_container() {
1128 async {}.await;
1130
1131 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1132
1133 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1135 assert_eq!(buffer.container_count(), 1);
1136 container_a.terminate();
1137
1138 assert_eq!(buffer.container_count(), 0);
1139
1140 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1142 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1143 container_a.push_back(msg.bytes());
1144 assert_eq!(buffer.container_count(), 1);
1145 container_a.terminate();
1146
1147 assert_eq!(buffer.container_count(), 1);
1149
1150 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1152 assert_eq!(buffer.container_count(), 2);
1153
1154 while buffer.container_count() != 1 {
1157 container_b.push_back(msg.bytes());
1158 }
1159
1160 assert!(container_a.cursor(StreamMode::Subscribe).is_none());
1161 }
1162
1163 #[fuchsia::test(allow_stalls = false)]
1164 async fn cursor_subscribe() {
1165 for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
1166 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1167 let container =
1168 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1169 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1170 container.push_back(msg.bytes());
1171
1172 let (sender, mut receiver) = mpsc::unbounded();
1173
1174 {
1176 let container = Arc::clone(&container);
1177 fasync::Task::spawn(async move {
1178 let mut cursor = pin!(container.cursor(mode).unwrap());
1179 while let Some(item) = cursor.next().await {
1180 sender.unbounded_send(item).unwrap();
1181 }
1182 })
1183 .detach();
1184 }
1185
1186 if mode == StreamMode::SnapshotThenSubscribe {
1188 assert_matches!(
1189 receiver.next().await,
1190 Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1191 );
1192 }
1193
1194 assert!(fasync::TestExecutor::poll_until_stalled(receiver.next()).await.is_pending());
1195
1196 container.push_back(msg.bytes());
1197
1198 assert_matches!(
1200 receiver.next().await,
1201 Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1202 );
1203
1204 container.terminate();
1205
1206 assert!(receiver.next().await.is_none());
1208 }
1209 }
1210
1211 #[fuchsia::test(allow_stalls = false)]
1212 async fn cursor_rolled_out() {
1213 for pass in 0..2 {
1216 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1217 let container_a =
1218 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1219 let container_b =
1220 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1221 let container_c =
1222 Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1223 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1224
1225 container_a.push_back(msg.bytes());
1226 container_a.push_back(msg.bytes());
1227 container_b.push_back(msg.bytes());
1228 container_a.push_back(msg.bytes());
1229
1230 let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1231
1232 if pass == 0 {
1234 assert!(cursor.next().await.is_some());
1235 }
1236
1237 while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1239 container_c.push_back(msg.bytes());
1240 }
1241
1242 assert_matches!(
1244 cursor.next().await,
1245 Some(LazyItem::ItemsRolledOut(rolled_out, t))
1246 if t == zx::BootInstant::from_nanos(1) && rolled_out == pass + 1
1247 );
1248
1249 assert_eq!(cursor.count().await, 1);
1251 }
1252 }
1253
1254 #[fuchsia::test]
1255 async fn drained_post_termination_cursors() {
1256 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1257 let container =
1258 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1259 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1260
1261 let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
1262 let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1263
1264 container.push_back(msg.bytes());
1265 container.push_back(msg.bytes());
1266 container.push_back(msg.bytes());
1267 container.push_back(msg.bytes());
1268 container.push_back(msg.bytes());
1269
1270 let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
1271 assert!(cursor_a.next().await.is_some());
1272 assert!(cursor_b.next().await.is_some());
1273 assert!(cursor_c.next().await.is_some());
1274
1275 container.terminate();
1276
1277 assert_eq!(cursor_a.count().await, 4);
1279 assert_eq!(cursor_b.count().await, 4);
1280 assert_eq!(cursor_c.count().await, 4);
1281 }
1282
1283 #[fuchsia::test]
1284 async fn empty_post_termination_cursors() {
1285 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1286 let container =
1287 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1288
1289 let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
1290 let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
1291 let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();
1292
1293 container.terminate();
1294
1295 assert_eq!(cursor_a.count().await, 0);
1296 assert_eq!(cursor_b.count().await, 0);
1297 assert_eq!(cursor_c.count().await, 0);
1298 }
1299
1300 #[fuchsia::test]
1301 async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
1302 let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
1303 let container_a =
1304 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1305 let container_b =
1306 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1307 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1308 container_a.push_back(msg.bytes());
1309 container_a.push_back(msg.bytes());
1310 container_a.push_back(msg.bytes());
1311 let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1312
1313 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1315 container_b.push_back(msg.bytes());
1316 }
1317
1318 assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));
1319
1320 assert!(poll!(cursor.next()).is_pending());
1321
1322 container_a.terminate();
1323 assert_eq!(cursor.count().await, 0);
1324 }
1325
1326 #[fuchsia::test]
1327 async fn recycled_container_slot() {
1328 let buffer = Arc::new(SharedBuffer::new(65536, Box::new(|_| {})));
1329 let container_a =
1330 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1331 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1332 container_a.push_back(msg.bytes());
1333
1334 let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1335 assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));
1336
1337 let container_b =
1339 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1340 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1341 container_b.push_back(msg.bytes());
1342 }
1343
1344 container_a.terminate();
1345
1346 let container_c =
1349 Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1350 container_c.push_back(msg.bytes());
1351 container_c.push_back(msg.bytes());
1352
1353 assert_matches!(cursor.next().await, None);
1355 }
1356
1357 #[fuchsia::test]
1358 async fn socket_increments_logstats() {
1359 let inspector = Inspector::new(InspectorConfig::default());
1360 let stats: Arc<LogStreamStats> =
1361 Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
1362 let buffer = Arc::new(SharedBuffer::new(65536, Box::new(|_| {})));
1363 let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1364 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1365
1366 let (local, remote) = zx::Socket::create_datagram();
1367 container_a.add_socket(remote);
1368
1369 let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1370
1371 let mut futures = FuturesUnordered::new();
1374 futures.push(async move {
1375 let mut cursor_a = pin!(cursor_a);
1376 cursor_a.next().await
1377 });
1378 let mut next = futures.next();
1379 assert!(futures::poll!(&mut next).is_pending());
1380
1381 local.write(msg.bytes()).unwrap();
1382
1383 let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1384
1385 assert_eq!(
1386 cursor_b
1387 .map(|item| {
1388 match item {
1389 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1390 _ => panic!("Unexpected item {item:?}"),
1391 }
1392 })
1393 .count()
1394 .await,
1395 1
1396 );
1397
1398 next.await;
1400 assert_data_tree!(
1402 inspector,
1403 root: contains {
1404 test: {
1405 url: "",
1406 last_timestamp: AnyProperty,
1407 sockets_closed: 0u64,
1408 sockets_opened: 0u64,
1409 invalid: {
1410 number: 0u64,
1411 bytes: 0u64,
1412 },
1413 total: {
1414 number: 1u64,
1415 bytes: 88u64,
1416 },
1417 rolled_out: {
1418 number: 0u64,
1419 bytes: 0u64,
1420 },
1421 trace: {
1422 number: 0u64,
1423 bytes: 0u64,
1424 },
1425 debug: {
1426 number: 1u64,
1427 bytes: 88u64,
1428 },
1429 info: {
1430 number: 0u64,
1431 bytes: 0u64,
1432 },
1433 warn: {
1434 number: 0u64,
1435 bytes: 0u64,
1436 },
1437 error: {
1438 number: 0u64,
1439 bytes: 0u64,
1440 },
1441 fatal: {
1442 number: 0u64,
1443 bytes: 0u64,
1444 },
1445 }
1446 }
1447 );
1448 }
1449
1450 #[fuchsia::test]
1451 async fn socket() {
1452 let buffer = Arc::new(SharedBuffer::new(65536, Box::new(|_| {})));
1453 let container_a =
1454 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1455 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1456
1457 let (local, remote) = zx::Socket::create_datagram();
1458 container_a.add_socket(remote);
1459
1460 let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1461
1462 let mut futures = FuturesUnordered::new();
1465 futures.push(async move {
1466 let mut cursor_a = pin!(cursor_a);
1467 cursor_a.next().await
1468 });
1469 let mut next = futures.next();
1470 assert!(futures::poll!(&mut next).is_pending());
1471
1472 local.write(msg.bytes()).unwrap();
1473
1474 let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1475
1476 assert_eq!(
1477 cursor_b
1478 .map(|item| {
1479 match item {
1480 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1481 _ => panic!("Unexpected item {item:?}"),
1482 }
1483 })
1484 .count()
1485 .await,
1486 1
1487 );
1488
1489 next.await;
1491 }
1492
1493 #[fuchsia::test]
1494 async fn socket_on_inactive() {
1495 let on_inactive = Arc::new(AtomicU64::new(0));
1496 let a_identity = Arc::new(vec!["a"].into());
1497 let buffer = Arc::new(SharedBuffer::new(65536, {
1498 let on_inactive = Arc::clone(&on_inactive);
1499 let a_identity = Arc::clone(&a_identity);
1500 Box::new(move |id| {
1501 assert_eq!(id, a_identity);
1502 on_inactive.fetch_add(1, Ordering::Relaxed);
1503 })
1504 }));
1505 let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1506 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1507
1508 let (local, remote) = zx::Socket::create_datagram();
1509 container_a.add_socket(remote);
1510
1511 local.write(msg.bytes()).unwrap();
1512
1513 let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1514
1515 assert_eq!(
1516 cursor
1517 .map(|item| {
1518 match item {
1519 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1520 _ => panic!("Unexpected item {item:?}"),
1521 }
1522 })
1523 .count()
1524 .await,
1525 1
1526 );
1527
1528 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1530 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1531 container_b.push_back(msg.bytes());
1532 }
1533
1534 assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1535
1536 std::mem::drop(local);
1538
1539 while on_inactive.load(Ordering::Relaxed) != 1 {
1541 fasync::Timer::new(std::time::Duration::from_millis(50)).await;
1542 }
1543 }
1544}