archivist_lib/logs/
shared_buffer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::repository::ARCHIVIST_MONIKER;
7use crate::logs::stats::LogStreamStats;
8use crate::logs::stored_message::StoredMessage;
9use derivative::Derivative;
10use diagnostics_log_encoding::encode::add_dropped_count;
11use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
12use fidl_fuchsia_diagnostics::StreamMode;
13use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
14use fuchsia_async as fasync;
15use fuchsia_async::condition::{Condition, ConditionGuard, WakerEntry};
16use fuchsia_sync::Mutex;
17use futures::Stream;
18use futures::channel::oneshot;
19use log::debug;
20use pin_project::{pin_project, pinned_drop};
21use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
22use std::collections::VecDeque;
23use std::mem::ManuallyDrop;
24use std::ops::{Deref, DerefMut, Range};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll};
28use std::time::Duration;
29use zerocopy::FromBytes;
30use zx::AsHandleRef as _;
31
// Aim to keep 25% of the buffer free. This is expressed as a fraction: numerator / denominator.
// `check_space` pops the oldest messages whenever the free space drops below this fraction of the
// ring buffer's capacity.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// The default amount of time that Archivist will sleep for to reduce how often it wakes up to
// handle log messages. Used by `SharedBufferOptions::default()`.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
39
40pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
41    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
42}
43
44fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
45    // We always keep spare space in the buffer so that we don't drop messages.  This is controlled
46    // by SPACE_THRESHOLD_NUMERATOR & SPACE_THRESHOLD_DENOMINATOR.  Round up capacity so that
47    // `capacity` reflects the actual amount of log data we can store.
48    let page_size = zx::system_get_page_size() as usize;
49    (capacity * SPACE_THRESHOLD_DENOMINATOR
50        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
51        .next_multiple_of(page_size)
52}
53
// Bit that is set in the port packet key for IOBuffer peer-closed waits. The low 32 bits of the
// key hold the index of the corresponding entry in `Inner::iob_peers`.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

// Callback that is invoked with the identity of a component whose container has been found to be
// inactive. It is called after the lock on `Inner` has been released (see `InnerGuard::drop`).
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
57
/// A ring buffer of log messages shared by all registered containers, together with the socket
/// thread and monitor task that service it.
pub struct SharedBuffer {
    // State protected by the lock; the `Condition` also holds the wakers that are woken when new
    // messages are found (see `InnerGuard`).
    inner: Condition<Inner>,

    // Sockets. This *must* be locked after `inner` and the lock *must* be dropped before
    // `InnerGuard` is dropped.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container is inactive (i.e. no logs and no sockets).
    on_inactive: OnInactive,

    // The port that we use to service the sockets. It also receives the IOBuffer peer-closed
    // packets and the interrupt packet for `event`.
    port: zx::Port,

    // Event used for termination. `USER_0` is raised whenever a message is pushed onto
    // `Inner::thread_msg_queue` (both `Terminate` and `Flush`).
    event: zx::Event,

    // The thread that we use to service the sockets. This is `None` once `terminate` has taken
    // the handle to join it.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // The task responsible for monitoring the space in the IOBuffer and popping messages when it
    // gets full. It will also wake cursors whenever new data arrives.
    _buffer_monitor_task: fasync::Task<()>,
}
81
// A lock on `Inner` with deferred side effects. When dropped it (in this order) wakes the
// registered wakers if `wake` is set, releases the lock, and then calls the buffer's `on_inactive`
// callback for every identity in `on_inactive`.
struct InnerGuard<'a> {
    // The buffer that this guard has locked.
    buffer: &'a SharedBuffer,

    // The underlying lock guard. This is `ManuallyDrop` so that `Drop` can release the lock before
    // the `on_inactive` callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // The list of components that should be reported as inactive once the lock is dropped.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // If true, wake all the wakers registered with the ConditionGuard.
    wake: bool,
}
93
94impl Drop for InnerGuard<'_> {
95    fn drop(&mut self) {
96        if self.wake {
97            for waker in self.guard.drain_wakers() {
98                waker.wake();
99            }
100        }
101        // SAFETY: This is the only place we drop the guard.
102        unsafe {
103            ManuallyDrop::drop(&mut self.guard);
104        }
105        for identity in self.on_inactive.drain(..) {
106            (*self.buffer.on_inactive)(identity);
107        }
108    }
109}
110
111impl Deref for InnerGuard<'_> {
112    type Target = Inner;
113
114    fn deref(&self) -> &Self::Target {
115        &self.guard
116    }
117}
118
119impl DerefMut for InnerGuard<'_> {
120    fn deref_mut(&mut self) -> &mut Self::Target {
121        &mut self.guard
122    }
123}
124
// The state of a `SharedBuffer` that is protected by the `Condition` lock.
struct Inner {
    // The ring buffer.
    ring_buffer: Arc<RingBuffer>,

    // Registered containers.
    containers: Containers,

    // Socket thread message queue. `SharedBuffer::event` is signalled with `USER_0` whenever a
    // message is pushed.
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The index in the buffer that we have scanned for messages. This is advanced by
    // `update_message_ids` and is always >= `tail`.
    last_scanned: u64,

    // A copy of the tail index in the ring buffer. This is advanced by `pop`.
    tail: u64,

    // The IOBuffer peers that we must watch. The slab index is used (together with
    // `IOB_PEER_CLOSED_KEY_BASE`) as the port key for the peer-closed wait.
    iob_peers: Slab<(ContainerId, zx::Iob)>,
}
144
// Messages sent to the socket thread via `Inner::thread_msg_queue`. A message is only acted upon
// after the sockets that were ready at the time have been read.
enum ThreadMessage {
    // The thread should terminate.
    Terminate,

