archivist_lib/logs/shared_buffer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::repository::ARCHIVIST_MONIKER;
7use crate::logs::stats::LogStreamStats;
8use crate::logs::stored_message::StoredMessage;
9use derivative::Derivative;
10use diagnostics_log_encoding::encode::add_dropped_count;
11use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
12use fidl_fuchsia_diagnostics::StreamMode;
13use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
14use fuchsia_async as fasync;
15use fuchsia_async::condition::{Condition, ConditionGuard, WakerEntry};
16use fuchsia_sync::Mutex;
17use futures::Stream;
18use futures::channel::oneshot;
19use log::debug;
20use pin_project::{pin_project, pinned_drop};
21use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
22use std::collections::VecDeque;
23use std::mem::ManuallyDrop;
24use std::ops::{Deref, DerefMut, Range};
25use std::pin::{Pin};
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll};
28use std::time::Duration;
29use zerocopy::FromBytes;
30use zx::AsHandleRef as _;
31
// Aim to keep 25% of the buffer free. This is expressed as a fraction: numerator / denominator.
// `calculate_real_size_given_desired_capacity` oversizes the buffer by this fraction, and
// `InnerGuard::check_space` rolls out messages until at least this fraction is free.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// The default amount of time that Archivist will sleep for to reduce how often it wakes up to
// handle log messages. This is the default for `SharedBufferOptions::sleep_time`, which is used by
// both the socket thread and the buffer monitor task.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
39
40pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
41    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
42}
43
44fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
45    // We always keep spare space in the buffer so that we don't drop messages.  This is controlled
46    // by SPACE_THRESHOLD_NUMERATOR & SPACE_THRESHOLD_DENOMINATOR.  Round up capacity so that
47    // `capacity` reflects the actual amount of log data we can store.
48    let page_size = zx::system_get_page_size() as usize;
49    (capacity * SPACE_THRESHOLD_DENOMINATOR
50        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
51        .next_multiple_of(page_size)
52}
53
// Port keys with this bit set are IOBuffer peer-closed notifications. The low 32 bits of such a key
// hold the index into `Inner::iob_peers`. Other keys are socket IDs, except `u64::MAX`, which the
// socket thread uses for its interrupt.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

// Callback invoked for a container that has become inactive, i.e. it has no sockets, no
// IOBuffers and no buffered messages. It is run after the `inner` lock has been released.
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
57
/// A log buffer shared by all components.
///
/// Messages are stored in a single ring buffer. Components write to it through IOBuffers (see
/// `ContainerBuffer::iob`). Messages arriving on sockets are copied into it by a dedicated socket
/// thread.
pub struct SharedBuffer {
    // Ring buffer bookkeeping and the registered containers. This is wrapped in a `Condition` so
    // that cursors can register wakers. Those wakers are woken when an `InnerGuard` with `wake`
    // set is dropped.
    inner: Condition<Inner>,

    // Sockets. This *must* be locked after `inner` and the lock *must* be dropped before
    // `InnerGuard` is dropped.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container is inactive (i.e. no sockets, no IOBuffers and no buffered
    // messages).
    on_inactive: OnInactive,

    // The port that we use to service the sockets. IOBuffer peer-closed notifications and the
    // socket thread's interrupt are also delivered here.
    port: zx::Port,

    // Event used to interrupt the socket thread. `USER_0` is signalled whenever a `ThreadMessage`
    // (flush or termination) is queued.
    event: zx::Event,

    // The thread that we use to service the sockets. It is taken and joined by `terminate`.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // The task responsible for monitoring the space in the IOBuffer and popping messages when it
    // gets full. It will also wake cursors whenever new data arrives.
    _buffer_monitor_task: fasync::Task<()>,
}
81
/// A guard for `SharedBuffer::inner` that defers some work until the lock is released.
///
/// On drop, it first wakes the registered wakers if `wake` is set; the lock is still held at that
/// point. It then releases the lock. Only after that does it invoke the `on_inactive` callback
/// for every queued identity.
struct InnerGuard<'a> {
    // The buffer that owns the lock; used to reach the `on_inactive` callback.
    buffer: &'a SharedBuffer,

    // The underlying lock guard. It is wrapped in `ManuallyDrop` so that it can be released
    // before the `on_inactive` callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // The list of components that should be reported as inactive once the lock is dropped.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // If true, wake all the wakers registered with the ConditionGuard.
    wake: bool,
}
93
94impl Drop for InnerGuard<'_> {
95    fn drop(&mut self) {
96        if self.wake {
97            for waker in self.guard.drain_wakers() {
98                waker.wake();
99            }
100        }
101        // SAFETY: This is the only place we drop the guard.
102        unsafe {
103            ManuallyDrop::drop(&mut self.guard);
104        }
105        for identity in self.on_inactive.drain(..) {
106            (*self.buffer.on_inactive)(identity);
107        }
108    }
109}
110
111impl Deref for InnerGuard<'_> {
112    type Target = Inner;
113
114    fn deref(&self) -> &Self::Target {
115        &self.guard
116    }
117}
118
119impl DerefMut for InnerGuard<'_> {
120    fn deref_mut(&mut self) -> &mut Self::Target {
121        &mut self.guard
122    }
123}
124
/// The shared-buffer state protected by the `SharedBuffer::inner` lock.
struct Inner {
    // The ring buffer.
    ring_buffer: Arc<RingBuffer>,

    // Registered containers, indexed by `ContainerId`.
    containers: Containers,

    // Socket thread message queue, processed in FIFO order. Pushing a message is followed by
    // signalling `USER_0` on `SharedBuffer::event`. The socket thread clears that signal once the
    // queue is empty.
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The index in the buffer up to which `update_message_ids` has scanned for messages. `pop`
    // asserts that this never falls behind `tail`.
    last_scanned: u64,

    // A copy of the tail index in the ring buffer; messages before this have been rolled out.
    tail: u64,

    // The IOBuffer peers that we must watch, each paired with the container it belongs to. The
    // slab index is encoded in the port key together with `IOB_PEER_CLOSED_KEY_BASE`.
    iob_peers: Slab<(ContainerId, zx::Iob)>,
}
144
/// Messages for the socket thread, delivered through `Inner::thread_msg_queue`. The socket thread
/// handles a message only after processing the sockets it found ready in the same iteration.
enum ThreadMessage {
    // The thread should terminate.
    Terminate,

