use crate::identity::ComponentIdentity;
use crate::logs::repository::ARCHIVIST_MONIKER;
use crate::logs::stats::LogStreamStats;
use crate::logs::stored_message::StoredMessage;
use derivative::Derivative;
use diagnostics_log_encoding::encode::add_dropped_count;
use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
use fidl_fuchsia_diagnostics::StreamMode;
use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
use fuchsia_async as fasync;
use fuchsia_async::condition::{Condition, ConditionGuard, WakerEntry};
use fuchsia_sync::Mutex;
use futures::Stream;
use futures::channel::oneshot;
use log::debug;
use pin_project::{pin_project, pinned_drop};
use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
use std::collections::VecDeque;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut, Range};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use std::time::Duration;
use zerocopy::FromBytes;
use zx::AsHandleRef as _;

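// When free space in the ring buffer falls below
// SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR of its capacity, the oldest records
// are rolled out (see `InnerGuard::check_space`).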
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

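// Default polling interval used by the socket-draining thread and the buffer monitor task.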
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);

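/// Creates the shared ring buffer. The buffer is oversized so that roughly `capacity` bytes
/// remain usable after reserving the free-space threshold (rounded up to a whole page).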
pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
}

fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
    let page_size = zx::system_get_page_size() as usize;
    (capacity * SPACE_THRESHOLD_DENOMINATOR
        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
        .next_multiple_of(page_size)
}

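// Port packet keys with this bit set identify IOB peer-closed notifications; the low 32 bits
// index `Inner::iob_peers`. Socket packet keys are plain `SocketId` values and never have it
// set.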
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

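/// Callback invoked when a container becomes inactive (no attached sockets or IOB writers and
/// no messages left in the buffer).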
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;

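/// The log buffer shared by all containers, backed by a single ring buffer. Records arrive
/// either directly through IOB writers handed out to components, or via datagram sockets that
/// are drained by a dedicated thread.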
pub struct SharedBuffer {
    /// Buffer state, wrapped in a `Condition` so that cursors can register wakers.
    inner: Condition<Inner>,

    /// Sockets registered via `ContainerBuffer::add_socket`, drained by the socket thread.
    sockets: Mutex<Slab<Socket>>,

    /// Callback invoked when a container becomes inactive.
    on_inactive: OnInactive,

    /// Port the socket thread waits on for socket readability, IOB peer-closed notifications
    /// and interrupts.
    port: zx::Port,

    /// Event used to interrupt the socket thread; USER_0 is asserted while flush/terminate
    /// messages are queued.
    event: zx::Event,

    /// Join handle for the socket thread.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    /// Task that periodically scans the ring buffer and rolls out old records when space runs
    /// low.
    _buffer_monitor_task: fasync::Task<()>,
}

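/// Wrapper around the locked `Inner` state that, on drop, wakes any registered cursor wakers
/// and then runs the `on_inactive` callbacks outside the lock.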
struct InnerGuard<'a> {
    buffer: &'a SharedBuffer,

    /// `ManuallyDrop` so that the lock can be released before the `on_inactive` callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    /// Identities that became inactive while the guard was held.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    /// Whether to wake cursor wakers when the guard is dropped.
    wake: bool,
}

impl Drop for InnerGuard<'_> {
    fn drop(&mut self) {
        if self.wake {
            for waker in self.guard.drain_wakers() {
                waker.wake();
            }
        }
        // SAFETY: `guard` is not used again; this releases the lock before the callbacks below
        // run.
        unsafe {
            ManuallyDrop::drop(&mut self.guard);
        }
        for identity in self.on_inactive.drain(..) {
            (*self.buffer.on_inactive)(identity);
        }
    }
}

impl Deref for InnerGuard<'_> {
    type Target = Inner;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl DerefMut for InnerGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}

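/// State protected by `SharedBuffer::inner`.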
struct Inner {
    ring_buffer: Arc<RingBuffer>,

    /// Per-component containers.
    containers: Containers,

    /// Flush and terminate requests for the socket thread.
    thread_msg_queue: VecDeque<ThreadMessage>,

    /// Ring-buffer offset up to which records have been scanned and assigned message IDs.
    last_scanned: u64,

    /// Ring-buffer offset of the oldest record that has not been rolled out.
    tail: u64,

    /// Peer IOB endpoints handed out via `ContainerBuffer::iob`, retained so that PEER_CLOSED
    /// can be observed.
    iob_peers: Slab<(ContainerId, zx::Iob)>,
}

enum ThreadMessage {
    Terminate,
    Flush(oneshot::Sender<()>),
}

pub struct SharedBufferOptions {
    /// Polling interval used by the socket thread and the buffer monitor task.
    pub sleep_time: Duration,
}

impl Default for SharedBufferOptions {
    fn default() -> Self {
        Self { sleep_time: DEFAULT_SLEEP_TIME }
    }
}

impl SharedBuffer {
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
    ) -> Arc<Self> {
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                tail: 0,
                iob_peers: Slab::default(),
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }

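    /// Registers a new container for `identity` and returns a handle to it.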
    pub fn new_container_buffer(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerBuffer {
        let mut inner = self.inner.lock();
        let Inner { containers, ring_buffer, .. } = &mut *inner;
        ContainerBuffer {
            shared_buffer: Arc::clone(self),
            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
        }
    }

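    /// Waits for the socket thread to process a flush request, which it acknowledges after
    /// draining the sockets that were ready.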
    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel();
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let _ = receiver.await;
    }

    #[cfg(test)]
    pub fn container_count(&self) -> usize {
        self.inner.lock().containers.len()
    }

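    /// Asks the socket thread to terminate and waits for it to exit.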
    pub async fn terminate(&self) {
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Terminate);
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
        .await;
    }

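    /// Body of the dedicated thread that drains registered sockets into the ring buffer. It
    /// waits on `port` for socket readability, IOB peer-closed notifications and interrupts
    /// (flush/terminate requests signalled via `event`), throttled by `sleep_time`.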
    fn socket_thread(&self, sleep_time: Duration) {
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        let mut msg = None;

        loop {
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async_handle(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Throttle: sleep for `sleep_time`, waking early if a thread message arrives.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                zx::MonotonicInstant::INFINITE
            };

            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32));
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(self);

            if !iob_peer_closed.is_empty() {
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async_handle(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => return,
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                msg = inner.thread_msg_queue.pop_front();
                // Clear the interrupt signal once all queued messages have been handled.
                if msg.is_none() {
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }

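    /// Task that watches for new data in the ring buffer and rolls out the oldest records
    /// whenever free space falls below the threshold.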
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            fasync::Timer::new(sleep_time).await;
            let head = ring_buffer.wait(last_head).await;
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }
}

impl Inner {
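    /// Validates an encoded log record received via `ContainerBuffer::push_back` and writes it
    /// into the container's IOB, folding in any previously dropped-message count.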
    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
        if msg.len() < std::mem::size_of::<Header>() {
            debug!("message too short ({})", msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();

        let msg_len = header.size_words() as usize * 8;

        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let Some(container) = self.containers.get_mut(container_id) else {
            return;
        };

        let mut data;
        let msg = if container.dropped_count > 0 {
            data = msg.to_vec();
            if !add_dropped_count(&mut data, container.dropped_count) {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                return;
            }
            &data
        } else {
            msg
        };

        // If the write fails, remember the drop so it can be folded into the next message that
        // succeeds.
        if container.iob.write(Default::default(), 0, msg).is_err() {
            container.dropped_count += 1;
        } else {
            container.dropped_count = 0;
        }
    }

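    /// Reads the first message in `range` from the ring buffer, returning the container ID
    /// (recovered from the IOB tag), the message bytes, and the timestamp when the message is
    /// long enough to contain one.
    ///
    /// # Safety
    ///
    /// The caller must uphold the safety requirements of `RingBuffer::first_message_in` for
    /// `range`.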
    unsafe fn parse_message(
        &self,
        range: Range<u64>,
    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
            .expect("Unable to read message from ring buffer");
        (
            ContainerId(tag as u32),
            msg,
            (msg.len() >= 16)
                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
        )
    }
}

impl<'a> InnerGuard<'a> {
    fn new(buffer: &'a SharedBuffer) -> Self {
        Self {
            buffer,
            guard: ManuallyDrop::new(buffer.inner.lock()),
            on_inactive: Vec::new(),
            wake: false,
        }
    }

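    /// Rolls out the oldest record in the buffer (up to `head`), updating the owning
    /// container's rolled-out statistics. Returns the size of the removed record, or `None` if
    /// there was nothing to pop.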
    fn pop(&mut self, head: u64) -> Option<usize> {
        if head == self.tail {
            return None;
        }

        let record_len = {
            let (container_id, message, timestamp) =
                unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            container.stats.increment_rolled_out(record_len);
            container.msg_ids.start += 1;
            if let Some(timestamp) = timestamp {
                container.last_rolled_out_timestamp = timestamp;
            }
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;

        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }

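    /// Drains all currently readable datagrams from `socket_id` into its container's IOB. If
    /// the socket would block, `rearm` is invoked and the socket stays registered; if it is
    /// closed (or reading fails), the socket is removed from the container.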
    fn read_socket(
        &mut self,
        sockets: &mut Slab<Socket>,
        socket_id: SocketId,
        rearm: impl FnOnce(&mut Socket),
    ) {
        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
        let container_id = socket.container_id;

        loop {
            self.check_space(self.ring_buffer.head());

            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);

            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
                Ok(d) => d.len(),
                Err(zx::Status::SHOULD_WAIT) => {
                    rearm(socket);
                    return;
                }
                Err(_) => break,
            };

            // SAFETY: `read_uninit` initialized the first `len` bytes.
            unsafe {
                data.set_len(len);
            }

            let container = self.containers.get_mut(container_id).unwrap();
            if data.len() < 16 {
                container.stats.increment_invalid(data.len());
                continue;
            }

            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
            let msg_len = header.size_words() as usize * 8;
            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
            {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.iob.write(Default::default(), 0, &data).is_err() {
                container.dropped_count += 1;
            } else {
                container.dropped_count = 0;
            }
        }

        // The socket is closed (or reading failed): account for anything already written and
        // then unregister the socket.
        self.update_message_ids(self.ring_buffer.head());

        let container = self.containers.get_mut(container_id).unwrap();
        container.remove_socket(socket_id, sockets);
        if !container.is_active() {
            if container.should_free() {
                self.containers.free(container_id);
            } else {
                let identity = Arc::clone(&container.identity);
                self.on_inactive.push(identity);
            }
        }
    }

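    /// Scans records written to the ring buffer since the last scan (up to `head`), assigning
    /// them message IDs and recording ingestion statistics for their containers.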
    fn update_message_ids(&mut self, head: u64) {
        while self.last_scanned < head {
            let (container_id, message, _) =
                unsafe { self.parse_message(self.last_scanned..head) };
            let msg_len = message.len();
            let severity = (msg_len >= 8)
                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
            let container = self.containers.get_mut(container_id).unwrap();
            container.msg_ids.end += 1;
            if let Some(severity) = severity {
                container.stats.ingest_message(msg_len, severity);
            }
            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
            self.wake = true;
        }
    }

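    /// Ensures that at least `SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR` of the
    /// buffer's capacity is free, rolling out the oldest records if necessary.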
    fn check_space(&mut self, head: u64) {
        self.update_message_ids(head);
        let capacity = self.ring_buffer.capacity();
        let mut space = capacity
            .checked_sub((head - self.tail) as usize)
            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
        while space < required_space {
            let Some(amount) = self.pop(head) else { break };
            space += amount;
        }
    }
}

#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}

#[derive(Clone, Copy, Debug)]
struct ContainerId(u32);

impl Containers {
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.len()
    }

    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
        self.slab.get(id.0)
    }

    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
        self.slab.get_mut(id.0)
    }

    fn new_container(
        &mut self,
        buffer: &RingBuffer,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerId {
        ContainerId(self.slab.insert(|id| {
            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
            ContainerInfo::new(identity, stats, iob)
        }))
    }

    fn free(&mut self, id: ContainerId) {
        self.slab.free(id.0);
    }
}

#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    /// Number of live cursors referencing this container.
    refs: usize,

    identity: Arc<ComponentIdentity>,

    /// Ring-buffer index used as the starting point for snapshot cursors.
    first_index: u64,

    /// IDs of the messages currently held in the buffer for this container: `start` advances
    /// as messages are rolled out, `end` as new messages are scanned.
    msg_ids: Range<u64>,

    /// Set once the container has been terminated.
    terminated: bool,

    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    /// Timestamp of the most recently rolled-out message.
    last_rolled_out_timestamp: zx::BootInstant,

    /// Head of the intrusive list of sockets attached to this container.
    first_socket_id: SocketId,

    /// IOB endpoint used to write this container's messages into the ring buffer.
    iob: zx::Iob,

    /// Number of peer IOB endpoints handed out that are still open.
    iob_count: usize,

    /// Number of messages dropped since the last successful write; folded into the next
    /// message that is written successfully.
    dropped_count: u64,
}

impl ContainerInfo {
    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
        Self {
            refs: 0,
            identity,
            first_index: 0,
            msg_ids: 0..0,
            terminated: false,
            stats,
            last_rolled_out_timestamp: zx::BootInstant::ZERO,
            first_socket_id: SocketId::NULL,
            iob,
            iob_count: 0,
            dropped_count: 0,
        }
    }

    fn should_free(&self) -> bool {
        self.terminated && self.refs == 0 && !self.is_active()
    }

    /// A container is active while it has attached sockets or IOB writers, still has messages
    /// in the buffer, or is Archivist itself (which is always considered active).
    fn is_active(&self) -> bool {
        self.first_socket_id != SocketId::NULL
            || self.iob_count > 0
            || self.msg_ids.end != self.msg_ids.start
            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
    }

    /// Unlinks `socket_id` from this container's socket list and frees it.
    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
        if prev == SocketId::NULL {
            self.first_socket_id = next;
        } else {
            sockets.get_mut(prev.0).unwrap().next = next;
        }
        if next != SocketId::NULL {
            sockets
                .get_mut(next.0)
                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
                .prev = prev;
        }
        sockets.free(socket_id.0);
        self.stats.close_socket();
        debug!(identity:% = self.identity; "Socket closed.");
    }
}

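/// Handle to a single component's slot in the shared buffer. Dropping the handle terminates
/// the container.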
pub struct ContainerBuffer {
    shared_buffer: Arc<SharedBuffer>,
    container_id: ContainerId,
}

impl ContainerBuffer {
    pub fn iob_tag(&self) -> u64 {
        self.container_id.0 as u64
    }

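    /// Writes an encoded log record for this container directly into the shared buffer.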
    pub fn push_back(&self, msg: &[u8]) {
        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
    }

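    /// Returns a new IOB endpoint that the component can use to write log records for this
    /// container. The peer endpoint is retained so that the container is kept active until the
    /// component closes its end.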
    pub fn iob(&self) -> zx::Iob {
        let mut inner = self.shared_buffer.inner.lock();

        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;

        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();

        inner.iob_peers.insert(|idx| {
            ep1.wait_async_handle(
                &self.shared_buffer.port,
                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
                zx::Signals::IOB_PEER_CLOSED,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

            (self.container_id, ep1)
        });

        ep0
    }

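    /// Returns a cursor over this container's messages using the given stream mode, or `None`
    /// if the container has already been freed.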
    pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
        let mut inner = InnerGuard::new(&self.shared_buffer);
        let Some(mut container) = inner.containers.get_mut(self.container_id) else {
            return None;
        };

        container.refs += 1;
        let stats = Arc::clone(&container.stats);

        let (index, next_id, end) = match mode {
            StreamMode::Snapshot => {
                (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
            }
            StreamMode::Subscribe => {
                let head = inner.ring_buffer.head();
                inner.update_message_ids(head);
                container = inner.containers.get_mut(self.container_id).unwrap();
                (head, container.msg_ids.end, CursorEnd::Stream)
            }
            StreamMode::SnapshotThenSubscribe => {
                (container.first_index, container.msg_ids.start, CursorEnd::Stream)
            }
        };

        Some(Cursor {
            index,
            container_id: self.container_id,
            next_id,
            end,
            buffer: Arc::clone(&self.shared_buffer),
            waker_entry: self.shared_buffer.inner.waker_entry(),
            stats,
        })
    }

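    /// Marks the container as terminated: its sockets are detached and the container is freed
    /// once no messages, cursors or IOB writers remain.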
    pub fn terminate(&self) {
        let mut inner = InnerGuard::new(&self.shared_buffer);

        inner.update_message_ids(inner.ring_buffer.head());

        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.terminated = true;
            if container.first_socket_id != SocketId::NULL {
                let mut sockets = self.shared_buffer.sockets.lock();
                loop {
                    container.remove_socket(container.first_socket_id, &mut sockets);
                    if container.first_socket_id == SocketId::NULL {
                        break;
                    }
                }
            }
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
            inner.wake = true;
        }
    }

    pub fn is_active(&self) -> bool {
        self.shared_buffer
            .inner
            .lock()
            .containers
            .get(self.container_id)
            .is_some_and(|c| c.is_active())
    }

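    /// Registers a datagram socket for this container. Records written to the socket are
    /// drained into the shared buffer by the socket thread.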
    pub fn add_socket(&self, socket: zx::Socket) {
        let mut inner = self.shared_buffer.inner.lock();
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        container.stats.open_socket();
        let next = container.first_socket_id;
        let mut sockets = self.shared_buffer.sockets.lock();
        let socket_id = SocketId(sockets.insert(|socket_id| {
            socket
                .wait_async_handle(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
}

impl Drop for ContainerBuffer {
    fn drop(&mut self) {
        self.terminate();
    }
}

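/// A stream of log messages (and roll-out notifications) for a single container.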
#[pin_project(PinnedDrop)]
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Cursor {
    /// Current offset in the ring buffer.
    index: u64,

    container_id: ContainerId,

    /// ID of the next message this cursor expects to yield.
    next_id: u64,

    /// Where the cursor stops.
    end: CursorEnd,

    #[derivative(Debug = "ignore")]
    buffer: Arc<SharedBuffer>,

    #[pin]
    #[derivative(Debug = "ignore")]
    waker_entry: WakerEntry<Inner>,

    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,
}

#[derive(Debug, PartialEq)]
pub enum LazyItem<T> {
    /// The next message in the stream.
    Next(Arc<T>),
    /// Some messages were rolled out before they could be read; carries the count and the
    /// timestamp of the most recently rolled-out message.
    ItemsRolledOut(u64, zx::BootInstant),
}

impl Stream for Cursor {
    type Item = LazyItem<StoredMessage>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut inner = InnerGuard::new(this.buffer);

        let mut head = inner.ring_buffer.head();
        inner.check_space(head);

        let mut container = match inner.containers.get(*this.container_id) {
            None => return Poll::Ready(None),
            Some(container) => container,
        };

        let end_id = match &mut this.end {
            CursorEnd::Snapshot(None) => {
                // First poll of a snapshot cursor: drain any data pending on the container's
                // sockets so that it is included, then fix the end of the snapshot.
                let mut sockets = this.buffer.sockets.lock();
                let mut socket_id = container.first_socket_id;
                while socket_id != SocketId::NULL {
                    let socket = sockets.get_mut(socket_id.0).unwrap();
                    let next = socket.next;
                    inner.read_socket(&mut sockets, socket_id, |_| {});
                    socket_id = next;
                }

                head = inner.ring_buffer.head();
                inner.update_message_ids(head);
                container = inner.containers.get(*this.container_id).unwrap();
                *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
                container.msg_ids.end
            }
            CursorEnd::Snapshot(Some(end)) => *end,
            CursorEnd::Stream => u64::MAX,
        };

        if *this.next_id == end_id {
            return Poll::Ready(None);
        }

        // Report any messages that were rolled out before the cursor could read them.
        if container.msg_ids.start > *this.next_id {
            let mut next_id = container.msg_ids.start;
            if end_id < next_id {
                next_id = end_id;
            }
            let rolled_out = next_id - *this.next_id;
            *this.next_id = next_id;
            return Poll::Ready(Some(LazyItem::ItemsRolledOut(
                rolled_out,
                container.last_rolled_out_timestamp,
            )));
        }

        if inner.tail > *this.index {
            *this.index = inner.tail;
        }

        if container.first_index > *this.index {
            *this.index = container.first_index;
        }

        if *this.next_id == container.msg_ids.end && *this.index < inner.last_scanned {
            *this.index = inner.last_scanned;
        }

        while *this.index < head {
            let (container_id, message, _timestamp) =
                unsafe { inner.parse_message(*this.index..head) };

            *this.index += ring_buffer_record_len(message.len()) as u64;
            assert!(*this.index <= head);

            if container_id.0 == this.container_id.0 {
                *this.next_id += 1;
                // Messages that fail to parse are skipped.
                if let Some(msg) = StoredMessage::new(message.into(), this.stats) {
                    return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
                }
            }
        }

        if container.terminated {
            Poll::Ready(None)
        } else {
            inner.guard.add_waker(this.waker_entry, cx.waker().clone());
            Poll::Pending
        }
    }
}

#[pinned_drop]
impl PinnedDrop for Cursor {
    fn drop(self: Pin<&mut Self>) {
        let mut inner = self.buffer.inner.lock();
        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.refs -= 1;
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
        }
    }
}

#[derive(Debug)]
enum CursorEnd {
    /// The cursor ends at a fixed message ID, captured on the first poll (`None` until then).
    Snapshot(Option<u64>),
    /// The cursor never ends (used by the subscribe modes).
    Stream,
}

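/// A minimal slab allocator: freed slots are kept on an intrusive free list and reused by
/// later insertions, so indices remain stable while a slot is in use.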
struct Slab<T> {
    slab: Vec<Slot<T>>,
    free_index: usize,
}

impl<T> Default for Slab<T> {
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

enum Slot<T> {
    Used(T),
    Free(usize),
}

impl<T> Slab<T> {
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
            Slot::Free(_) => panic!("Slot already free"),
            Slot::Used(value) => value,
        };
        self.free_index = index;
        value
    }

    fn get(&self, id: u32) -> Option<&T> {
        self.slab.get(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        self.slab.get_mut(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                _ => unreachable!(),
            };
            free_index as u32
        } else {
            // u32::MAX is never handed out; it is reserved for sentinels such as
            // `SocketId::NULL`.
            assert!(self.slab.len() < u32::MAX as usize);
            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
            (self.slab.len() - 1) as u32
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    const NULL: Self = SocketId(0xffff_ffff);
}

struct Socket {
    socket: zx::Socket,
    container_id: ContainerId,
    /// Intrusive doubly-linked list of the sockets attached to the same container.
    prev: SocketId,
    next: SocketId,
}

#[cfg(test)]
mod tests {
    use super::{SharedBuffer, SharedBufferOptions, create_ring_buffer};
    use crate::logs::shared_buffer::LazyItem;
    use crate::logs::stats::LogStreamStats;
    use crate::logs::testing::make_message;
    use assert_matches::assert_matches;
    use diagnostics_assertions::{AnyProperty, assert_data_tree};
    use fidl_fuchsia_diagnostics::StreamMode;
    use fuchsia_async as fasync;
    use fuchsia_async::TimeoutExt;
    use fuchsia_inspect::{Inspector, InspectorConfig};
    use fuchsia_inspect_derive::WithInspect;
    use futures::channel::mpsc;
    use futures::future::OptionFuture;
    use futures::stream::{FuturesUnordered, StreamExt as _};
    use futures::{FutureExt, poll};
    use ring_buffer::MAX_MESSAGE_SIZE;
    use std::future::poll_fn;
    use std::pin::pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::Poll;
    use std::time::Duration;

    async fn yield_to_executor() {
        let mut first_time = true;
        poll_fn(|cx| {
            if first_time {
                cx.waker().wake_by_ref();
                first_time = false;
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
        .await;
    }

    #[fuchsia::test]
    async fn push_one_message() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg.bytes());

        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
        assert_eq!(
            cursor
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );
    }

    #[fuchsia::test]
    async fn message_too_short() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );

        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn bad_type() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0x77; 16]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn message_truncated() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        let mut i = 0;
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            yield_to_executor().await;

            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        let mut j;
        let mut item = cursor.next().await;
        if let Some(LazyItem::ItemsRolledOut(_, _)) = item {
            item = cursor.next().await;
        }
        assert_matches!(
            item,
            Some(LazyItem::Next(item)) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"),
                    None,
                    item.timestamp());
                assert_eq!(&*item, &msg);
            }
        );

        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(LazyItem::Next(item)) => {
                    assert_eq!(&*item, &make_message(&format!("{j}"),
                        None,
                        item.timestamp()));
                }
            );
            j += 1;
        }

        assert_eq!(cursor.next().await, None);
    }

    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }

    #[fuchsia::test]
    async fn terminate_drops_container() {
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );

        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 1);

        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }

    #[fuchsia::test]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
                );
            }

            assert_eq!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await,
                None
            );

            container.push_back(msg.bytes());

            assert_matches!(
                receiver.next().await,
                Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
            );

            container.terminate();

            assert!(receiver.next().await.is_none());
        }
    }

    #[fuchsia::test]
    async fn cursor_rolled_out() {
        for pass in 0..2 {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            );
            let container_a =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let container_b =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
            let container_c =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

            container_a.push_back(msg.bytes());
            container_a.push_back(msg.bytes());
            container_b.push_back(msg.bytes());

            const A_MESSAGE_COUNT: usize = 50;
            for _ in 0..A_MESSAGE_COUNT - 2 {
                container_a.push_back(msg.bytes());
            }

            let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

            let mut expected = A_MESSAGE_COUNT;

            if pass == 0 {
                assert!(cursor.next().await.is_some());
                expected -= 1;
            }

            while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
                container_c.push_back(msg.bytes());

                yield_to_executor().await;
            }

            assert_matches!(
                cursor.next().await,
                Some(LazyItem::ItemsRolledOut(rolled_out, t))
                if t == zx::BootInstant::from_nanos(1) && rolled_out > 0
                => expected -= rolled_out as usize
            );

            assert_eq!(cursor.count().await, expected);
        }
    }

    #[fuchsia::test]
    async fn drained_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());

        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
        assert!(cursor_a.next().await.is_some());
        assert!(cursor_b.next().await.is_some());
        assert!(cursor_c.next().await.is_some());

        container.terminate();

        assert_eq!(cursor_a.count().await, 4);
        assert_eq!(cursor_b.count().await, 4);
        assert_eq!(cursor_c.count().await, 4);
    }

    #[fuchsia::test]
    async fn empty_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));

        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();

        container.terminate();

        assert_eq!(cursor_a.count().await, 0);
        assert_eq!(cursor_b.count().await, 0);
        assert_eq!(cursor_c.count().await, 0);
    }

    #[fuchsia::test]
    async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));

        assert!(poll!(cursor.next()).is_pending());

        container_a.terminate();
        assert_eq!(cursor.count().await, 0);
    }

    #[fuchsia::test]
    async fn recycled_container_slot() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));

        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        container_a.terminate();

        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());

        assert_matches!(cursor.next().await, None);
    }

    #[fuchsia::test]
    async fn socket_increments_logstats() {
        let inspector = Inspector::new(InspectorConfig::default());
        let stats: Arc<LogStreamStats> =
            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor_b
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        next.await;

        assert_data_tree!(
            inspector,
            root: contains {
                test: {
                    url: "",
                    last_timestamp: AnyProperty,
                    sockets_closed: 0u64,
                    sockets_opened: 1u64,
                    invalid: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    total: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    rolled_out: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    trace: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    debug: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    info: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    warn: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    error: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    fatal: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                }
            }
        );
    }

    #[fuchsia::test]
    async fn socket() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor_b
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        next.await;
    }

    #[fuchsia::test]
    async fn socket_on_inactive() {
        let on_inactive = Arc::new(AtomicU64::new(0));
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            {
                let on_inactive = Arc::clone(&on_inactive);
                let a_identity = Arc::clone(&a_identity);
                Box::new(move |id| {
                    assert_eq!(id, a_identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                })
            },
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        local.write(msg.bytes()).unwrap();

        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);

        std::mem::drop(local);

        while on_inactive.load(Ordering::Relaxed) != 1 {
            fasync::Timer::new(Duration::from_millis(50)).await;
        }
    }

    #[fuchsia::test]
    async fn flush() {
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(1024 * 1024),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());

        const COUNT: usize = 1000;
        for _ in 0..COUNT {
            local.write(msg.bytes()).unwrap();
        }

        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
        flush_futures.next().await;

        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
        assert!(messages.is_some());

        flush_futures.next().await;

        buffer.terminate().await;
    }
}