    // Process all pending socket messages and report via the Sender when done.
    Flush(oneshot::Sender<()>),
}
152
/// Options that control the behaviour of a `SharedBuffer`.
pub struct SharedBufferOptions {
    // To reduce how often Archivist wakes when new messages are written, Archivist will sleep for
    // this time. This will impact how quickly messages show up via the cursors. The value is used
    // by both the socket thread and the buffer monitor task.
    pub sleep_time: Duration,
}
158
159impl Default for SharedBufferOptions {
160    fn default() -> Self {
161        Self { sleep_time: DEFAULT_SLEEP_TIME }
162    }
163}
164
165impl SharedBuffer {
166    /// Returns a new shared buffer and the container for Archivist.
167    pub fn new(
168        ring_buffer: ring_buffer::Reader,
169        on_inactive: OnInactive,
170        options: SharedBufferOptions,
171    ) -> Arc<Self> {
172        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
173            inner: Condition::new(Inner {
174                ring_buffer: Arc::clone(&ring_buffer),
175                containers: Containers::default(),
176                thread_msg_queue: VecDeque::default(),
177                last_scanned: 0,
178                tail: 0,
179                iob_peers: Slab::default(),
180            }),
181            sockets: Mutex::new(Slab::default()),
182            on_inactive,
183            port: zx::Port::create(),
184            event: zx::Event::create(),
185            socket_thread: Mutex::default(),
186            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
187                Weak::clone(weak),
188                ring_buffer,
189                options.sleep_time,
190            )),
191        });
192
193        *this.socket_thread.lock() = Some({
194            let this = Arc::clone(&this);
195            std::thread::spawn(move || this.socket_thread(options.sleep_time))
196        });
197        this
198    }
199
200    pub fn new_container_buffer(
201        self: &Arc<Self>,
202        identity: Arc<ComponentIdentity>,
203        stats: Arc<LogStreamStats>,
204    ) -> ContainerBuffer {
205        let mut inner = self.inner.lock();
206        let Inner { containers, ring_buffer, .. } = &mut *inner;
207        ContainerBuffer {
208            shared_buffer: Arc::clone(self),
209            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
210        }
211    }
212
213    pub async fn flush(&self) {
214        let (sender, receiver) = oneshot::channel();
215        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
216        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
217        // Ignore failure if Archivist is shutting down.
218        let _ = receiver.await;
219    }
220
221    /// Returns the number of registered containers in use by the buffer.
222    #[cfg(test)]
223    pub fn container_count(&self) -> usize {
224        self.inner.lock().containers.len()
225    }
226
227    /// Terminates the socket thread. The socket thread will drain the sockets before terminating.
228    pub async fn terminate(&self) {
229        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Terminate);
230        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
231        let join_handle = self.socket_thread.lock().take().unwrap();
232        fasync::unblock(|| {
233            let _ = join_handle.join();
234        })
235        .await;
236    }
237
    /// Body of the socket thread. It services the port: socket readiness packets, IOBuffer
    /// peer-closed packets and the interrupt that announces messages on `thread_msg_queue`.
    ///
    /// Each iteration gathers the packets that are ready, handles closed IOBuffer peers, reads
    /// every ready socket and then processes at most one queued `ThreadMessage`. The function
    /// returns once a `ThreadMessage::Terminate` has been processed.
    ///
    /// `sleep_time` bounds the pause taken at the start of an iteration when no message is
    /// pending, which limits how often the thread wakes up.
    ///
    /// # Panics
    ///
    /// Panics if waiting on the port fails with anything other than `TIMED_OUT`, or if arming a
    /// wait or clearing the event signal fails.
    fn socket_thread(&self, sleep_time: Duration) {
        // Port key used for the `USER_0` wait on `self.event`.
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        // The wait on `self.event` has to be registered again after each interrupt packet.
        let mut interrupt_needs_arming = true;
        // A message taken from `thread_msg_queue` that has not been processed yet.
        let mut msg = None;

        loop {
            // If a message is already pending, only poll the port (don't block). Otherwise, pause
            // and then block until at least one packet arrives.
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async_handle(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait so that we're not constantly waking up for every log message that is queued.
                // Ignore errors here. The wait is on `USER_0`, so a queued message cuts the pause
                // short.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            // To maintain proper ordering, we must capture the message here whilst
                            // we are still gathering the list of sockets that are ready to read.
                            // If we wait till later, we introduce windows where we might miss a
                            // socket that should be read.
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // The cast drops the `IOB_PEER_CLOSED_KEY_BASE` bit and leaves the
                            // index into `iob_peers`.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, only drain what is already queued on the port.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            // NOTE: `inner` is declared before `sockets` is locked below, which respects the
            // required lock order.
            let mut inner = InnerGuard::new(self);

            if !iob_peer_closed.is_empty() {
                // See the comment on `is_active()` for why this is required.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    // Freeing the slab entry yields the container that the peer belonged to.
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                // Reported via the callback when `inner` is dropped.
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                // The sockets lock is scoped so that it is released before `inner` is dropped.
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    // The closure re-registers the wait once the socket has been drained.
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async_handle(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            // Now that we've processed the sockets, we can process the message.
            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => return,
                    ThreadMessage::Flush(sender) => {
                        // The receiver might have gone away; that's not an error.
                        let _ = sender.send(());
                    }
                }

                // See if there's another message.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // If there are no more messages, we must clear the signal so that we get
                    // notified when the next message arrives. This must be done whilst we are
                    // holding the lock.
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }
353
354    async fn buffer_monitor_task(
355        this: Weak<Self>,
356        mut ring_buffer: ring_buffer::Reader,
357        sleep_time: Duration,
358    ) {
359        let mut last_head = 0;
360        loop {
361            // Sleep to limit how often we wake up.
362            fasync::Timer::new(sleep_time).await;
363            let head = ring_buffer.wait(last_head).await;
364            let Some(this) = this.upgrade() else { return };
365            let mut inner = InnerGuard::new(&this);
366            inner.check_space(head);
367            last_head = head;
368        }
369    }
370}
371
372impl Inner {
373    // Returns list of containers that no longer have any messages.
374    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
375        if msg.len() < std::mem::size_of::<Header>() {
376            debug!("message too short ({})", msg.len());
377            if let Some(container) = self.containers.get(container_id) {
378                container.stats.increment_invalid(msg.len());
379            }
380            return;
381        }
382
383        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
384
385        // NOTE: Some tests send messages that are bigger than the header indicates. We ignore the
386        // tail of any such messages.
387        let msg_len = header.size_words() as usize * 8;
388
389        // Check the correct type and size.
390        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
391            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
392            if let Some(container) = self.containers.get(container_id) {
393                container.stats.increment_invalid(msg.len());
394            }
395            return;
396        }
397
398        let Some(container) = self.containers.get_mut(container_id) else {
399            return;
400        };
401
402        let mut data;
403        let msg = if container.dropped_count > 0 {
404            data = msg.to_vec();
405            if !add_dropped_count(&mut data, container.dropped_count) {
406                debug!("unable to add dropped count to invalid message");
407                container.stats.increment_invalid(data.len());
408                return;
409            }
410            &data
411        } else {
412            msg
413        };
414
415        if container.iob.write(Default::default(), 0, msg).is_err() {
416            // We were unable to write the message to the buffer, most likely due to lack of
417            // space. We drop this message and then we'll add to the dropped count for the next
418            // message.
419            container.dropped_count += 1
420        } else {
421            container.dropped_count = 0;
422        }
423    }
424
425    /// Parses the first message within `range` returns (container_id, message, timestamp).
426    ///
427    /// # Panics
428    ///
429    /// This will panic if the ring buffer has been corrupted. Only the kernel and Archivist can
430    /// write to the ring buffer and so we trust both not to corrupt the ring buffer.
431    ///
432    /// # Safety
433    ///
434    /// `range` *must* be within the written range of the ring buffer so that there is no concurrent
435    /// write access to that range. The returned slice is only valid whilst this remains true.
436    unsafe fn parse_message(
437        &self,
438        range: Range<u64>,
439    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
440        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
441            .expect("Unable to read message from ring buffer");
442        (
443            ContainerId(tag as u32),
444            msg,
445            (msg.len() >= 16)
446                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
447        )
448    }
449}
450
451impl<'a> InnerGuard<'a> {
452    fn new(buffer: &'a SharedBuffer) -> Self {
453        Self {
454            buffer,
455            guard: ManuallyDrop::new(buffer.inner.lock()),
456            on_inactive: Vec::new(),
457            wake: false,
458        }
459    }
460
461    /// Pops a message and returns its total size. The caller should call `update_message_ids(head)`
462    /// prior to calling this.
463    ///
464    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
465    /// necessarily the message with the *oldest* timestamp because we might not have received the
466    /// messages in perfect timestamp order. This should be close enough for all use cases we care
467    /// about, and besides, the timestamps can't be trusted anyway.
468    fn pop(&mut self, head: u64) -> Option<usize> {
469        if head == self.tail {
470            return None;
471        }
472
473        // SAFETY: There can be no concurrent writes between `tail..head` and the *only* place
474        // we increment the tail index is just before we leave this function.
475        let record_len = {
476            let (container_id, message, timestamp) = unsafe { self.parse_message(self.tail..head) };
477            let record_len = ring_buffer_record_len(message.len());
478
479            let container = self.containers.get_mut(container_id).unwrap();
480
481            container.stats.increment_rolled_out(record_len);
482            container.msg_ids.start += 1;
483            if let Some(timestamp) = timestamp {
484                container.last_rolled_out_timestamp = timestamp;
485            }
486            if !container.is_active() {
487                if container.should_free() {
488                    self.containers.free(container_id);
489                } else {
490                    let identity = Arc::clone(&container.identity);
491                    self.on_inactive.push(identity);
492                }
493            }
494
495            record_len
496        };
497
498        // NOTE: This should go last. After incrementing `tail`, the `message` can be overwritten.
499        self.ring_buffer.increment_tail(record_len);
500        self.tail += record_len as u64;
501
502        // The caller should have called `update_message_ids(head)` prior to calling this.
503        assert!(self.last_scanned >= self.tail);
504
505        Some(record_len)
506    }
507
508    /// Reads a socket. Calls `rearm` to rearm the socket once it has been drained.
509    fn read_socket(
510        &mut self,
511        sockets: &mut Slab<Socket>,
512        socket_id: SocketId,
513        rearm: impl FnOnce(&mut Socket),
514    ) {
515        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
516        let container_id = socket.container_id;
517
518        loop {
519            self.check_space(self.ring_buffer.head());
520
521            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
522
523            // Read directly into the buffer leaving space for the header.
524            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
525                Ok(d) => d.len(),
526                Err(zx::Status::SHOULD_WAIT) => {
527                    // The socket has been drained.
528                    rearm(socket);
529                    return;
530                }
531                Err(_) => break,
532            };
533
534            // SAFETY: `read_uninit` will have written to `len` bytes.
535            unsafe {
536                data.set_len(len);
537            }
538
539            let container = self.containers.get_mut(container_id).unwrap();
540            if data.len() < 16 {
541                container.stats.increment_invalid(data.len());
542                continue;
543            }
544
545            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
546            let msg_len = header.size_words() as usize * 8;
547            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
548                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
549                container.stats.increment_invalid(data.len());
550                continue;
551            }
552
553            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
554            {
555                debug!("unable to add dropped count to invalid message");
556                container.stats.increment_invalid(data.len());
557                continue;
558            }
559
560            if container.iob.write(Default::default(), 0, &data).is_err() {
561                // We were unable to write the message to the buffer, most likely due to lack of
562                // space. We drop this message and then we'll add to the dropped count for the next
563                // message.
564                container.dropped_count += 1
565            } else {
566                container.dropped_count = 0;
567            }
568        }
569
570        // This path is taken when the socket should be closed.
571
572        // See the comment on `is_active()` for why this is required.
573        self.update_message_ids(self.ring_buffer.head());
574
575        let container = self.containers.get_mut(container_id).unwrap();
576        container.remove_socket(socket_id, sockets);
577        if !container.is_active() {
578            if container.should_free() {
579                self.containers.free(container_id);
580            } else {
581                let identity = Arc::clone(&container.identity);
582                self.on_inactive.push(identity);
583            }
584        }
585    }
586
587    /// Scans the ring buffer and updates `msg_ids` for the containers.
588    fn update_message_ids(&mut self, head: u64) {
589        while self.last_scanned < head {
590            // SAFETY: This is safe because `head` must be within the ring buffer range and we make
591            // sure that `self.last_scanned` is always >= `tail` in `pop()`.
592            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
593            let msg_len = message.len();
594            let severity = (msg_len >= 8)
595                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
596            let container = self.containers.get_mut(container_id).unwrap();
597            container.msg_ids.end += 1;
598            if let Some(severity) = severity {
599                container.stats.ingest_message(msg_len, severity);
600            }
601            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
602            self.wake = true;
603        }
604    }
605
606    /// Ensures the buffer keeps the required amount of space.
607    fn check_space(&mut self, head: u64) {
608        self.update_message_ids(head);
609        let capacity = self.ring_buffer.capacity();
610        let mut space = capacity
611            .checked_sub((head - self.tail) as usize)
612            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
613        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
614        while space < required_space {
615            let Some(amount) = self.pop(head) else { break };
616            space += amount;
617        }
618    }
619}
620
// The set of registered containers, keyed by `ContainerId`.
#[derive(Default)]
struct Containers {
    // Storage for the containers; a container's ID is its index in this slab.
    slab: Slab<ContainerInfo>,
}
625
// Identifies a container. This is the container's index in `Containers::slab` and is also used as
// the tag for messages written to the ring buffer on the container's behalf.
#[derive(Clone, Copy, Debug)]
struct ContainerId(u32);
628
629impl Containers {
630    #[cfg(test)]
631    fn len(&self) -> usize {
632        self.slab.len()
633    }
634
635    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
636        self.slab.get(id.0)
637    }
638
639    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
640        self.slab.get_mut(id.0)
641    }
642
643    fn new_container(
644        &mut self,
645        buffer: &RingBuffer,
646        identity: Arc<ComponentIdentity>,
647        stats: Arc<LogStreamStats>,
648    ) -> ContainerId {
649        ContainerId(self.slab.insert(|id| {
650            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
651            ContainerInfo::new(identity, stats, iob)
652        }))
653    }
654
655    fn free(&mut self, id: ContainerId) {
656        self.slab.free(id.0);
657    }
658}
659
// The per-container state tracked by the shared buffer.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The number of references (typically cursors) that prevent this object from being freed.
    refs: usize,

    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first index in the shared buffer for a message for this container. This index
    // might have been rolled out. This is used as an optimisation to set the starting
    // cursor position.
    first_index: u64,

    // The first and last message IDs stored in the shared buffer. The last message ID can be out of
    // date if there are concurrent writers. `start` is advanced by `pop` and `end` is advanced by
    // `update_message_ids`.
    msg_ids: Range<u64>,

    // Whether the container is terminated. A terminated container is freed once it has no
    // references and is no longer active (see `should_free`).
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The timestamp of the message most recently rolled out.
    last_rolled_out_timestamp: zx::BootInstant,

    // The first socket ID for this container. The sockets form a doubly linked list via their
    // `prev` and `next` fields; this is `SocketId::NULL` if the container has no sockets.
    first_socket_id: SocketId,

    // The IOBuffer used for writing.
    iob: zx::Iob,

    // The number of client IOBuffers.
    iob_count: usize,

    // The number of messages dropped when forwarding from a socket to an IOBuffer. This is reset
    // once a message has been written successfully.
    dropped_count: u64,
}
700
701impl ContainerInfo {
702    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
703        Self {
704            refs: 0,
705            identity,
706            first_index: 0,
707            msg_ids: 0..0,
708            terminated: false,
709            stats,
710            last_rolled_out_timestamp: zx::BootInstant::ZERO,
711            first_socket_id: SocketId::NULL,
712            iob,
713            iob_count: 0,
714            dropped_count: 0,
715        }
716    }
717
718    fn should_free(&self) -> bool {
719        self.terminated && self.refs == 0 && !self.is_active()
720    }
721
722    // Returns true if the container is considered active which is the case if it has sockets, io
723    // buffers, or buffered messages.
724    //
725    // NOTE: Whenever a socket or iob is closed, `update_message_ids` must called to ensure
726    // `msg_ids.end` is correctly set.
727    fn is_active(&self) -> bool {
728        self.first_socket_id != SocketId::NULL
729            || self.iob_count > 0
730            || self.msg_ids.end != self.msg_ids.start
731            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
732    }
733
734    // # Panics
735    //
736    // This will panic if the socket isn't found.
737    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
738        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
739        if prev == SocketId::NULL {
740            self.first_socket_id = next;
741        } else {
742            sockets.get_mut(prev.0).unwrap().next = next;
743        }
744        if next != SocketId::NULL {
745            sockets
746                .get_mut(next.0)
747                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
748                .prev = prev;
749        }
750        sockets.free(socket_id.0);
751        self.stats.close_socket();
752        debug!(identity:% = self.identity; "Socket closed.");
753    }
754}
755
/// A handle to a single container's share of a `SharedBuffer`.
pub struct ContainerBuffer {
    // The buffer that the container is registered with.
    shared_buffer: Arc<SharedBuffer>,
    // The ID of the container within `shared_buffer`.
    container_id: ContainerId,
}
760
761impl ContainerBuffer {
762    /// Returns the tag used by IOBuffers used for this component.
763    pub fn iob_tag(&self) -> u64 {
764        self.container_id.0 as u64
765    }
766
767    /// Ingests a new message.
768    ///
769    /// If the message is invalid, it is dropped.
770    pub fn push_back(&self, msg: &[u8]) {
771        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
772    }
773
774    /// Returns an IOBuffer for the container.
775    pub fn iob(&self) -> zx::Iob {
776        let mut inner = self.shared_buffer.inner.lock();
777
778        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;
779
780        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();
781
782        inner.iob_peers.insert(|idx| {
783            ep1.wait_async_handle(
784                &self.shared_buffer.port,
785                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
786                zx::Signals::IOB_PEER_CLOSED,
787                zx::WaitAsyncOpts::empty(),
788            )
789            .unwrap();
790
791            (self.container_id, ep1)
792        });
793
794        ep0
795    }
796
797    /// Returns a cursor.
798    pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
799        // NOTE: It is not safe to use on_inactive in this function because this function can be
800        // called whilst locks are held which are the same locks that the on_inactive notification
801        // uses.
802        let mut inner = InnerGuard::new(&self.shared_buffer);
803        let Some(mut container) = inner.containers.get_mut(self.container_id) else {
804            // We've hit a race where the container has terminated.
805            return None;
806        };
807
808        container.refs += 1;
809        let stats = Arc::clone(&container.stats);
810
811        let (index, next_id, end) = match mode {
812            StreamMode::Snapshot => {
813                (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
814            }
815            StreamMode::Subscribe => {
816                // Call `update_message_ids` so that `msg_ids.end` is up to date.
817                let head = inner.ring_buffer.head();
818                inner.update_message_ids(head);
819                container = inner.containers.get_mut(self.container_id).unwrap();
820                (head, container.msg_ids.end, CursorEnd::Stream)
821            }
822            StreamMode::SnapshotThenSubscribe => {
823                (container.first_index, container.msg_ids.start, CursorEnd::Stream)
824            }
825        };
826
827        Some(Cursor {
828            index,
829            container_id: self.container_id,
830            next_id,
831            end,
832            buffer: Arc::clone(&self.shared_buffer),
833            waker_entry: self.shared_buffer.inner.waker_entry(),
834            stats,
835        })
836    }
837
838    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
839    /// The component's data will remain in the buffer until the messages are rolled out. This will
840    /// *not* drain sockets or close IOBuffers.
841    pub fn terminate(&self) {
842        let mut inner = InnerGuard::new(&self.shared_buffer);
843
844        // See the comment on `is_active()` for why this is required.
845        inner.update_message_ids(inner.ring_buffer.head());
846
847        if let Some(container) = inner.containers.get_mut(self.container_id) {
848            container.terminated = true;
849            if container.first_socket_id != SocketId::NULL {
850                let mut sockets = self.shared_buffer.sockets.lock();
851                loop {
852                    container.remove_socket(container.first_socket_id, &mut sockets);
853                    if container.first_socket_id == SocketId::NULL {
854                        break;
855                    }
856                }
857            }
858            if container.should_free() {
859                inner.containers.free(self.container_id);
860            }
861            inner.wake = true;
862        }
863    }
864
865    /// Returns true if the container has messages, sockets or IOBuffers.
866    pub fn is_active(&self) -> bool {
867        self.shared_buffer
868            .inner
869            .lock()
870            .containers
871            .get(self.container_id)
872            .is_some_and(|c| c.is_active())
873    }
874
875    /// Adds a socket for this container.
876    pub fn add_socket(&self, socket: zx::Socket) {
877        let mut inner = self.shared_buffer.inner.lock();
878        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
879        container.stats.open_socket();
880        let next = container.first_socket_id;
881        let mut sockets = self.shared_buffer.sockets.lock();
882        let socket_id = SocketId(sockets.insert(|socket_id| {
883            socket
884                .wait_async_handle(
885                    &self.shared_buffer.port,
886                    socket_id as u64,
887                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
888                    zx::WaitAsyncOpts::empty(),
889                )
890                .unwrap();
891            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
892        }));
893        if next != SocketId::NULL {
894            sockets.get_mut(next.0).unwrap().prev = socket_id;
895        }
896        container.first_socket_id = socket_id;
897    }
898}
899
900impl Drop for ContainerBuffer {
901    fn drop(&mut self) {
902        self.terminate();
903    }
904}
905
/// A stream over the messages that one container has stored in the shared buffer.
///
/// Created by `ContainerBuffer::cursor`. While a cursor exists it holds a reference on its
/// container (`refs`), which is released again when the cursor is dropped.
#[pin_project(PinnedDrop)]
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Cursor {
    // Index in the buffer that we should continue searching from.
    index: u64,

    // The container whose messages this cursor yields.
    container_id: ContainerId,

    // The next expected message ID.
    next_id: u64,

    // The ID, if any, that we should end at (exclusive).
    end: CursorEnd,

    // The shared buffer that this cursor reads from.
    #[derivative(Debug = "ignore")]
    buffer: Arc<SharedBuffer>,

    // waker_entry is used to register a waker for subscribing cursors.
    #[pin]
    #[derivative(Debug = "ignore")]
    waker_entry: WakerEntry<Inner>,

    // The container's stats, passed to `StoredMessage::new` for every message that is read.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,
}
932
/// The next element in the stream or a marker of the number of items rolled out since last polled.
#[derive(Debug, PartialEq)]
pub enum LazyItem<T> {
    /// The next item in the stream.
    Next(Arc<T>),
    /// A count of the items dropped between the last call to poll_next and this one, together
    /// with a timestamp. `Cursor` fills the timestamp from the container's
    /// `last_rolled_out_timestamp`.
    ItemsRolledOut(u64, zx::BootInstant),
}
941
/// Yields this cursor's container's messages in order, interleaved with `ItemsRolledOut` markers
/// whenever messages the cursor had not yet read were rolled out of the buffer.
impl Stream for Cursor {
    type Item = LazyItem<StoredMessage>;