    // Process all pending socket messages and report via the Sender when done.
    Flush(oneshot::Sender<()>),
}
152
/// Options for `SharedBuffer::new`.
pub struct SharedBufferOptions {
    /// To reduce how often Archivist wakes when new messages are written, Archivist will sleep for
    /// this time. This will impact how quickly messages show up via the cursors. It applies to
    /// both the socket thread and the buffer monitor task.
    pub sleep_time: Duration,
}
158
159impl Default for SharedBufferOptions {
160    fn default() -> Self {
161        Self { sleep_time: DEFAULT_SLEEP_TIME }
162    }
163}
164
impl SharedBuffer {
    /// Returns a new shared buffer that stores its messages in `ring_buffer`.
    ///
    /// This spawns two workers, both of which use `options.sleep_time` to limit how often they
    /// wake up:
    /// - the buffer monitor task, spawned with `fasync::Task::spawn`;
    /// - a dedicated socket thread.
    ///
    /// `on_inactive` is invoked whenever a container becomes inactive.
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
    ) -> Arc<Self> {
        // `new_cyclic` lets the monitor task hold only a `Weak` reference, so the task does not
        // keep the buffer alive.
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                tail: 0,
                iob_peers: Slab::default(),
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        // The socket thread holds a strong reference. That reference is released when the thread
        // returns after processing `ThreadMessage::Terminate`.
        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }

    /// Registers a new container for `identity` and returns the `ContainerBuffer` used to access
    /// it.
    pub fn new_container_buffer(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerBuffer {
        let mut inner = self.inner.lock();
        let Inner { containers, ring_buffer, .. } = &mut *inner;
        ContainerBuffer {
            shared_buffer: Arc::clone(self),
            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
        }
    }

    /// Asks the socket thread to process all pending socket messages and waits until it has.
    ///
    /// NOTE(review): the socket thread is the only consumer of `thread_msg_queue`. So if this is
    /// called after `terminate` has completed, the queued `Flush` is never popped and its sender
    /// is never dropped. This future then appears to never complete. Confirm that callers never
    /// flush after terminating.
    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel();
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        // Ignore failure if Archivist is shutting down.
        let _ = receiver.await;
    }

    /// Returns the number of registered containers in use by the buffer.
    #[cfg(test)]
    pub fn container_count(&self) -> usize {
        self.inner.lock().containers.len()
    }

    /// Terminates the socket thread. The socket thread will drain the sockets before terminating.
    ///
    /// # Panics
    ///
    /// Panics if called more than once, because the join handle has already been taken.
    pub async fn terminate(&self) {
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Terminate);
        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        // `join` blocks, so run it through `fasync::unblock` instead of on the executor.
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
        .await;
    }

    /// Body of the dedicated socket thread.
    ///
    /// Each iteration does the following:
    /// 1. Gathers packets from `self.port`: socket readiness, IOBuffer peer-closed notifications
    ///    and interrupts.
    /// 2. Handles closed IOBuffers.
    /// 3. Copies data from ready sockets into the ring buffer.
    /// 4. Handles at most one queued `ThreadMessage`.
    ///
    /// Returns after handling `ThreadMessage::Terminate`.
    fn socket_thread(&self, sleep_time: Duration) {
        // Port key used for the interrupt wait on `self.event`.
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        // Waits are registered with `WaitAsyncOpts::empty()` and are re-registered after each
        // packet, so the interrupt wait is re-armed whenever its packet has been received.
        let mut interrupt_needs_arming = true;
        // The message to handle at the end of this iteration, if any.
        let mut msg = None;

        loop {
            // If a message is already pending, only poll the port without blocking. Otherwise,
            // sleep and then block until a packet arrives.
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async_handle(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait so that we're not constantly waking up for every log message that is queued.
                // Ignore errors here. This returns early if `USER_0` is asserted, i.e. a message
                // has been queued.
                let _ = self.event.wait_handle(
                    zx::Signals::USER_0,
                    zx::MonotonicInstant::after(sleep_time.into()),
                );
                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            // To maintain proper ordering, we must capture the message here whilst
                            // we are still gathering the list of sockets that are ready to read.
                            // If we wait till later, we introduce windows where we might miss a
                            // socket that should be read.
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // Truncating to `u32` strips `IOB_PEER_CLOSED_KEY_BASE` and leaves
                            // the index into `iob_peers`.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, only collect packets that are already queued.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(self);

            if !iob_peer_closed.is_empty() {
                // See the comment on `is_active()` for why this is required.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                // `sockets` is locked after `inner` and released (end of this scope) before
                // `inner` is dropped, as `SharedBuffer::sockets` requires.
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        // Re-register the wait once the socket has been drained.
                        socket
                            .socket
                            .wait_async_handle(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            // Now that we've processed the sockets, we can process the message.
            if let Some(m) = msg.take() {
                match m {
                    // NOTE(review): any messages queued behind `Terminate` (e.g. a later
                    // `Flush`) are left in the queue and never handled.
                    ThreadMessage::Terminate => return,
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                // See if there's another message.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // If there are no more messages, we must clear the signal so that we get
                    // notified when the next message arrives. This must be done whilst we are
                    // holding the lock.
                    self.event.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }

    /// Background task that keeps the required space free in the ring buffer. Through
    /// `check_space`, it also scans new messages and so wakes cursors.
    ///
    /// It exits the next time it wakes up after the `SharedBuffer` has been dropped.
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            // Sleep to limit how often we wake up.
            fasync::Timer::new(sleep_time).await;
            // Presumably waits until the head has moved on from `last_head`. TODO confirm against
            // `ring_buffer::Reader::wait`.
            let head = ring_buffer.wait(last_head).await;
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }
}
372
373impl Inner {
374    // Returns list of containers that no longer have any messages.
375    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
376        if msg.len() < std::mem::size_of::<Header>() {
377            debug!("message too short ({})", msg.len());
378            if let Some(container) = self.containers.get(container_id) {
379                container.stats.increment_invalid(msg.len());
380            }
381            return;
382        }
383
384        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
385
386        // NOTE: Some tests send messages that are bigger than the header indicates. We ignore the
387        // tail of any such messages.
388        let msg_len = header.size_words() as usize * 8;
389
390        // Check the correct type and size.
391        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
392            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
393            if let Some(container) = self.containers.get(container_id) {
394                container.stats.increment_invalid(msg.len());
395            }
396            return;
397        }
398
399        let Some(container) = self.containers.get_mut(container_id) else {
400            return;
401        };
402
403        let mut data;
404        let msg = if container.dropped_count > 0 {
405            data = msg.to_vec();
406            if !add_dropped_count(&mut data, container.dropped_count) {
407                debug!("unable to add dropped count to invalid message");
408                container.stats.increment_invalid(data.len());
409                return;
410            }
411            &data
412        } else {
413            msg
414        };
415
416        if container.iob.write(Default::default(), 0, msg).is_err() {
417            // We were unable to write the message to the buffer, most likely due to lack of
418            // space. We drop this message and then we'll add to the dropped count for the next
419            // message.
420            container.dropped_count += 1
421        } else {
422            container.dropped_count = 0;
423        }
424    }
425
426    /// Parses the first message within `range` returns (container_id, message, timestamp).
427    ///
428    /// # Panics
429    ///
430    /// This will panic if the ring buffer has been corrupted. Only the kernel and Archivist can
431    /// write to the ring buffer and so we trust both not to corrupt the ring buffer.
432    ///
433    /// # Safety
434    ///
435    /// `range` *must* be within the written range of the ring buffer so that there is no concurrent
436    /// write access to that range. The returned slice is only valid whilst this remains true.
437    unsafe fn parse_message(
438        &self,
439        range: Range<u64>,
440    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
441        let (tag, msg) = self
442            .ring_buffer
443            .first_message_in(range)
444            .expect("Unable to read message from ring buffer");
445        (
446            ContainerId(tag as u32),
447            msg,
448            (msg.len() >= 16)
449                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
450        )
451    }
452}
453
454impl<'a> InnerGuard<'a> {
455    fn new(buffer: &'a SharedBuffer) -> Self {
456        Self {
457            buffer,
458            guard: ManuallyDrop::new(buffer.inner.lock()),
459            on_inactive: Vec::new(),
460            wake: false,
461        }
462    }
463
464    /// Pops a message and returns its total size. The caller should call `update_message_ids(head)`
465    /// prior to calling this.
466    ///
467    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
468    /// necessarily the message with the *oldest* timestamp because we might not have received the
469    /// messages in perfect timestamp order. This should be close enough for all use cases we care
470    /// about, and besides, the timestamps can't be trusted anyway.
471    fn pop(&mut self, head: u64) -> Option<usize> {
472        if head == self.tail {
473            return None;
474        }
475
476        // SAFETY: There can be no concurrent writes between `tail..head` and the *only* place
477        // we increment the tail index is just before we leave this function.
478        let record_len = {
479            let (container_id, message, timestamp) = unsafe { self.parse_message(self.tail..head) };
480            let record_len = ring_buffer_record_len(message.len());
481
482            let container = self.containers.get_mut(container_id).unwrap();
483
484            container.stats.increment_rolled_out(record_len);
485            container.msg_ids.start += 1;
486            if let Some(timestamp) = timestamp {
487                container.last_rolled_out_timestamp = timestamp;
488            }
489            if !container.is_active() {
490                if container.should_free() {
491                    self.containers.free(container_id);
492                } else {
493                    let identity = Arc::clone(&container.identity);
494                    self.on_inactive.push(identity);
495                }
496            }
497
498            record_len
499        };
500
501        // NOTE: This should go last. After incrementing `tail`, the `message` can be overwritten.
502        self.ring_buffer.increment_tail(record_len);
503        self.tail += record_len as u64;
504
505        // The caller should have called `update_message_ids(head)` prior to calling this.
506        assert!(self.last_scanned >= self.tail);
507
508        Some(record_len)
509    }
510
511    /// Reads a socket. Calls `rearm` to rearm the socket once it has been drained.
512    fn read_socket(
513        &mut self,
514        sockets: &mut Slab<Socket>,
515        socket_id: SocketId,
516        rearm: impl FnOnce(&mut Socket),
517    ) {
518        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
519        let container_id = socket.container_id;
520
521        loop {
522            self.check_space(self.ring_buffer.head());
523
524            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
525
526            // Read directly into the buffer leaving space for the header.
527            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
528                Ok(d) => d.len(),
529                Err(zx::Status::SHOULD_WAIT) => {
530                    // The socket has been drained.
531                    rearm(socket);
532                    return;
533                }
534                Err(_) => break,
535            };
536
537            // SAFETY: `read_uninit` will have written to `len` bytes.
538            unsafe {
539                data.set_len(len);
540            }
541
542            let container = self.containers.get_mut(container_id).unwrap();
543            if data.len() < 16 {
544                container.stats.increment_invalid(data.len());
545                continue;
546            }
547
548            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
549            let msg_len = header.size_words() as usize * 8;
550            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
551                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
552                container.stats.increment_invalid(data.len());
553                continue;
554            }
555
556            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
557            {
558                debug!("unable to add dropped count to invalid message");
559                container.stats.increment_invalid(data.len());
560                continue;
561            }
562
563            if container.iob.write(Default::default(), 0, &data).is_err() {
564                // We were unable to write the message to the buffer, most likely due to lack of
565                // space. We drop this message and then we'll add to the dropped count for the next
566                // message.
567                container.dropped_count += 1
568            } else {
569                container.dropped_count = 0;
570            }
571        }
572
573        // This path is taken when the socket should be closed.
574
575        // See the comment on `is_active()` for why this is required.
576        self.update_message_ids(self.ring_buffer.head());
577
578        let container = self.containers.get_mut(container_id).unwrap();
579        container.remove_socket(socket_id, sockets);
580        if !container.is_active() {
581            if container.should_free() {
582                self.containers.free(container_id);
583            } else {
584                let identity = Arc::clone(&container.identity);
585                self.on_inactive.push(identity);
586            }
587        }
588    }
589
590    /// Scans the ring buffer and updates `msg_ids` for the containers.
591    fn update_message_ids(&mut self, head: u64) {
592        while self.last_scanned < head {
593            // SAFETY: This is safe because `head` must be within the ring buffer range and we make
594            // sure that `self.last_scanned` is always >= `tail` in `pop()`.
595            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
596            let msg_len = message.len();
597            let severity = (msg_len >= 8)
598                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
599            let container = self.containers.get_mut(container_id).unwrap();
600            container.msg_ids.end += 1;
601            if let Some(severity) = severity {
602                container.stats.ingest_message(msg_len, severity);
603            }
604            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
605            self.wake = true;
606        }
607    }
608
609    /// Ensures the buffer keeps the required amount of space.
610    fn check_space(&mut self, head: u64) {
611        self.update_message_ids(head);
612        let capacity = self.ring_buffer.capacity();
613        let mut space = capacity
614            .checked_sub((head - self.tail) as usize)
615            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
616        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
617        while space < required_space {
618            let Some(amount) = self.pop(head) else { break };
619            space += amount;
620        }
621    }
622}
623
/// The registered containers, stored in a slab whose index is the `ContainerId`.
#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}
628
/// Identifies a registered container. The value is the container's index in `Containers`. It is
/// also the tag of the container's ring buffer records (see `Inner::parse_message`).
#[derive(Clone, Copy, Debug)]
struct ContainerId(u32);
631
632impl Containers {
633    #[cfg(test)]
634    fn len(&self) -> usize {
635        self.slab.len()
636    }
637
638    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
639        self.slab.get(id.0)
640    }
641
642    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
643        self.slab.get_mut(id.0)
644    }
645
646    fn new_container(
647        &mut self,
648        buffer: &RingBuffer,
649        identity: Arc<ComponentIdentity>,
650        stats: Arc<LogStreamStats>,
651    ) -> ContainerId {
652        ContainerId(self.slab.insert(|id| {
653            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
654            ContainerInfo::new(identity, stats, iob)
655        }))
656    }
657
658    fn free(&mut self, id: ContainerId) {
659        self.slab.free(id.0);
660    }
661}
662
/// Per-container bookkeeping stored in `Containers`.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The number of references (typically cursors) that prevent this object from being freed.
    refs: usize,

    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first index in the shared buffer for a message for this container. This index
    // might have been rolled out. This is used as an optimisation to set the starting
    // cursor position.
    // NOTE(review): this is initialised to 0 and not updated anywhere in the code visible here;
    // confirm where it is maintained.
    first_index: u64,

    // The IDs of this container's messages still in the shared buffer, as a half-open range.
    // `start` is advanced by `pop` and `end` by `update_message_ids`, so `end` can be out of
    // date if there are concurrent writers.
    msg_ids: Range<u64>,

    // Whether the container is terminated.
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The timestamp of the message most recently rolled out.
    last_rolled_out_timestamp: zx::BootInstant,

    // The first socket ID for this container. The container's sockets form a doubly linked list
    // through `Socket::prev`/`Socket::next`. `SocketId::NULL` means the container has no sockets.
    first_socket_id: SocketId,

    // The IOBuffer used for writing.
    iob: zx::Iob,

    // The number of client IOBuffers.
    iob_count: usize,

    // The number of messages dropped when forwarding from a socket to an IOBuffer.
    dropped_count: u64,
}
703
704impl ContainerInfo {
705    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
706        Self {
707            refs: 0,
708            identity,
709            first_index: 0,
710            msg_ids: 0..0,
711            terminated: false,
712            stats,
713            last_rolled_out_timestamp: zx::BootInstant::ZERO,
714            first_socket_id: SocketId::NULL,
715            iob,
716            iob_count: 0,
717            dropped_count: 0,
718        }
719    }
720
721    fn should_free(&self) -> bool {
722        self.terminated && self.refs == 0 && !self.is_active()
723    }
724
725    // Returns true if the container is considered active which is the case if it has sockets, io
726    // buffers, or buffered messages.
727    //
728    // NOTE: Whenever a socket or iob is closed, `update_message_ids` must called to ensure
729    // `msg_ids.end` is correctly set.
730    fn is_active(&self) -> bool {
731        self.first_socket_id != SocketId::NULL
732            || self.iob_count > 0
733            || self.msg_ids.end != self.msg_ids.start
734            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
735    }
736
737    // # Panics
738    //
739    // This will panic if the socket isn't found.
740    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
741        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
742        if prev == SocketId::NULL {
743            self.first_socket_id = next;
744        } else {
745            sockets.get_mut(prev.0).unwrap().next = next;
746        }
747        if next != SocketId::NULL {
748            sockets
749                .get_mut(next.0)
750                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
751                .prev = prev;
752        }
753        sockets.free(socket_id.0);
754        self.stats.close_socket();
755        debug!(identity:% = self.identity; "Socket closed.");
756    }
757}
758
/// A handle to a single container within a `SharedBuffer`. It is used to push messages, hand out
/// IOBuffers and create cursors for that container.
pub struct ContainerBuffer {
    shared_buffer: Arc<SharedBuffer>,
    container_id: ContainerId,
}
763
764impl ContainerBuffer {
765    /// Returns the tag used by IOBuffers used for this component.
766    pub fn iob_tag(&self) -> u64 {
767        self.container_id.0 as u64
768    }
769
770    /// Ingests a new message.
771    ///
772    /// If the message is invalid, it is dropped.
773    pub fn push_back(&self, msg: &[u8]) {
774        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
775    }
776
777    /// Returns an IOBuffer for the container.
778    pub fn iob(&self) -> zx::Iob {
779        let mut inner = self.shared_buffer.inner.lock();
780
781        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;
782
783        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();
784
785        inner.iob_peers.insert(|idx| {
786            ep1.wait_async_handle(
787                &self.shared_buffer.port,
788                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
789                zx::Signals::IOB_PEER_CLOSED,
790                zx::WaitAsyncOpts::empty(),
791            )
792            .unwrap();
793
794            (self.container_id, ep1)
795        });
796
797        ep0
798    }
799
800    /// Returns a cursor.
801    pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
802        // NOTE: It is not safe to use on_inactive in this function because this function can be
803        // called whilst locks are held which are the same locks that the on_inactive notification
804        // uses.
805        let mut inner = InnerGuard::new(&self.shared_buffer);
806        let Some(mut container) = inner.containers.get_mut(self.container_id) else {
807            // We've hit a race where the container has terminated.
808            return None;
809        };
810
811        container.refs += 1;
812        let stats = Arc::clone(&container.stats);
813
814        let (index, next_id, end) = match mode {
815            StreamMode::Snapshot => {
816                (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
817            }
818            StreamMode::Subscribe => {
819                // Call `update_message_ids` so that `msg_ids.end` is up to date.
820                let head = inner.ring_buffer.head();
821                inner.update_message_ids(head);
822                container = inner.containers.get_mut(self.container_id).unwrap();
823                (head, container.msg_ids.end, CursorEnd::Stream)
824            }
825            StreamMode::SnapshotThenSubscribe => {
826                (container.first_index, container.msg_ids.start, CursorEnd::Stream)
827            }
828        };
829
830        Some(Cursor {
831            index,
832            container_id: self.container_id,
833            next_id,
834            end,
835            buffer: Arc::clone(&self.shared_buffer),
836            waker_entry: self.shared_buffer.inner.waker_entry(),
837            stats,
838        })
839    }
840
841    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
842    /// The component's data will remain in the buffer until the messages are rolled out. This will
843    /// *not* drain sockets or close IOBuffers.
844    pub fn terminate(&self) {
845        let mut inner = InnerGuard::new(&self.shared_buffer);
846
847        // See the comment on `is_active()` for why this is required.
848        inner.update_message_ids(inner.ring_buffer.head());
849
850        if let Some(container) = inner.containers.get_mut(self.container_id) {
851            container.terminated = true;
852            if container.first_socket_id != SocketId::NULL {
853                let mut sockets = self.shared_buffer.sockets.lock();
854                loop {
855                    container.remove_socket(container.first_socket_id, &mut sockets);
856                    if container.first_socket_id == SocketId::NULL {
857                        break;
858                    }
859                }
860            }
861            if container.should_free() {
862                inner.containers.free(self.container_id);
863            }
864            inner.wake = true;
865        }
866    }
867
868    /// Returns true if the container has messages, sockets or IOBuffers.
869    pub fn is_active(&self) -> bool {
870        self.shared_buffer
871            .inner
872            .lock()
873            .containers
874            .get(self.container_id)
875            .is_some_and(|c| c.is_active())
876    }
877
878    /// Adds a socket for this container.
879    pub fn add_socket(&self, socket: zx::Socket) {
880        let mut inner = self.shared_buffer.inner.lock();
881        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
882        container.stats.open_socket();
883        let next = container.first_socket_id;
884        let mut sockets = self.shared_buffer.sockets.lock();
885        let socket_id = SocketId(sockets.insert(|socket_id| {
886            socket
887                .wait_async_handle(
888                    &self.shared_buffer.port,
889                    socket_id as u64,
890                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
891                    zx::WaitAsyncOpts::empty(),
892                )
893                .unwrap();
894            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
895        }));
896        if next != SocketId::NULL {
897            sockets.get_mut(next.0).unwrap().prev = socket_id;
898        }
899        container.first_socket_id = socket_id;
900    }
901}
902
903impl Drop for ContainerBuffer {
904    fn drop(&mut self) {
905        self.terminate();
906    }
907}
908
/// A stream over one container's messages in the shared buffer.
///
/// Created by `ContainerBuffer::cursor`, which increments the container's `refs`. The cursor's
/// drop implementation decrements it again.
#[pin_project(PinnedDrop)]
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Cursor {
    // Index in the buffer that we should continue searching from.
    index: u64,

    // The container whose messages this cursor yields.
    container_id: ContainerId,

    // The next expected message ID.
    next_id: u64,

    // Where the cursor stops. For a snapshot this is an exclusive end ID, fixed on the first
    // poll. For a stream there is no end ID; the cursor follows new messages until the container
    // is terminated.
    end: CursorEnd,

    #[derivative(Debug = "ignore")]
    buffer: Arc<SharedBuffer>,

    // waker_entry is used to register a waker for subscribing cursors.
    #[pin]
    #[derivative(Debug = "ignore")]
    waker_entry: WakerEntry<Inner>,

    // The container's stats, passed to `StoredMessage::new` for each message yielded.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,
}
935
/// The next element in the stream or a marker of the number of items rolled out since last polled.
#[derive(Debug, PartialEq)]
pub enum LazyItem<T> {
    /// The next item in the stream.
    Next(Arc<T>),
    /// A count of the items that were rolled out of the buffer before the cursor reached them.
    ///
    /// The timestamp comes from the container's `last_rolled_out_timestamp` when the marker is
    /// produced.
    ItemsRolledOut(u64, zx::BootInstant),
}
944
945impl Stream for Cursor {
946    type Item = LazyItem<StoredMessage>;
947
948    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
949        let mut this = self.project();
950        let mut inner = InnerGuard::new(this.buffer);
951
952        let mut head = inner.ring_buffer.head();
953        inner.check_space(head);
954
955        let mut container = match inner.containers.get(*this.container_id) {
956            None => return Poll::Ready(None),
957            Some(container) => container,
958        };
959
960        let end_id = match &mut this.end {
961            CursorEnd::Snapshot(None) => {
962                let mut sockets = this.buffer.sockets.lock();
963                // Drain the sockets associated with this container first so that we capture
964                // pending messages. Some tests rely on this.
965                let mut socket_id = container.first_socket_id;
966                while socket_id != SocketId::NULL {
967                    let socket = sockets.get_mut(socket_id.0).unwrap();
968                    let next = socket.next;
969                    // The socket doesn't need to be rearmed here.
970                    inner.read_socket(&mut sockets, socket_id, |_| {});
971                    socket_id = next;
972                }
973
974                // Call `update_message_ids` so that `msg_ids.end` is up to date.
975                head = inner.ring_buffer.head();
976                inner.update_message_ids(head);
977                container = inner.containers.get(*this.container_id).unwrap();
978                *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
979                container.msg_ids.end
980            }
981            CursorEnd::Snapshot(Some(end)) => *end,
982            CursorEnd::Stream => u64::MAX,
983        };
984
985        if *this.next_id == end_id {
986            return Poll::Ready(None);
987        }
988
989        // See if messages have been rolled out.
990        if container.msg_ids.start > *this.next_id {
991            let mut next_id = container.msg_ids.start;
992            if end_id < next_id {
993                next_id = end_id;
994            }
995            let rolled_out = next_id - *this.next_id;
996            *this.next_id = next_id;
997            return Poll::Ready(Some(LazyItem::ItemsRolledOut(
998                rolled_out,
999                container.last_rolled_out_timestamp,
1000            )));
1001        }
1002
1003        if inner.tail > *this.index {
1004            *this.index = inner.tail;
1005        }
1006
1007        // Some optimisations to reduce the amount of searching we might have to do.
1008        if container.first_index > *this.index {
1009            *this.index = container.first_index;
1010        }
1011
1012        if *this.next_id == container.msg_ids.end && *this.index < inner.last_scanned {
1013            *this.index = inner.last_scanned;
1014        }
1015
1016        // Scan forward until we find a record matching this container.
1017        while *this.index < head {
1018            // SAFETY: `*this.index..head` must be within the written region of the ring-buffer.
1019            let (container_id, message, _timestamp) =
1020                unsafe { inner.parse_message(*this.index..head) };
1021
1022            // Move index to the next record.
1023            *this.index += ring_buffer_record_len(message.len()) as u64;
1024            assert!(*this.index <= head);
1025
1026            if container_id.0 == this.container_id.0 {
1027                *this.next_id += 1;
1028                if let Some(msg) = StoredMessage::new(message.into(), this.stats) {
1029                    return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
1030                } else {
1031                    // The message is corrupt. Just skip it.
1032                }
1033            }
1034        }
1035
1036        if container.terminated {
1037            Poll::Ready(None)
1038        } else {
1039            inner.guard.add_waker(this.waker_entry, cx.waker().clone());
1040            Poll::Pending
1041        }
1042    }
1043}
1044
1045#[pinned_drop]
1046impl PinnedDrop for Cursor {
1047    fn drop(self: Pin<&mut Self>) {
1048        let mut inner = self.buffer.inner.lock();
1049        if let Some(container) = inner.containers.get_mut(self.container_id) {
1050            container.refs -= 1;
1051            if container.should_free() {
1052                inner.containers.free(self.container_id);
1053            }
1054        }
1055    }
1056}
1057
/// Determines when a `Cursor` stops yielding messages.
#[derive(Debug)]
enum CursorEnd {
    // A snapshot ends at a fixed message ID (exclusive). The id will be None when the cursor is
    // first created and is set after the cursor is first polled.
    Snapshot(Option<u64>),
    // A stream has no end ID (`poll_next` uses u64::MAX). It ends only once the container has
    // been terminated.
    Stream,
}
1065
/// Implements a simple Slab allocator.
///
/// Values live in `slab`. Freed slots form a singly linked free list threaded through
/// `Slot::Free`, headed by `free_index`. Freed slots are reused before the vector grows.
struct Slab<T> {
    slab: Vec<Slot<T>>,
    // Index of the most recently freed slot, or `usize::MAX` if the free list is empty.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

/// A single entry in a `Slab`.
enum Slot<T> {
    /// Holds a live value.
    Used(T),
    /// An empty slot. The payload is the index of the next free slot (`usize::MAX` ends the list).
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of used entries. This is not performant.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    /// Frees the slot at `index` and returns the value it held.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range or the slot is already free. The double-free check
    /// runs before the slot is modified, so a double free cannot corrupt the free list.
    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        assert!(matches!(self.slab[index], Slot::Used(_)), "Slot already free");
        let Slot::Used(value) =
            std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index))
        else {
            unreachable!()
        };
        self.free_index = index;
        value
    }

    /// Returns the value in slot `id`, or `None` if the slot is free or out of range.
    fn get(&self, id: u32) -> Option<&T> {
        match self.slab.get(id as usize)? {
            Slot::Used(value) => Some(value),
            Slot::Free(_) => None,
        }
    }

    /// Mutable counterpart of `get`.
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        match self.slab.get_mut(id as usize)? {
            Slot::Used(value) => Some(value),
            Slot::Free(_) => None,
        }
    }

    /// Stores the value built by `value` and returns the index of its slot.
    ///
    /// `value` receives the index it will occupy, so the value can record its own ID. The most
    /// recently freed slot is reused first; otherwise the vector grows.
    ///
    /// # Panics
    ///
    /// Panics if all indices below `u32::MAX` are in use.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                Slot::Used(_) => unreachable!(),
            };
            free_index as u32
        } else {
            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
            // (see SocketId::NULL below).
            assert!(self.slab.len() < u32::MAX as usize);
            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
            (self.slab.len() - 1) as u32
        }
    }
}
1134
/// Index of a socket entry in the shared socket slab.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    /// Sentinel meaning "no socket". `Slab::insert` never hands out this index.
    const NULL: Self = Self(u32::MAX);
}
1141
/// A socket registered with a container via `ContainerBuffer::add_socket`, stored in the shared
/// buffer's `sockets` collection.
struct Socket {
    socket: zx::Socket,
    // The container that owns this socket.
    container_id: ContainerId,
    // Sockets are stored as a doubly linked list for each container. The list is headed by the
    // container's `first_socket_id`, and `SocketId::NULL` terminates it in both directions.
    prev: SocketId,
    next: SocketId,
}
1149
1150#[cfg(test)]
1151mod tests {
1152    use super::{SharedBuffer, SharedBufferOptions, create_ring_buffer};
1153    use crate::logs::shared_buffer::LazyItem;
1154    use crate::logs::stats::LogStreamStats;
1155    use crate::logs::testing::make_message;
1156    use assert_matches::assert_matches;
1157    use diagnostics_assertions::{AnyProperty, assert_data_tree};
1158    use fidl_fuchsia_diagnostics::StreamMode;
1159    use fuchsia_async as fasync;
1160    use fuchsia_async::TimeoutExt;
1161    use fuchsia_inspect::{Inspector, InspectorConfig};
1162    use fuchsia_inspect_derive::WithInspect;
1163    use futures::channel::mpsc;
1164    use futures::future::OptionFuture;
1165    use futures::stream::{FuturesUnordered, StreamExt as _};
1166    use futures::{FutureExt, poll};
1167    use ring_buffer::MAX_MESSAGE_SIZE;
1168    use std::future::poll_fn;
1169    use std::pin::pin;
1170    use std::sync::Arc;
1171    use std::sync::atomic::{AtomicU64, Ordering};
1172    use std::task::Poll;
1173    use std::time::Duration;
1174
1175    async fn yield_to_executor() {
1176        let mut first_time = true;
1177        poll_fn(|cx| {
1178            if first_time {
1179                cx.waker().wake_by_ref();
1180                first_time = false;
1181                Poll::Pending
1182            } else {
1183                Poll::Ready(())
1184            }
1185        })
1186        .await;
1187    }
1188
1189    #[fuchsia::test]
1190    async fn push_one_message() {
1191        let buffer = SharedBuffer::new(
1192            create_ring_buffer(MAX_MESSAGE_SIZE),
1193            Box::new(|_| {}),
1194            Default::default(),
1195        );
1196        let container_buffer =
1197            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1198        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1199        container_buffer.push_back(msg.bytes());
1200
1201        // Make sure the cursor can find it.
1202        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
1203        assert_eq!(
1204            cursor
1205                .map(|item| {
1206                    match item {
1207                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1208                        _ => panic!("Unexpected item {item:?}"),
1209                    }
1210                })
1211                .count()
1212                .await,
1213            1
1214        );
1215    }
1216
1217    #[fuchsia::test]
1218    async fn message_too_short() {
1219        let buffer = SharedBuffer::new(
1220            create_ring_buffer(MAX_MESSAGE_SIZE),
1221            Box::new(|_| {}),
1222            Default::default(),
1223        );
1224
1225        let container_buffer =
1226            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1227        container_buffer.push_back(&[0]);
1228
1229        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1230    }
1231
1232    #[fuchsia::test]
1233    async fn bad_type() {
1234        let buffer = SharedBuffer::new(
1235            create_ring_buffer(MAX_MESSAGE_SIZE),
1236            Box::new(|_| {}),
1237            Default::default(),
1238        );
1239        let container_buffer =
1240            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1241        container_buffer.push_back(&[0x77; 16]);
1242
1243        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1244    }
1245
1246    #[fuchsia::test]
1247    async fn message_truncated() {
1248        let buffer = SharedBuffer::new(
1249            create_ring_buffer(MAX_MESSAGE_SIZE),
1250            Box::new(|_| {}),
1251            Default::default(),
1252        );
1253        let container_buffer =
1254            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1255        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1256        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1257
1258        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1259    }
1260
1261    #[fuchsia::test]
1262    async fn buffer_wrapping() {
1263        let buffer = SharedBuffer::new(
1264            create_ring_buffer(MAX_MESSAGE_SIZE),
1265            Box::new(|_| {}),
1266            SharedBufferOptions { sleep_time: Duration::ZERO },
1267        );
1268        let container_buffer =
1269            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1270
1271        // Keep writing messages until we wrap.
1272        let mut i = 0;
1273        loop {
1274            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
1275            container_buffer.push_back(msg.bytes());
1276            i += 1;
1277
1278            // Yield to the executor to allow messages to be rolled out.
1279            yield_to_executor().await;
1280
1281            let inner = buffer.inner.lock();
1282            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
1283                break;
1284            }
1285        }
1286
1287        // Read back all the messages.
1288        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());
1289
1290        let mut j;
1291        let mut item = cursor.next().await;
1292        // Calling `cursor.next()` can cause messages to be rolled out, so skip those if that has
1293        // happened.
1294        if let Some(LazyItem::ItemsRolledOut(_, _)) = item {
1295            item = cursor.next().await;
1296        }
1297        assert_matches!(
1298            item,
1299            Some(LazyItem::Next(item)) => {
1300                j = item.timestamp().into_nanos();
1301                let msg = make_message(&format!("{j}"),
1302                                       None,
1303                                       item.timestamp());
1304                assert_eq!(&*item, &msg);
1305            }
1306        );
1307
1308        j += 1;
1309        while j != i {
1310            assert_matches!(
1311                cursor.next().await,
1312                Some(LazyItem::Next(item)) => {
1313                    assert_eq!(&*item, &make_message(&format!("{j}"),
1314                                                     None,
1315                                                     item.timestamp()));
1316                }
1317            );
1318            j += 1;
1319        }
1320
1321        assert_eq!(cursor.next().await, None);
1322    }
1323
1324    #[fuchsia::test]
1325    async fn on_inactive() {
1326        let identity = Arc::new(vec!["a"].into());
1327        let on_inactive = Arc::new(AtomicU64::new(0));
1328        let buffer = {
1329            let on_inactive = Arc::clone(&on_inactive);
1330            let identity = Arc::clone(&identity);
1331            Arc::new(SharedBuffer::new(
1332                create_ring_buffer(MAX_MESSAGE_SIZE),
1333                Box::new(move |i| {
1334                    assert_eq!(i, identity);
1335                    on_inactive.fetch_add(1, Ordering::Relaxed);
1336                }),
1337                SharedBufferOptions { sleep_time: Duration::ZERO },
1338            ))
1339        };
1340        let container_a = buffer.new_container_buffer(identity, Arc::default());
1341        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1342
1343        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1344        container_a.push_back(msg.bytes());
1345
1346        // Repeatedly write messages to b until a is rolled out.
1347        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1348            container_b.push_back(msg.bytes());
1349
1350            // Yield to the executor to allow messages to be rolled out.
1351            yield_to_executor().await;
1352        }
1353
1354        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
1355    }
1356
1357    #[fuchsia::test]
1358    async fn terminate_drops_container() {
1359        // Silence a clippy warning; SharedBuffer needs an executor.
1360        async {}.await;
1361
1362        let buffer = SharedBuffer::new(
1363            create_ring_buffer(MAX_MESSAGE_SIZE),
1364            Box::new(|_| {}),
1365            SharedBufferOptions { sleep_time: Duration::ZERO },
1366        );
1367
1368        // terminate when buffer has no logs.
1369        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1370        assert_eq!(buffer.container_count(), 1);
1371        container_a.terminate();
1372
1373        assert_eq!(buffer.container_count(), 0);
1374
1375        // terminate when buffer has logs.
1376        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1377        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1378        container_a.push_back(msg.bytes());
1379        assert_eq!(buffer.container_count(), 1);
1380        container_a.terminate();
1381
1382        // The container should still be there because it has logs.
1383        assert_eq!(buffer.container_count(), 1);
1384
1385        // Roll out the logs.
1386        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1387        assert_eq!(buffer.container_count(), 2);
1388
1389        // Repeatedly write messages to b until a's message is dropped and then the container will
1390        // be dropped.
1391        while buffer.container_count() != 1 {
1392            container_b.push_back(msg.bytes());
1393
1394            // Yield to the executor to allow messages to be rolled out.
1395            yield_to_executor().await;
1396        }
1397
1398        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
1399    }
1400
1401    #[fuchsia::test]
1402    async fn cursor_subscribe() {
1403        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
1404            let buffer = SharedBuffer::new(
1405                create_ring_buffer(MAX_MESSAGE_SIZE),
1406                Box::new(|_| {}),
1407                Default::default(),
1408            );
1409            let container =
1410                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1411            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1412            container.push_back(msg.bytes());
1413
1414            let (sender, mut receiver) = mpsc::unbounded();
1415
1416            // Run the cursor in a separate task so that we can test it gets woken correctly.
1417            {
1418                let container = Arc::clone(&container);
1419                fasync::Task::spawn(async move {
1420                    let mut cursor = pin!(container.cursor(mode).unwrap());
1421                    while let Some(item) = cursor.next().await {
1422                        sender.unbounded_send(item).unwrap();
1423                    }
1424                })
1425                .detach();
1426            }
1427
1428            // The existing message should only be returned with SnapshotThenSubscribe
1429            if mode == StreamMode::SnapshotThenSubscribe {
1430                assert_matches!(
1431                    receiver.next().await,
1432                    Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1433                );
1434            }
1435
1436            // No message should arrive. We can only use a timeout here.
1437            assert_eq!(
1438                OptionFuture::from(Some(receiver.next()))
1439                    .on_timeout(Duration::from_millis(500), || None)
1440                    .await,
1441                None
1442            );
1443
1444            container.push_back(msg.bytes());
1445
1446            // The message should arrive now.
1447            assert_matches!(
1448                receiver.next().await,
1449                Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1450            );
1451
1452            container.terminate();
1453
1454            // The cursor should terminate now.
1455            assert!(receiver.next().await.is_none());
1456        }
1457    }
1458
1459    #[fuchsia::test]
1460    async fn cursor_rolled_out() {
1461        // On the first pass we roll out before the cursor has started. On the second pass, we roll
1462        // out after the cursor has started.
1463        for pass in 0..2 {
1464            let buffer = SharedBuffer::new(
1465                create_ring_buffer(MAX_MESSAGE_SIZE),
1466                Box::new(|_| {}),
1467                SharedBufferOptions { sleep_time: Duration::ZERO },
1468            );
1469            let container_a =
1470                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1471            let container_b =
1472                Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1473            let container_c =
1474                Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1475            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1476
1477            container_a.push_back(msg.bytes());
1478            container_a.push_back(msg.bytes());
1479            container_b.push_back(msg.bytes());
1480
1481            const A_MESSAGE_COUNT: usize = 50;
1482            for _ in 0..A_MESSAGE_COUNT - 2 {
1483                container_a.push_back(msg.bytes());
1484            }
1485
1486            let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1487
1488            let mut expected = A_MESSAGE_COUNT;
1489
1490            // Get the first stored message on the first pass.
1491            if pass == 0 {
1492                assert!(cursor.next().await.is_some());
1493                expected -= 1;
1494            }
1495
1496            // Roll out messages until container_b is rolled out.
1497            while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1498                container_c.push_back(msg.bytes());
1499
1500                // Yield to the executor to allow messages to be rolled out.
1501                yield_to_executor().await;
1502            }
1503
1504            // We should have rolled out some messages in container a.
1505            assert_matches!(
1506                cursor.next().await,
1507                Some(LazyItem::ItemsRolledOut(rolled_out, t))
1508                    if t == zx::BootInstant::from_nanos(1) && rolled_out > 0
1509                    => expected -= rolled_out as usize
1510            );
1511
1512            // And check how many are remaining.
1513            assert_eq!(cursor.count().await, expected);
1514        }
1515    }
1516
1517    #[fuchsia::test]
1518    async fn drained_post_termination_cursors() {
1519        let buffer = SharedBuffer::new(
1520            create_ring_buffer(MAX_MESSAGE_SIZE),
1521            Box::new(|_| {}),
1522            Default::default(),
1523        );
1524        let container =
1525            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1526        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1527
1528        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
1529        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1530
1531        container.push_back(msg.bytes());
1532        container.push_back(msg.bytes());
1533        container.push_back(msg.bytes());
1534        container.push_back(msg.bytes());
1535        container.push_back(msg.bytes());
1536
1537        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
1538        assert!(cursor_a.next().await.is_some());
1539        assert!(cursor_b.next().await.is_some());
1540        assert!(cursor_c.next().await.is_some());
1541
1542        container.terminate();
1543
1544        // All cursors should return the 4 remaining messages.
1545        assert_eq!(cursor_a.count().await, 4);
1546        assert_eq!(cursor_b.count().await, 4);
1547        assert_eq!(cursor_c.count().await, 4);
1548    }
1549
1550    #[fuchsia::test]
1551    async fn empty_post_termination_cursors() {
1552        let buffer = SharedBuffer::new(
1553            create_ring_buffer(MAX_MESSAGE_SIZE),
1554            Box::new(|_| {}),
1555            Default::default(),
1556        );
1557        let container =
1558            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1559
1560        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
1561        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
1562        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();
1563
1564        container.terminate();
1565
1566        assert_eq!(cursor_a.count().await, 0);
1567        assert_eq!(cursor_b.count().await, 0);
1568        assert_eq!(cursor_c.count().await, 0);
1569    }
1570
1571    #[fuchsia::test]
1572    async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
1573        let buffer = SharedBuffer::new(
1574            create_ring_buffer(MAX_MESSAGE_SIZE),
1575            Box::new(|_| {}),
1576            SharedBufferOptions { sleep_time: Duration::ZERO },
1577        );
1578        let container_a =
1579            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1580        let container_b =
1581            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1582        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1583        container_a.push_back(msg.bytes());
1584        container_a.push_back(msg.bytes());
1585        container_a.push_back(msg.bytes());
1586        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1587
1588        // Roll out all the messages.
1589        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1590            container_b.push_back(msg.bytes());
1591
1592            // Yield to the executor to allow messages to be rolled out.
1593            yield_to_executor().await;
1594        }
1595
1596        assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));
1597
1598        assert!(poll!(cursor.next()).is_pending());
1599
1600        container_a.terminate();
1601        assert_eq!(cursor.count().await, 0);
1602    }
1603
1604    #[fuchsia::test]
1605    async fn recycled_container_slot() {
1606        let buffer = Arc::new(SharedBuffer::new(
1607            create_ring_buffer(MAX_MESSAGE_SIZE),
1608            Box::new(|_| {}),
1609            SharedBufferOptions { sleep_time: Duration::ZERO },
1610        ));
1611        let container_a =
1612            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1613        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1614        container_a.push_back(msg.bytes());
1615
1616        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1617        assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));
1618
1619        // Roll out all the messages.
1620        let container_b =
1621            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1622        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1623            container_b.push_back(msg.bytes());
1624
1625            // Yield to the executor to allow messages to be rolled out.
1626            yield_to_executor().await;
1627        }
1628
1629        container_a.terminate();
1630
1631        // This should create a new container that uses a new slot and shouldn't interfere with
1632        // container_a.
1633        let container_c =
1634            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1635        container_c.push_back(msg.bytes());
1636        container_c.push_back(msg.bytes());
1637
1638        // The original cursor should have finished.
1639        assert_matches!(cursor.next().await, None);
1640    }
1641
1642    #[fuchsia::test]
1643    async fn socket_increments_logstats() {
1644        let inspector = Inspector::new(InspectorConfig::default());
1645        let stats: Arc<LogStreamStats> =
1646            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
1647        let buffer = Arc::new(SharedBuffer::new(
1648            create_ring_buffer(65536),
1649            Box::new(|_| {}),
1650            Default::default(),
1651        ));
1652        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1653        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1654
1655        let (local, remote) = zx::Socket::create_datagram();
1656        container_a.add_socket(remote);
1657
1658        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1659
1660        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1661        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1662        let mut futures = FuturesUnordered::new();
1663        futures.push(async move {
1664            let mut cursor_a = pin!(cursor_a);
1665            cursor_a.next().await
1666        });
1667        let mut next = futures.next();
1668        assert!(futures::poll!(&mut next).is_pending());
1669
1670        local.write(msg.bytes()).unwrap();
1671
1672        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1673
1674        assert_eq!(
1675            cursor_b
1676                .map(|item| {
1677                    match item {
1678                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1679                        _ => panic!("Unexpected item {item:?}"),
1680                    }
1681                })
1682                .count()
1683                .await,
1684            1
1685        );
1686
1687        // If cursor_a wasn't woken, this will hang.
1688        next.await;
1689        // Validate logstats (must happen after the socket was handled)
1690        assert_data_tree!(
1691            inspector,
1692            root: contains {
1693                test: {
1694                    url: "",
1695                    last_timestamp: AnyProperty,
1696                    sockets_closed: 0u64,
1697                    sockets_opened: 1u64,
1698                    invalid: {
1699                        number: 0u64,
1700                        bytes: 0u64,
1701                    },
1702                    total: {
1703                        number: 1u64,
1704                        bytes: 88u64,
1705                    },
1706                    rolled_out: {
1707                        number: 0u64,
1708                        bytes: 0u64,
1709                    },
1710                    trace: {
1711                        number: 0u64,
1712                        bytes: 0u64,
1713                    },
1714                    debug: {
1715                        number: 1u64,
1716                        bytes: 88u64,
1717                    },
1718                    info: {
1719                        number: 0u64,
1720                        bytes: 0u64,
1721                    },
1722                    warn: {
1723                        number: 0u64,
1724                        bytes: 0u64,
1725                    },
1726                    error: {
1727                        number: 0u64,
1728                        bytes: 0u64,
1729                    },
1730                    fatal: {
1731                        number: 0u64,
1732                        bytes: 0u64,
1733                    },
1734                }
1735            }
1736        );
1737    }
1738
1739    #[fuchsia::test]
1740    async fn socket() {
1741        let buffer = Arc::new(SharedBuffer::new(
1742            create_ring_buffer(MAX_MESSAGE_SIZE),
1743            Box::new(|_| {}),
1744            Default::default(),
1745        ));
1746        let container_a =
1747            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1748        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1749
1750        let (local, remote) = zx::Socket::create_datagram();
1751        container_a.add_socket(remote);
1752
1753        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1754
1755        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1756        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1757        let mut futures = FuturesUnordered::new();
1758        futures.push(async move {
1759            let mut cursor_a = pin!(cursor_a);
1760            cursor_a.next().await
1761        });
1762        let mut next = futures.next();
1763        assert!(futures::poll!(&mut next).is_pending());
1764
1765        local.write(msg.bytes()).unwrap();
1766
1767        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1768
1769        assert_eq!(
1770            cursor_b
1771                .map(|item| {
1772                    match item {
1773                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1774                        _ => panic!("Unexpected item {item:?}"),
1775                    }
1776                })
1777                .count()
1778                .await,
1779            1
1780        );
1781
1782        // If cursor_a wasn't woken, this will hang.
1783        next.await;
1784    }
1785
1786    #[fuchsia::test]
1787    async fn socket_on_inactive() {
1788        let on_inactive = Arc::new(AtomicU64::new(0));
1789        let a_identity = Arc::new(vec!["a"].into());
1790        let buffer = Arc::new(SharedBuffer::new(
1791            create_ring_buffer(MAX_MESSAGE_SIZE),
1792            {
1793                let on_inactive = Arc::clone(&on_inactive);
1794                let a_identity = Arc::clone(&a_identity);
1795                Box::new(move |id| {
1796                    assert_eq!(id, a_identity);
1797                    on_inactive.fetch_add(1, Ordering::Relaxed);
1798                })
1799            },
1800            SharedBufferOptions { sleep_time: Duration::ZERO },
1801        ));
1802        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1803        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1804
1805        let (local, remote) = zx::Socket::create_datagram();
1806        container_a.add_socket(remote);
1807
1808        local.write(msg.bytes()).unwrap();
1809
1810        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1811
1812        assert_eq!(
1813            cursor
1814                .map(|item| {
1815                    match item {
1816                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1817                        _ => panic!("Unexpected item {item:?}"),
1818                    }
1819                })
1820                .count()
1821                .await,
1822            1
1823        );
1824
1825        // Now roll out a's messages.
1826        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1827        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1828            container_b.push_back(msg.bytes());
1829
1830            // Yield to the executor to allow messages to be rolled out.
1831            yield_to_executor().await;
1832        }
1833
1834        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1835
1836        // Close the socket.
1837        std::mem::drop(local);
1838
1839        // We don't know when the socket thread will run so we have to loop.
1840        while on_inactive.load(Ordering::Relaxed) != 1 {
1841            fasync::Timer::new(Duration::from_millis(50)).await;
1842        }
1843    }
1844
1845    #[fuchsia::test]
1846    async fn flush() {
1847        let a_identity = Arc::new(vec!["a"].into());
1848        let buffer = Arc::new(SharedBuffer::new(
1849            create_ring_buffer(1024 * 1024),
1850            Box::new(|_| {}),
1851            Default::default(),
1852        ));
1853        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1854        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1855
1856        let (local, remote) = zx::Socket::create_datagram();
1857        container_a.add_socket(remote);
1858
1859        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());
1860
1861        const COUNT: usize = 1000;
1862        for _ in 0..COUNT {
1863            local.write(msg.bytes()).unwrap();
1864        }
1865
1866        // Race two flush futures.
1867        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1868        flush_futures.next().await;
1869
1870        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1871        assert!(messages.is_some());
1872
1873        // Make sure the other one finishes too.
1874        flush_futures.next().await;
1875
1876        // Make sure we can still terminate the buffer.
1877        buffer.terminate().await;
1878    }
1879}