use crate::identity::ComponentIdentity;
use crate::logs::repository::ARCHIVIST_MONIKER;
use crate::logs::stats::LogStreamStats;
use crate::logs::stored_message::StoredMessage;
use derivative::Derivative;
use diagnostics_log_encoding::encode::add_dropped_count;
use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
use fidl_fuchsia_diagnostics::StreamMode;
use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
use fuchsia_async as fasync;
use fuchsia_async::condition::{Condition, ConditionGuard, WakerEntry};
use fuchsia_sync::Mutex;
use futures::Stream;
use futures::channel::oneshot;
use log::debug;
use pin_project::{pin_project, pinned_drop};
use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
use std::collections::VecDeque;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut, Range};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use std::time::Duration;
use zerocopy::FromBytes;
use zx::AsHandleRef as _;

// Messages are rolled out of the ring buffer whenever the amount of free space drops below this
// fraction of the buffer's capacity.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// Default interval used by the socket thread and the buffer monitor task to batch up work.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);

pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
}

// Because a fraction of the buffer is always kept free (see the space threshold above), the ring
// buffer is over-allocated so that the usable portion matches the desired capacity, rounded up to
// a whole number of pages.
fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
    let page_size = zx::system_get_page_size() as usize;
    (capacity * SPACE_THRESHOLD_DENOMINATOR
        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
        .next_multiple_of(page_size)
}

// Port packet keys with this bit set indicate that an IOB peer was closed; the low bits hold the
// index into `Inner::iob_peers`.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

// Callback invoked when a component's container becomes inactive (no sockets, no IOB writers and
// no messages remaining in the buffer).
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;

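/// `SharedBuffer` multiplexes log records from many components into a single ring buffer.
/// Components write records through sockets or IOB writers, and readers consume them through
/// per-component [`Cursor`]s.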
pub struct SharedBuffer {
    inner: Condition<Inner>,

    // Sockets registered via `ContainerBuffer::add_socket`, drained by the socket thread.
    sockets: Mutex<Slab<Socket>>,

    // Called when a container becomes inactive.
    on_inactive: OnInactive,

    // Port used by the socket thread to wait for socket and IOB peer-closed signals.
    port: zx::Port,

    // Event used to interrupt the socket thread; USER_0 is raised whenever a thread message is
    // queued.
    event: zx::Event,

    // The thread responsible for draining sockets.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // Task that scans the ring buffer for new messages and rolls out old ones when space runs low.
    _buffer_monitor_task: fasync::Task<()>,
}

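/// A guard around the locked `Inner` state; when dropped it wakes registered cursor wakers (when
/// `wake` is set) and then runs pending `on_inactive` callbacks after releasing the lock.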
struct InnerGuard<'a> {
    buffer: &'a SharedBuffer,

    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // Identities that became inactive while the guard was held; the `on_inactive` callback is run
    // for these after the lock is released.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // Whether cursors should be woken when the guard is dropped.
    wake: bool,
}

impl Drop for InnerGuard<'_> {
    fn drop(&mut self) {
        if self.wake {
            for waker in self.guard.drain_wakers() {
                waker.wake();
            }
        }
        // SAFETY: `guard` is never used again.
        unsafe {
            ManuallyDrop::drop(&mut self.guard);
        }
        // Run the callbacks after the lock has been released.
        for identity in self.on_inactive.drain(..) {
            (*self.buffer.on_inactive)(identity);
        }
    }
}

impl Deref for InnerGuard<'_> {
    type Target = Inner;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl DerefMut for InnerGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}

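/// State protected by the `Condition` lock: the ring buffer itself plus per-container metadata.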
struct Inner {
    ring_buffer: Arc<RingBuffer>,

    // Per-component metadata for every registered container.
    containers: Containers,

    // Messages for the socket thread (flush and terminate requests).
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The ring buffer offset up to which messages have been scanned and attributed to containers.
    last_scanned: u64,

    // The ring buffer offset of the oldest message that has not been rolled out.
    tail: u64,

    // Peer ends of the IOB writers handed out via `ContainerBuffer::iob`, kept so that we can be
    // notified when they are closed.
    iob_peers: Slab<(ContainerId, zx::Iob)>,
}

// Messages sent to the socket thread.
enum ThreadMessage {
    Terminate,

    // Respond on the sender once pending socket data has been drained.
    Flush(oneshot::Sender<()>),
}

pub struct SharedBufferOptions {
    // How long the socket thread and buffer monitor wait between batches of work.
    pub sleep_time: Duration,
}

impl Default for SharedBufferOptions {
    fn default() -> Self {
        Self { sleep_time: DEFAULT_SLEEP_TIME }
    }
}

impl SharedBuffer {
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
    ) -> Arc<Self> {
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                tail: 0,
                iob_peers: Slab::default(),
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }

    pub fn new_container_buffer(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerBuffer {
        let mut inner = self.inner.lock();
        let Inner { containers, ring_buffer, .. } = &mut *inner;
        ContainerBuffer {
            shared_buffer: Arc::clone(self),
            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
        }
    }

    // Waits until the socket thread has drained the sockets that are currently readable.
    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel();
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let _ = receiver.await;
    }

    #[cfg(test)]
    pub fn container_count(&self) -> usize {
        self.inner.lock().containers.len()
    }

    // Shuts down the socket thread.
    pub async fn terminate(&self) {
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Terminate);
        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
        .await;
    }

    fn socket_thread(&self, sleep_time: Duration) {
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        let mut msg = None;

        loop {
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async_handle(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Sleep (unless interrupted) so that socket messages have a chance to batch up.
                let _ = self.event.wait_handle(
                    zx::Signals::USER_0,
                    zx::MonotonicInstant::after(sleep_time.into()),
                );
                zx::MonotonicInstant::INFINITE
            };

            // Drain all pending packets from the port.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(self);

            if !iob_peer_closed.is_empty() {
                // Make sure all messages sent on the closed IOBs are accounted for first.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async_handle(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => return,
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    self.event.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }

    // Periodically scans the ring buffer for newly committed messages and rolls out old ones when
    // free space runs low.
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            fasync::Timer::new(sleep_time).await;
            let head = ring_buffer.wait(last_head).await;
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }
}

impl Inner {
    // Validates `msg` and writes it into the shared ring buffer via the container's IOB.
    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
        if msg.len() < std::mem::size_of::<Header>() {
            debug!("message too short ({})", msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();

        let msg_len = header.size_words() as usize * 8;

        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let Some(container) = self.containers.get_mut(container_id) else {
            return;
        };

        // If earlier messages were dropped, record the dropped count in this message.
        let mut data;
        let msg = if container.dropped_count > 0 {
            data = msg.to_vec();
            if !add_dropped_count(&mut data, container.dropped_count) {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                return;
            }
            &data
        } else {
            msg
        };

        if container.iob.write(Default::default(), 0, msg).is_err() {
            container.dropped_count += 1
        } else {
            container.dropped_count = 0;
        }
    }

    // Parses the first message in `range`, returning the container it belongs to, the message
    // bytes and, when the message is long enough to contain one, its timestamp.
    unsafe fn parse_message(
        &self,
        range: Range<u64>,
    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
        let (tag, msg) = self
            .ring_buffer
            .first_message_in(range)
            .expect("Unable to read message from ring buffer");
        (
            ContainerId(tag as u32),
            msg,
            (msg.len() >= 16)
                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
        )
    }
}

impl<'a> InnerGuard<'a> {
    fn new(buffer: &'a SharedBuffer) -> Self {
        Self {
            buffer,
            guard: ManuallyDrop::new(buffer.inner.lock()),
            on_inactive: Vec::new(),
            wake: false,
        }
    }

    // Rolls the oldest message out of the buffer, returning the amount of space freed, or `None`
    // if the buffer is empty.
    fn pop(&mut self, head: u64) -> Option<usize> {
        if head == self.tail {
            return None;
        }

        let record_len = {
            let (container_id, message, timestamp) = unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            container.stats.increment_rolled_out(record_len);
            container.msg_ids.start += 1;
            if let Some(timestamp) = timestamp {
                container.last_rolled_out_timestamp = timestamp;
            }
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;

        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }

    // Drains `socket_id` into the ring buffer. If the socket runs dry it is rearmed via `rearm`;
    // if it returns an error (e.g. the peer closed it), it is removed from the container.
    fn read_socket(
        &mut self,
        sockets: &mut Slab<Socket>,
        socket_id: SocketId,
        rearm: impl FnOnce(&mut Socket),
    ) {
        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
        let container_id = socket.container_id;

        loop {
            self.check_space(self.ring_buffer.head());

            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);

            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
                Ok(d) => d.len(),
                Err(zx::Status::SHOULD_WAIT) => {
                    rearm(socket);
                    return;
                }
                Err(_) => break,
            };

            // SAFETY: `read_uninit` initialised the first `len` bytes.
            unsafe {
                data.set_len(len);
            }

            let container = self.containers.get_mut(container_id).unwrap();
            if data.len() < 16 {
                container.stats.increment_invalid(data.len());
                continue;
            }

            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
            let msg_len = header.size_words() as usize * 8;
            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
            {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.iob.write(Default::default(), 0, &data).is_err() {
                container.dropped_count += 1
            } else {
                container.dropped_count = 0;
            }
        }

        // The socket returned an error, so remove it.
        self.update_message_ids(self.ring_buffer.head());

        let container = self.containers.get_mut(container_id).unwrap();
        container.remove_socket(socket_id, sockets);
        if !container.is_active() {
            if container.should_free() {
                self.containers.free(container_id);
            } else {
                let identity = Arc::clone(&container.identity);
                self.on_inactive.push(identity);
            }
        }
    }

    // Scans messages committed to the ring buffer since the last scan, assigning message IDs and
    // recording stats against the owning containers.
    fn update_message_ids(&mut self, head: u64) {
        while self.last_scanned < head {
            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
            let msg_len = message.len();
            let severity = (msg_len >= 8)
                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
            let container = self.containers.get_mut(container_id).unwrap();
            container.msg_ids.end += 1;
            if let Some(severity) = severity {
                container.stats.ingest_message(msg_len, severity);
            }
            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
            self.wake = true;
        }
    }

    // Rolls out the oldest messages until at least the threshold fraction of the buffer's capacity
    // is free.
    fn check_space(&mut self, head: u64) {
        self.update_message_ids(head);
        let capacity = self.ring_buffer.capacity();
        let mut space = capacity
            .checked_sub((head - self.tail) as usize)
            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
        while space < required_space {
            let Some(amount) = self.pop(head) else { break };
            space += amount;
        }
    }
}

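/// The set of per-component containers, indexed by `ContainerId`.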
#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}

#[derive(Clone, Copy, Debug)]
struct ContainerId(u32);

impl Containers {
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.len()
    }

    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
        self.slab.get(id.0)
    }

    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
        self.slab.get_mut(id.0)
    }

    fn new_container(
        &mut self,
        buffer: &RingBuffer,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerId {
        ContainerId(self.slab.insert(|id| {
            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
            ContainerInfo::new(identity, stats, iob)
        }))
    }

    fn free(&mut self, id: ContainerId) {
        self.slab.free(id.0);
    }
}

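/// Per-component bookkeeping: message ID range, sockets, IOB writers and inspect stats.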
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The number of live cursors referencing this container.
    refs: usize,

    identity: Arc<ComponentIdentity>,

    // The ring buffer index before which none of this container's messages will be found.
    first_index: u64,

    // The IDs of this container's messages currently in the buffer: `start` is the ID of the
    // oldest message still present, `end` is the ID that will be assigned to the next message.
    msg_ids: Range<u64>,

    // True once the container has been terminated.
    terminated: bool,

    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The timestamp of the most recently rolled-out message.
    last_rolled_out_timestamp: zx::BootInstant,

    // Head of this container's linked list of sockets.
    first_socket_id: SocketId,

    // The IOB used to write this container's messages into the ring buffer.
    iob: zx::Iob,

    // The number of IOB writer peers that have been handed out and are still open.
    iob_count: usize,

    // The number of consecutive messages dropped because writing to the ring buffer failed.
    dropped_count: u64,
}

impl ContainerInfo {
    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
        Self {
            refs: 0,
            identity,
            first_index: 0,
            msg_ids: 0..0,
            terminated: false,
            stats,
            last_rolled_out_timestamp: zx::BootInstant::ZERO,
            first_socket_id: SocketId::NULL,
            iob,
            iob_count: 0,
            dropped_count: 0,
        }
    }

    fn should_free(&self) -> bool {
        self.terminated && self.refs == 0 && !self.is_active()
    }

    // A container is active whilst it has connected sockets or IOB writers, messages still in the
    // buffer, or it belongs to Archivist itself.
    fn is_active(&self) -> bool {
        self.first_socket_id != SocketId::NULL
            || self.iob_count > 0
            || self.msg_ids.end != self.msg_ids.start
            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
    }

    // Unlinks `socket_id` from this container's socket list and frees it.
    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
        if prev == SocketId::NULL {
            self.first_socket_id = next;
        } else {
            sockets.get_mut(prev.0).unwrap().next = next;
        }
        if next != SocketId::NULL {
            sockets
                .get_mut(next.0)
                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
                .prev = prev;
        }
        sockets.free(socket_id.0);
        self.stats.close_socket();
        debug!(identity:% = self.identity; "Socket closed.");
    }
}

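/// A per-component handle onto the shared buffer, used to push messages, hand out IOB writers and
/// sockets, and create cursors.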
pub struct ContainerBuffer {
    shared_buffer: Arc<SharedBuffer>,
    container_id: ContainerId,
}

impl ContainerBuffer {
    // The tag used for this container's messages in the ring buffer.
    pub fn iob_tag(&self) -> u64 {
        self.container_id.0 as u64
    }

    // Ingests a message on behalf of this container.
    pub fn push_back(&self, msg: &[u8]) {
        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
    }

    // Returns a new IOB writer for this container. The peer end is registered so that we are
    // notified when the writer is closed.
    pub fn iob(&self) -> zx::Iob {
        let mut inner = self.shared_buffer.inner.lock();

        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;

        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();

        inner.iob_peers.insert(|idx| {
            ep1.wait_async_handle(
                &self.shared_buffer.port,
                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
                zx::Signals::IOB_PEER_CLOSED,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

            (self.container_id, ep1)
        });

        ep0
    }

    // Returns a cursor over this container's messages, or `None` if the container has been freed.
    pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
        let mut inner = InnerGuard::new(&self.shared_buffer);
        let Some(mut container) = inner.containers.get_mut(self.container_id) else {
            return None;
        };

        container.refs += 1;
        let stats = Arc::clone(&container.stats);

        let (index, next_id, end) = match mode {
            StreamMode::Snapshot => {
                (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
            }
            StreamMode::Subscribe => {
                let head = inner.ring_buffer.head();
                inner.update_message_ids(head);
                container = inner.containers.get_mut(self.container_id).unwrap();
                (head, container.msg_ids.end, CursorEnd::Stream)
            }
            StreamMode::SnapshotThenSubscribe => {
                (container.first_index, container.msg_ids.start, CursorEnd::Stream)
            }
        };

        Some(Cursor {
            index,
            container_id: self.container_id,
            next_id,
            end,
            buffer: Arc::clone(&self.shared_buffer),
            waker_entry: self.shared_buffer.inner.waker_entry(),
            stats,
        })
    }

    // Marks the container as terminated: its sockets are closed, cursors are woken, and the
    // container is freed once nothing else references it.
    pub fn terminate(&self) {
        let mut inner = InnerGuard::new(&self.shared_buffer);

        inner.update_message_ids(inner.ring_buffer.head());

        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.terminated = true;
            if container.first_socket_id != SocketId::NULL {
                let mut sockets = self.shared_buffer.sockets.lock();
                loop {
                    container.remove_socket(container.first_socket_id, &mut sockets);
                    if container.first_socket_id == SocketId::NULL {
                        break;
                    }
                }
            }
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
            inner.wake = true;
        }
    }

    pub fn is_active(&self) -> bool {
        self.shared_buffer
            .inner
            .lock()
            .containers
            .get(self.container_id)
            .is_some_and(|c| c.is_active())
    }

    // Registers a socket from which datagrams will be read and ingested for this container.
    pub fn add_socket(&self, socket: zx::Socket) {
        let mut inner = self.shared_buffer.inner.lock();
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        container.stats.open_socket();
        let next = container.first_socket_id;
        let mut sockets = self.shared_buffer.sockets.lock();
        let socket_id = SocketId(sockets.insert(|socket_id| {
            socket
                .wait_async_handle(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
}

impl Drop for ContainerBuffer {
    fn drop(&mut self) {
        self.terminate();
    }
}

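/// A stream over a single container's messages, yielding [`LazyItem`]s.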
#[pin_project(PinnedDrop)]
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Cursor {
    // The current position in the ring buffer.
    index: u64,

    container_id: ContainerId,

    // The ID of the next message we expect for this container; used to detect rolled-out messages.
    next_id: u64,

    // Where the cursor stops.
    end: CursorEnd,

    #[derivative(Debug = "ignore")]
    buffer: Arc<SharedBuffer>,

    // Registered with the buffer so the cursor is woken when new messages arrive.
    #[pin]
    #[derivative(Debug = "ignore")]
    waker_entry: WakerEntry<Inner>,

    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,
}

#[derive(Debug, PartialEq)]
pub enum LazyItem<T> {
    Next(Arc<T>),
    // Some number of messages were rolled out before they could be returned; the timestamp is that
    // of the most recently rolled-out message.
    ItemsRolledOut(u64, zx::BootInstant),
}

impl Stream for Cursor {
    type Item = LazyItem<StoredMessage>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut inner = InnerGuard::new(this.buffer);

        let mut head = inner.ring_buffer.head();
        inner.check_space(head);

        let mut container = match inner.containers.get(*this.container_id) {
            None => return Poll::Ready(None),
            Some(container) => container,
        };

        let end_id = match &mut this.end {
            CursorEnd::Snapshot(None) => {
                // Drain this container's sockets first so that the snapshot includes everything
                // that has been sent so far.
                let mut sockets = this.buffer.sockets.lock();
                let mut socket_id = container.first_socket_id;
                while socket_id != SocketId::NULL {
                    let socket = sockets.get_mut(socket_id.0).unwrap();
                    let next = socket.next;
                    inner.read_socket(&mut sockets, socket_id, |_| {});
                    socket_id = next;
                }

                head = inner.ring_buffer.head();
                inner.update_message_ids(head);
                container = inner.containers.get(*this.container_id).unwrap();
                *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
                container.msg_ids.end
            }
            CursorEnd::Snapshot(Some(end)) => *end,
            CursorEnd::Stream => u64::MAX,
        };

        if *this.next_id == end_id {
            return Poll::Ready(None);
        }

        // Report any messages that were rolled out before we got to them.
        if container.msg_ids.start > *this.next_id {
            let mut next_id = container.msg_ids.start;
            if end_id < next_id {
                next_id = end_id;
            }
            let rolled_out = next_id - *this.next_id;
            *this.next_id = next_id;
            return Poll::Ready(Some(LazyItem::ItemsRolledOut(
                rolled_out,
                container.last_rolled_out_timestamp,
            )));
        }

        if inner.tail > *this.index {
            *this.index = inner.tail;
        }

        if container.first_index > *this.index {
            *this.index = container.first_index;
        }

        if *this.next_id == container.msg_ids.end && *this.index < inner.last_scanned {
            *this.index = inner.last_scanned;
        }

        while *this.index < head {
            let (container_id, message, _timestamp) =
                unsafe { inner.parse_message(*this.index..head) };

            *this.index += ring_buffer_record_len(message.len()) as u64;
            assert!(*this.index <= head);

            if container_id.0 == this.container_id.0 {
                *this.next_id += 1;
                if let Some(msg) = StoredMessage::new(message.into(), this.stats) {
                    return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
                }
                // Messages that fail to parse are skipped.
            }
        }

        if container.terminated {
            Poll::Ready(None)
        } else {
            inner.guard.add_waker(this.waker_entry, cx.waker().clone());
            Poll::Pending
        }
    }
}

#[pinned_drop]
impl PinnedDrop for Cursor {
    fn drop(self: Pin<&mut Self>) {
        let mut inner = self.buffer.inner.lock();
        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.refs -= 1;
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
        }
    }
}

#[derive(Debug)]
enum CursorEnd {
    // The message ID at which the snapshot ends; `None` until it is computed on the first poll.
    Snapshot(Option<u64>),
    Stream,
}

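/// A minimal slab allocator; free slots form a singly linked list threaded through the vector.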
struct Slab<T> {
    slab: Vec<Slot<T>>,
    // Index of the first free slot, or `usize::MAX` if there are none.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

enum Slot<T> {
    Used(T),
    // A free slot holding the index of the next free slot.
    Free(usize),
}

impl<T> Slab<T> {
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
            Slot::Free(_) => panic!("Slot already free"),
            Slot::Used(value) => value,
        };
        self.free_index = index;
        value
    }

    fn get(&self, id: u32) -> Option<&T> {
        self.slab.get(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        self.slab.get_mut(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    // Inserts the value produced by `value` (which is passed the index it will occupy) and returns
    // that index.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                _ => unreachable!(),
            };
            free_index as u32
        } else {
            assert!(self.slab.len() < u32::MAX as usize);
            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
            (self.slab.len() - 1) as u32
        }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    const NULL: Self = SocketId(0xffff_ffff);
}

// A socket registered for a container, kept in a doubly linked list per container.
struct Socket {
    socket: zx::Socket,
    container_id: ContainerId,
    prev: SocketId,
    next: SocketId,
}

#[cfg(test)]
mod tests {
    use super::{SharedBuffer, SharedBufferOptions, create_ring_buffer};
    use crate::logs::shared_buffer::LazyItem;
    use crate::logs::stats::LogStreamStats;
    use crate::logs::testing::make_message;
    use assert_matches::assert_matches;
    use diagnostics_assertions::{AnyProperty, assert_data_tree};
    use fidl_fuchsia_diagnostics::StreamMode;
    use fuchsia_async as fasync;
    use fuchsia_async::TimeoutExt;
    use fuchsia_inspect::{Inspector, InspectorConfig};
    use fuchsia_inspect_derive::WithInspect;
    use futures::channel::mpsc;
    use futures::future::OptionFuture;
    use futures::stream::{FuturesUnordered, StreamExt as _};
    use futures::{FutureExt, poll};
    use ring_buffer::MAX_MESSAGE_SIZE;
    use std::future::poll_fn;
    use std::pin::pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::Poll;
    use std::time::Duration;

    async fn yield_to_executor() {
        let mut first_time = true;
        poll_fn(|cx| {
            if first_time {
                cx.waker().wake_by_ref();
                first_time = false;
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
        .await;
    }

    #[fuchsia::test]
    async fn push_one_message() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg.bytes());

        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
        assert_eq!(
            cursor
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );
    }

    #[fuchsia::test]
    async fn message_too_short() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );

        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn bad_type() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0x77; 16]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn message_truncated() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        // Write messages until the ring buffer wraps.
        let mut i = 0;
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            yield_to_executor().await;

            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        let mut j;
        let mut item = cursor.next().await;
        // Some messages might already have been rolled out.
        if let Some(LazyItem::ItemsRolledOut(_, _)) = item {
            item = cursor.next().await;
        }
        assert_matches!(
            item,
            Some(LazyItem::Next(item)) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"), None, item.timestamp());
                assert_eq!(&*item, &msg);
            }
        );

        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(LazyItem::Next(item)) => {
                    assert_eq!(&*item, &make_message(&format!("{j}"), None, item.timestamp()));
                }
            );
            j += 1;
        }

        assert_eq!(cursor.next().await, None);
    }

    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Keep writing messages to `b` until `a`'s only message has been rolled out.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }

    #[fuchsia::test]
    async fn terminate_drops_container() {
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );

        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        // The container still has a message in the buffer, so it is not freed yet.
        assert_eq!(buffer.container_count(), 1);

        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        // Push messages to `b` until `a`'s message rolls out and its container is freed.
        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }

    #[fuchsia::test]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            // Forward everything the cursor yields onto a channel.
            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            // Only the snapshotting mode should see the message pushed before subscribing.
            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
                );
            }

            // Nothing more should arrive until another message is pushed.
            assert_eq!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await,
                None
            );

            container.push_back(msg.bytes());

            assert_matches!(
                receiver.next().await,
                Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
            );

            // Terminating the container ends the stream.
            container.terminate();

            assert!(receiver.next().await.is_none());
        }
    }

    #[fuchsia::test]
    async fn cursor_rolled_out() {
        for pass in 0..2 {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            );
            let container_a =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let container_b =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
            let container_c =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

            container_a.push_back(msg.bytes());
            container_a.push_back(msg.bytes());
            container_b.push_back(msg.bytes());

            const A_MESSAGE_COUNT: usize = 50;
            for _ in 0..A_MESSAGE_COUNT - 2 {
                container_a.push_back(msg.bytes());
            }

            let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

            let mut expected = A_MESSAGE_COUNT;

            // On the first pass, read one message before anything has rolled out.
            if pass == 0 {
                assert!(cursor.next().await.is_some());
                expected -= 1;
            }

            // Write messages to `c` until `b`'s message (and therefore some of `a`'s earlier
            // messages) has been rolled out.
            while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
                container_c.push_back(msg.bytes());

                yield_to_executor().await;
            }

            assert_matches!(
                cursor.next().await,
                Some(LazyItem::ItemsRolledOut(rolled_out, t))
                    if t == zx::BootInstant::from_nanos(1) && rolled_out > 0
                    => expected -= rolled_out as usize
            );

            assert_eq!(cursor.count().await, expected);
        }
    }

    #[fuchsia::test]
    async fn drained_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());

        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
        assert!(cursor_a.next().await.is_some());
        assert!(cursor_b.next().await.is_some());
        assert!(cursor_c.next().await.is_some());

        container.terminate();

        // Even after termination, the cursors should drain the messages that remain.
        assert_eq!(cursor_a.count().await, 4);
        assert_eq!(cursor_b.count().await, 4);
        assert_eq!(cursor_c.count().await, 4);
    }

    #[fuchsia::test]
    async fn empty_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));

        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();

        container.terminate();

        assert_eq!(cursor_a.count().await, 0);
        assert_eq!(cursor_b.count().await, 0);
        assert_eq!(cursor_c.count().await, 0);
    }

    #[fuchsia::test]
    async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        // Roll out all of `a`'s messages.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));

        assert!(poll!(cursor.next()).is_pending());

        container_a.terminate();
        assert_eq!(cursor.count().await, 0);
    }

    #[fuchsia::test]
    async fn recycled_container_slot() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));

        // Roll out `a`'s messages.
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        container_a.terminate();

        // A new container and its messages must not show up on `a`'s cursor.
        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());

        assert_matches!(cursor.next().await, None);
    }

    #[fuchsia::test]
    async fn socket_increments_logstats() {
        let inspector = Inspector::new(InspectorConfig::default());
        let stats: Arc<LogStreamStats> =
            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Poll the subscribing cursor once so that it is waiting before the message arrives.
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor_b
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        // The subscribing cursor should now be able to complete.
        next.await;

        assert_data_tree!(
            inspector,
            root: contains {
                test: {
                    url: "",
                    last_timestamp: AnyProperty,
                    sockets_closed: 0u64,
                    sockets_opened: 1u64,
                    invalid: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    total: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    rolled_out: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    trace: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    debug: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    info: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    warn: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    error: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    fatal: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                }
            }
        );
    }

    #[fuchsia::test]
    async fn socket() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Poll the subscribing cursor once so that it is waiting before the message arrives.
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor_b
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        next.await;
    }

    #[fuchsia::test]
    async fn socket_on_inactive() {
        let on_inactive = Arc::new(AtomicU64::new(0));
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            {
                let on_inactive = Arc::clone(&on_inactive);
                let a_identity = Arc::clone(&a_identity);
                Box::new(move |id| {
                    assert_eq!(id, a_identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                })
            },
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        local.write(msg.bytes()).unwrap();

        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        // Roll out `a`'s message.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        // The socket is still open, so `a` is not yet inactive.
        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);

        // Closing the socket should eventually mark `a` as inactive.
        std::mem::drop(local);

        while on_inactive.load(Ordering::Relaxed) != 1 {
            fasync::Timer::new(Duration::from_millis(50)).await;
        }
    }

    #[fuchsia::test]
    async fn flush() {
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(1024 * 1024),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());

        const COUNT: usize = 1000;
        for _ in 0..COUNT {
            local.write(msg.bytes()).unwrap();
        }

        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
        flush_futures.next().await;

        // After a flush, all of the messages should be immediately available.
        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
        assert!(messages.is_some());

        // The second flush should complete too.
        flush_futures.next().await;

        buffer.terminate().await;
    }
}