    /// Polls for the next item.
    ///
    /// Returns `Poll::Ready(None)` when the container no longer exists, when the cursor has
    /// reached its end ID, or when no further record was found and the container is terminated.
    /// Returns `Poll::Pending` (after registering the waker) when no further record was found
    /// and the container is still live.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut inner = InnerGuard::new(this.buffer);

        // NOTE(review): `check_space` presumably rolls out old records to maintain free space;
        // the tests in this file note that polling a cursor can cause messages to be rolled out.
        let mut head = inner.ring_buffer.head();
        inner.check_space(head);

        let mut container = match inner.containers.get(*this.container_id) {
            // The container has been freed, so there is nothing more to return.
            None => return Poll::Ready(None),
            Some(container) => container,
        };

        // Work out the (exclusive) message ID at which this cursor ends.
        let end_id = match &mut this.end {
            // First poll of a snapshot cursor: fix the end of the snapshot now.
            CursorEnd::Snapshot(None) => {
                let mut sockets = this.buffer.sockets.lock();
                // Drain the sockets associated with this container first so that we capture
                // pending messages. Some tests rely on this.
                let mut socket_id = container.first_socket_id;
                while socket_id != SocketId::NULL {
                    let socket = sockets.get_mut(socket_id.0).unwrap();
                    // Remember the successor before reading this socket.
                    let next = socket.next;
                    // The socket doesn't need to be rearmed here.
                    inner.read_socket(&mut sockets, socket_id, |_| {});
                    socket_id = next;
                }

                // Call `update_message_ids` so that `msg_ids.end` is up to date.
                head = inner.ring_buffer.head();
                inner.update_message_ids(head);
                // Re-fetch the container now that the calls above have finished with `inner`.
                container = inner.containers.get(*this.container_id).unwrap();
                *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
                container.msg_ids.end
            }
            CursorEnd::Snapshot(Some(end)) => *end,
            // Streaming cursors have no end of their own.
            CursorEnd::Stream => u64::MAX,
        };

        if *this.next_id == end_id {
            return Poll::Ready(None);
        }

        // See if messages have been rolled out.
        if container.msg_ids.start > *this.next_id {
            // Skip ahead to the first message that still exists, but never past the end ID.
            let mut next_id = container.msg_ids.start;
            if end_id < next_id {
                next_id = end_id;
            }
            let rolled_out = next_id - *this.next_id;
            *this.next_id = next_id;
            return Poll::Ready(Some(LazyItem::ItemsRolledOut(
                rolled_out,
                container.last_rolled_out_timestamp,
            )));
        }

        // Never start scanning from before `inner.tail`.
        if inner.tail > *this.index {
            *this.index = inner.tail;
        }

        // Some optimisations to reduce the amount of searching we might have to do.
        if container.first_index > *this.index {
            *this.index = container.first_index;
        }

        // If the next expected ID is the container's current end ID, skip forward to
        // `inner.last_scanned`.
        if *this.next_id == container.msg_ids.end && *this.index < inner.last_scanned {
            *this.index = inner.last_scanned;
        }

        // Scan forward until we find a record matching this container.
        while *this.index < head {
            // SAFETY: `*this.index..head` must be within the written region of the ring-buffer.
            let (container_id, message, _timestamp) =
                unsafe { inner.parse_message(*this.index..head) };

            // Move index to the next record.
            *this.index += ring_buffer_record_len(message.len()) as u64;
            assert!(*this.index <= head);

            if container_id.0 == this.container_id.0 {
                // The ID is consumed even if the message turns out to be corrupt.
                *this.next_id += 1;
                if let Some(msg) = StoredMessage::new(message.into(), this.stats) {
                    return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
                } else {
                    // The message is corrupt. Just skip it.
                }
            }
        }

        if container.terminated {
            // No more messages will arrive for a terminated container.
            Poll::Ready(None)
        } else {
            // Register for a wake-up and wait.
            inner.guard.add_waker(this.waker_entry, cx.waker().clone());
            Poll::Pending
        }
    }
}
1041
1042#[pinned_drop]
1043impl PinnedDrop for Cursor {
1044    fn drop(self: Pin<&mut Self>) {
1045        let mut inner = self.buffer.inner.lock();
1046        if let Some(container) = inner.containers.get_mut(self.container_id) {
1047            container.refs -= 1;
1048            if container.should_free() {
1049                inner.containers.free(self.container_id);
1050            }
1051        }
1052    }
1053}
1054
/// Where a `Cursor` stops.
#[derive(Debug)]
enum CursorEnd {
    // The cursor ends at the contained message ID (exclusive).
    // The id will be None when the cursor is first created and is set after the cursor
    // is first polled.
    Snapshot(Option<u64>),
    // The cursor has no end ID of its own; `poll_next` treats the end as `u64::MAX`.
    Stream,
}
1062
/// Implements a simple Slab allocator.
///
/// Values are stored in a `Vec` of slots and addressed by their `u32` index. Freed slots are
/// threaded into a singly-linked free list: `free_index` is the index of the most recently freed
/// slot and each `Slot::Free` holds the index of the next free slot, with `usize::MAX` marking the
/// end of the list. Indices are therefore recycled (most recently freed first) before the vector
/// grows.
struct Slab<T> {
    slab: Vec<Slot<T>>,
    // Head of the free list, or `usize::MAX` if there are no free slots.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    /// Returns an empty slab with an empty free list.
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

/// A single entry in a `Slab`.
enum Slot<T> {
    /// The slot holds a value.
    Used(T),
    /// The slot is free; the payload is the index of the next free slot (or `usize::MAX`).
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of used entries. This is not performant.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    /// Removes and returns the value stored at `index`, making the slot available for reuse.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds or if the slot is already free. The slab is left
    /// untouched in both cases.
    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        let slot = &mut self.slab[index];

        // Validate *before* mutating. Overwriting the slot first would rewrite the free-list link
        // of an already-free slot (potentially making it point at itself) and so leave the slab
        // corrupted for anyone who observes it after the panic has unwound.
        if matches!(*slot, Slot::Free(_)) {
            panic!("Slot already free");
        }

        let Slot::Used(value) = std::mem::replace(slot, Slot::Free(self.free_index)) else {
            unreachable!("slot was checked to be in use");
        };
        self.free_index = index;
        value
    }

    /// Returns a reference to the value at `id`, or `None` if `id` is out of bounds or free.
    fn get(&self, id: u32) -> Option<&T> {
        self.slab.get(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            Slot::Free(_) => None,
        })
    }

    /// Returns a mutable reference to the value at `id`, or `None` if `id` is out of bounds or
    /// free.
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        self.slab.get_mut(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            Slot::Free(_) => None,
        })
    }

    /// Stores the value produced by `value` and returns the index it was stored at.
    ///
    /// `value` is called with the index that the new entry will occupy, which lets the entry
    /// embed (or register) its own index. A free slot is reused if there is one; otherwise the
    /// slab grows by one slot.
    ///
    /// # Panics
    ///
    /// Panics if the slab would need to grow to hold index `0xffff_ffff`.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            // Pop the head of the free list and put the new value in its place.
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                Slot::Used(_) => unreachable!("free list head must be a free slot"),
            };
            free_index as u32
        } else {
            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
            // (see SocketId::NULL below).
            assert!(self.slab.len() < u32::MAX as usize);
            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
            (self.slab.len() - 1) as u32
        }
    }
}
1131
/// Identifies a socket by its index in the slab of sockets.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    /// Sentinel meaning "no socket"; it terminates the per-container linked lists of sockets.
    /// `Slab::insert` never hands out this index.
    const NULL: Self = Self(u32::MAX);
}
1138
/// A socket registered with the shared buffer, together with the links that place it in its
/// container's list of sockets.
struct Socket {
    // The underlying socket.
    socket: zx::Socket,
    // The container that this socket belongs to.
    container_id: ContainerId,
    // Sockets are stored as a linked list for each container.
    // `SocketId::NULL` marks the end of the list in either direction.
    prev: SocketId,
    next: SocketId,
}
1146
1147#[cfg(test)]
1148mod tests {
1149    use super::{SharedBuffer, SharedBufferOptions, create_ring_buffer};
1150    use crate::logs::shared_buffer::LazyItem;
1151    use crate::logs::stats::LogStreamStats;
1152    use crate::logs::testing::make_message;
1153    use assert_matches::assert_matches;
1154    use diagnostics_assertions::{AnyProperty, assert_data_tree};
1155    use fidl_fuchsia_diagnostics::StreamMode;
1156    use fuchsia_async as fasync;
1157    use fuchsia_async::TimeoutExt;
1158    use fuchsia_inspect::{Inspector, InspectorConfig};
1159    use fuchsia_inspect_derive::WithInspect;
1160    use futures::channel::mpsc;
1161    use futures::future::OptionFuture;
1162    use futures::stream::{FuturesUnordered, StreamExt as _};
1163    use futures::{FutureExt, poll};
1164    use ring_buffer::MAX_MESSAGE_SIZE;
1165    use std::future::poll_fn;
1166    use std::pin::pin;
1167    use std::sync::Arc;
1168    use std::sync::atomic::{AtomicU64, Ordering};
1169    use std::task::Poll;
1170    use std::time::Duration;
1171
1172    async fn yield_to_executor() {
1173        let mut first_time = true;
1174        poll_fn(|cx| {
1175            if first_time {
1176                cx.waker().wake_by_ref();
1177                first_time = false;
1178                Poll::Pending
1179            } else {
1180                Poll::Ready(())
1181            }
1182        })
1183        .await;
1184    }
1185
1186    #[fuchsia::test]
1187    async fn push_one_message() {
1188        let buffer = SharedBuffer::new(
1189            create_ring_buffer(MAX_MESSAGE_SIZE),
1190            Box::new(|_| {}),
1191            Default::default(),
1192        );
1193        let container_buffer =
1194            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1195        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1196        container_buffer.push_back(msg.bytes());
1197
1198        // Make sure the cursor can find it.
1199        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
1200        assert_eq!(
1201            cursor
1202                .map(|item| {
1203                    match item {
1204                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1205                        _ => panic!("Unexpected item {item:?}"),
1206                    }
1207                })
1208                .count()
1209                .await,
1210            1
1211        );
1212    }
1213
1214    #[fuchsia::test]
1215    async fn message_too_short() {
1216        let buffer = SharedBuffer::new(
1217            create_ring_buffer(MAX_MESSAGE_SIZE),
1218            Box::new(|_| {}),
1219            Default::default(),
1220        );
1221
1222        let container_buffer =
1223            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1224        container_buffer.push_back(&[0]);
1225
1226        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1227    }
1228
1229    #[fuchsia::test]
1230    async fn bad_type() {
1231        let buffer = SharedBuffer::new(
1232            create_ring_buffer(MAX_MESSAGE_SIZE),
1233            Box::new(|_| {}),
1234            Default::default(),
1235        );
1236        let container_buffer =
1237            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1238        container_buffer.push_back(&[0x77; 16]);
1239
1240        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1241    }
1242
1243    #[fuchsia::test]
1244    async fn message_truncated() {
1245        let buffer = SharedBuffer::new(
1246            create_ring_buffer(MAX_MESSAGE_SIZE),
1247            Box::new(|_| {}),
1248            Default::default(),
1249        );
1250        let container_buffer =
1251            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1252        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1253        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1254
1255        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1256    }
1257
    // Writes messages until the ring buffer's head has passed its capacity (i.e. it has wrapped),
    // then checks that a snapshot cursor returns the surviving messages contiguously and in
    // order, up to the last one written.
    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        // Keep writing messages until we wrap.
        // Message `i` carries `i` both as its first `make_message` argument and as its timestamp,
        // so that it can be reconstructed from the timestamp when read back.
        let mut i = 0;
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;

            // The lock is released at the end of each iteration (or on `break`).
            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        // Read back all the messages.
        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        // `j` is the index of the message that we expect next.
        let mut j;
        let mut item = cursor.next().await;
        // Calling `cursor.next()` can cause messages to be rolled out, so skip those if that has
        // happened.
        if let Some(LazyItem::ItemsRolledOut(_, _)) = item {
            item = cursor.next().await;
        }
        // The first surviving message tells us where the sequence now starts.
        assert_matches!(
            item,
            Some(LazyItem::Next(item)) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"),
                                       None,
                                       item.timestamp());
                assert_eq!(&*item, &msg);
            }
        );

        // Every subsequent message must follow on directly from the previous one.
        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(LazyItem::Next(item)) => {
                    assert_eq!(&*item, &make_message(&format!("{j}"),
                                                     None,
                                                     item.timestamp()));
                }
            );
            j += 1;
        }

        // The snapshot ends after the last message that was written.
        assert_eq!(cursor.next().await, None);
    }
1320
    // Checks that the `on_inactive` callback is called exactly once, with container a's identity,
    // once a's only message has been rolled out of the buffer.
    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        // Counts how many times the callback has been invoked.
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    // Only container a is expected to be reported.
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Repeatedly write messages to b until a is rolled out.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }
1353
    // Checks when a terminated container is removed from the shared buffer: immediately if it
    // has no logs, otherwise only once its logs have been rolled out.
    #[fuchsia::test]
    async fn terminate_drops_container() {
        // Silence a clippy warning; SharedBuffer needs an executor.
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );

        // terminate when buffer has no logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        // terminate when buffer has logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        // The container should still be there because it has logs.
        assert_eq!(buffer.container_count(), 1);

        // Roll out the logs.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        // Repeatedly write messages to b until a's message is dropped and then the container will
        // be dropped.
        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        // Container a no longer exists, so no cursor can be created for it.
        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }
1397
    // Checks, for both subscribing modes, that a cursor running in its own task is woken when a
    // new message arrives and that it finishes when the container is terminated.
    #[fuchsia::test]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            // This message is written before the cursor is created.
            container.push_back(msg.bytes());

            // Items produced by the cursor task are forwarded to the test over this channel.
            let (sender, mut receiver) = mpsc::unbounded();

            // Run the cursor in a separate task so that we can test it gets woken correctly.
            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                    // `sender` is dropped here, which closes the channel.
                })
                .detach();
            }

            // The existing message should only be returned with SnapshotThenSubscribe
            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
                );
            }

            // No message should arrive. We can only use a timeout here.
            assert_eq!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await,
                None
            );

            container.push_back(msg.bytes());

            // The message should arrive now.
            assert_matches!(
                receiver.next().await,
                Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
            );

            container.terminate();

            // The cursor should terminate now.
            assert!(receiver.next().await.is_none());
        }
    }
1455
    // Checks that a snapshot cursor reports messages that were rolled out from underneath it via
    // `LazyItem::ItemsRolledOut`, and that the rolled-out count plus the messages actually
    // returned add up to the number of messages written.
    #[fuchsia::test]
    async fn cursor_rolled_out() {
        // On the first pass we roll out before the cursor has started. On the second pass, we roll
        // out after the cursor has started.
        for pass in 0..2 {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            );
            let container_a =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let container_b =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
            let container_c =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

            // Container b's single message sits after a's first two messages, so rolling it out
            // also rolls out at least those two.
            container_a.push_back(msg.bytes());
            container_a.push_back(msg.bytes());
            container_b.push_back(msg.bytes());

            // Write the rest of a's messages, for A_MESSAGE_COUNT in total.
            const A_MESSAGE_COUNT: usize = 50;
            for _ in 0..A_MESSAGE_COUNT - 2 {
                container_a.push_back(msg.bytes());
            }

            let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

            // The number of a's messages that are still unaccounted for.
            let mut expected = A_MESSAGE_COUNT;

            // Get the first stored message on the first pass.
            if pass == 0 {
                assert!(cursor.next().await.is_some());
                expected -= 1;
            }

            // Roll out messages until container_b is rolled out.
            while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
                container_c.push_back(msg.bytes());

                // Yield to the executor to allow messages to be rolled out.
                yield_to_executor().await;
            }

            // We should have rolled out some messages in container a.
            assert_matches!(
                cursor.next().await,
                Some(LazyItem::ItemsRolledOut(rolled_out, t))
                    if t == zx::BootInstant::from_nanos(1) && rolled_out > 0
                    => expected -= rolled_out as usize
            );

            // And check how many are remaining.
            assert_eq!(cursor.count().await, expected);
        }
    }
1513
1514    #[fuchsia::test]
1515    async fn drained_post_termination_cursors() {
1516        let buffer = SharedBuffer::new(
1517            create_ring_buffer(MAX_MESSAGE_SIZE),
1518            Box::new(|_| {}),
1519            Default::default(),
1520        );
1521        let container =
1522            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1523        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1524
1525        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
1526        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1527
1528        container.push_back(msg.bytes());
1529        container.push_back(msg.bytes());
1530        container.push_back(msg.bytes());
1531        container.push_back(msg.bytes());
1532        container.push_back(msg.bytes());
1533
1534        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
1535        assert!(cursor_a.next().await.is_some());
1536        assert!(cursor_b.next().await.is_some());
1537        assert!(cursor_c.next().await.is_some());
1538
1539        container.terminate();
1540
1541        // All cursors should return the 4 remaining messages.
1542        assert_eq!(cursor_a.count().await, 4);
1543        assert_eq!(cursor_b.count().await, 4);
1544        assert_eq!(cursor_c.count().await, 4);
1545    }
1546
1547    #[fuchsia::test]
1548    async fn empty_post_termination_cursors() {
1549        let buffer = SharedBuffer::new(
1550            create_ring_buffer(MAX_MESSAGE_SIZE),
1551            Box::new(|_| {}),
1552            Default::default(),
1553        );
1554        let container =
1555            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1556
1557        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
1558        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
1559        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();
1560
1561        container.terminate();
1562
1563        assert_eq!(cursor_a.count().await, 0);
1564        assert_eq!(cursor_b.count().await, 0);
1565        assert_eq!(cursor_c.count().await, 0);
1566    }
1567
    // Checks that a SnapshotThenSubscribe cursor whose messages were all rolled out before it
    // was first polled reports them as rolled out, then waits for new messages, and finishes
    // when the container is terminated.
    #[fuchsia::test]
    async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        // The cursor is created while all three messages are still in the buffer, but it is not
        // polled until after they have been rolled out.
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        // Roll out all the messages.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        // All three messages are reported as rolled out in a single item.
        assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));

        // Nothing else is available, but the stream has not ended.
        assert!(poll!(cursor.next()).is_pending());

        container_a.terminate();
        assert_eq!(cursor.count().await, 0);
    }
1600
    // Checks that a cursor on a terminated container whose messages have all been rolled out is
    // not affected by a container that is created afterwards.
    #[fuchsia::test]
    async fn recycled_container_slot() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // This cursor stays alive for the rest of the test.
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));

        // Roll out all the messages.
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        container_a.terminate();

        // This should create a new container that uses a new slot and shouldn't interfere with
        // container_a.
        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());

        // The original cursor should have finished.
        assert_matches!(cursor.next().await, None);
    }
1638
1639    #[fuchsia::test]
1640    async fn socket_increments_logstats() {
1641        let inspector = Inspector::new(InspectorConfig::default());
1642        let stats: Arc<LogStreamStats> =
1643            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
1644        let buffer = Arc::new(SharedBuffer::new(
1645            create_ring_buffer(65536),
1646            Box::new(|_| {}),
1647            Default::default(),
1648        ));
1649        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1650        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1651
1652        let (local, remote) = zx::Socket::create_datagram();
1653        container_a.add_socket(remote);
1654
1655        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1656
1657        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1658        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1659        let mut futures = FuturesUnordered::new();
1660        futures.push(async move {
1661            let mut cursor_a = pin!(cursor_a);
1662            cursor_a.next().await
1663        });
1664        let mut next = futures.next();
1665        assert!(futures::poll!(&mut next).is_pending());
1666
1667        local.write(msg.bytes()).unwrap();
1668
1669        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1670
1671        assert_eq!(
1672            cursor_b
1673                .map(|item| {
1674                    match item {
1675                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1676                        _ => panic!("Unexpected item {item:?}"),
1677                    }
1678                })
1679                .count()
1680                .await,
1681            1
1682        );
1683
1684        // If cursor_a wasn't woken, this will hang.
1685        next.await;
1686        // Validate logstats (must happen after the socket was handled)
1687        assert_data_tree!(
1688            inspector,
1689            root: contains {
1690                test: {
1691                    url: "",
1692                    last_timestamp: AnyProperty,
1693                    sockets_closed: 0u64,
1694                    sockets_opened: 1u64,
1695                    invalid: {
1696                        number: 0u64,
1697                        bytes: 0u64,
1698                    },
1699                    total: {
1700                        number: 1u64,
1701                        bytes: 88u64,
1702                    },
1703                    rolled_out: {
1704                        number: 0u64,
1705                        bytes: 0u64,
1706                    },
1707                    trace: {
1708                        number: 0u64,
1709                        bytes: 0u64,
1710                    },
1711                    debug: {
1712                        number: 1u64,
1713                        bytes: 88u64,
1714                    },
1715                    info: {
1716                        number: 0u64,
1717                        bytes: 0u64,
1718                    },
1719                    warn: {
1720                        number: 0u64,
1721                        bytes: 0u64,
1722                    },
1723                    error: {
1724                        number: 0u64,
1725                        bytes: 0u64,
1726                    },
1727                    fatal: {
1728                        number: 0u64,
1729                        bytes: 0u64,
1730                    },
1731                }
1732            }
1733        );
1734    }
1735
1736    #[fuchsia::test]
1737    async fn socket() {
1738        let buffer = Arc::new(SharedBuffer::new(
1739            create_ring_buffer(MAX_MESSAGE_SIZE),
1740            Box::new(|_| {}),
1741            Default::default(),
1742        ));
1743        let container_a =
1744            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1745        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1746
1747        let (local, remote) = zx::Socket::create_datagram();
1748        container_a.add_socket(remote);
1749
1750        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1751
1752        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1753        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1754        let mut futures = FuturesUnordered::new();
1755        futures.push(async move {
1756            let mut cursor_a = pin!(cursor_a);
1757            cursor_a.next().await
1758        });
1759        let mut next = futures.next();
1760        assert!(futures::poll!(&mut next).is_pending());
1761
1762        local.write(msg.bytes()).unwrap();
1763
1764        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1765
1766        assert_eq!(
1767            cursor_b
1768                .map(|item| {
1769                    match item {
1770                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1771                        _ => panic!("Unexpected item {item:?}"),
1772                    }
1773                })
1774                .count()
1775                .await,
1776            1
1777        );
1778
1779        // If cursor_a wasn't woken, this will hang.
1780        next.await;
1781    }
1782
1783    #[fuchsia::test]
1784    async fn socket_on_inactive() {
1785        let on_inactive = Arc::new(AtomicU64::new(0));
1786        let a_identity = Arc::new(vec!["a"].into());
1787        let buffer = Arc::new(SharedBuffer::new(
1788            create_ring_buffer(MAX_MESSAGE_SIZE),
1789            {
1790                let on_inactive = Arc::clone(&on_inactive);
1791                let a_identity = Arc::clone(&a_identity);
1792                Box::new(move |id| {
1793                    assert_eq!(id, a_identity);
1794                    on_inactive.fetch_add(1, Ordering::Relaxed);
1795                })
1796            },
1797            SharedBufferOptions { sleep_time: Duration::ZERO },
1798        ));
1799        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1800        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1801
1802        let (local, remote) = zx::Socket::create_datagram();
1803        container_a.add_socket(remote);
1804
1805        local.write(msg.bytes()).unwrap();
1806
1807        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1808
1809        assert_eq!(
1810            cursor
1811                .map(|item| {
1812                    match item {
1813                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1814                        _ => panic!("Unexpected item {item:?}"),
1815                    }
1816                })
1817                .count()
1818                .await,
1819            1
1820        );
1821
1822        // Now roll out a's messages.
1823        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1824        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1825            container_b.push_back(msg.bytes());
1826
1827            // Yield to the executor to allow messages to be rolled out.
1828            yield_to_executor().await;
1829        }
1830
1831        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1832
1833        // Close the socket.
1834        std::mem::drop(local);
1835
1836        // We don't know when the socket thread will run so we have to loop.
1837        while on_inactive.load(Ordering::Relaxed) != 1 {
1838            fasync::Timer::new(Duration::from_millis(50)).await;
1839        }
1840    }
1841
1842    #[fuchsia::test]
1843    async fn flush() {
1844        let a_identity = Arc::new(vec!["a"].into());
1845        let buffer = Arc::new(SharedBuffer::new(
1846            create_ring_buffer(1024 * 1024),
1847            Box::new(|_| {}),
1848            Default::default(),
1849        ));
1850        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1851        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1852
1853        let (local, remote) = zx::Socket::create_datagram();
1854        container_a.add_socket(remote);
1855
1856        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());
1857
1858        const COUNT: usize = 1000;
1859        for _ in 0..COUNT {
1860            local.write(msg.bytes()).unwrap();
1861        }
1862
1863        // Race two flush futures.
1864        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1865        flush_futures.next().await;
1866
1867        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1868        assert!(messages.is_some());
1869
1870        // Make sure the other one finishes too.
1871        flush_futures.next().await;
1872
1873        // Make sure we can still terminate the buffer.
1874        buffer.terminate().await;
1875    }
1876}