// archivist_lib/logs/shared_buffer.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::identity::ComponentIdentity;
use crate::logs::stats::LogStreamStats;
use crate::logs::stored_message::StoredMessage;
use derivative::Derivative;
use diagnostics_log_encoding::{Header, FXT_HEADER_SIZE, TRACING_FORMAT_LOG_RECORD_TYPE};
use fidl_fuchsia_diagnostics::StreamMode;
use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
use fuchsia_async as fasync;
use fuchsia_async::condition::{Condition, WakerEntry};
use fuchsia_sync::Mutex;
use futures::Stream;
use log::debug;
use pin_project::{pin_project, pinned_drop};
use std::ops::{Deref, DerefMut, Range};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use zerocopy::FromBytes;
use zx::AsHandleRef as _;
24
// The buffer needs to be big enough for at least 1 message which is MAX_DATAGRAM_LEN_BYTES (32768)
// bytes, but we allow up to 65536 bytes.
const MIN_BUFFER_SIZE: usize = (MAX_DATAGRAM_LEN_BYTES * 2) as usize;

/// Callback invoked when a container becomes inactive, i.e. it has no buffered logs and no
/// sockets.
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
30
/// A fixed-capacity ring buffer of log messages shared by all containers, serviced by a dedicated
/// socket-draining thread.
pub struct SharedBuffer {
    // The ring-buffer state plus the set of sockets being drained.  Guarded together (see
    // `InnerAndSockets`) so waiting cursors can register wakers on the same condition.
    inner: Condition<InnerAndSockets>,

    // Callback for when a container is inactive (i.e. no logs and no sockets).
    on_inactive: OnInactive,

    // The port that we use to service the sockets.
    port: zx::Port,

    // Event used for termination.
    event: zx::Event,

    // The thread that we use to service the sockets.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}
46
// We have to do this to avoid "cannot borrow as mutable because it is also borrowed as immutable"
// errors.
struct InnerAndSockets(Inner, Slab<Socket>);

impl Deref for InnerAndSockets {
    type Target = Inner;
    // Deref to `Inner` so callers that don't need the sockets can use `Inner`'s API directly.
    fn deref(&self) -> &Inner {
        &self.0
    }
}

impl DerefMut for InnerAndSockets {
    fn deref_mut(&mut self) -> &mut Inner {
        &mut self.0
    }
}
63
// The mutable ring-buffer state, kept behind `SharedBuffer::inner`.
struct Inner {
    // The underlying buffer.
    buffer: Buffer,

    // Head sequence number.  It's u64, so we don't worry about wrapping.  The offset in the buffer
    // is this modulo buffer length.
    head: u64,

    // Tail sequence number.  Records in [tail, head) are live; everything before `tail` has been
    // rolled out.
    tail: u64,

    // Registered containers.
    containers: Containers,
}
78
impl SharedBuffer {
    /// Creates the shared buffer with (approximately) the given capacity in bytes and spawns the
    /// thread that services the log sockets.  `on_inactive` is invoked when a container becomes
    /// inactive (no buffered messages and no sockets).
    pub fn new(capacity: usize, on_inactive: OnInactive) -> Arc<Self> {
        let this = Arc::new(Self {
            inner: Condition::new(InnerAndSockets(
                Inner {
                    buffer: Buffer::new(capacity),
                    head: 0,
                    tail: 0,
                    containers: Containers::default(),
                },
                Slab::default(),
            )),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
        });
        // The socket thread holds a strong reference to `this` until `terminate` joins it.
        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            let ehandle = fasync::EHandle::local();
            std::thread::spawn(move || this.socket_thread(ehandle))
        });
        this
    }

    /// Registers a new container and returns the buffer used to ingest its messages and sockets.
    pub fn new_container_buffer(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerBuffer {
        ContainerBuffer {
            shared_buffer: Arc::clone(self),
            container_id: self.inner.lock().containers.new_container(identity, stats),
        }
    }

    /// Returns the number of registered containers in use by the buffer.
    #[cfg(test)]
    pub fn container_count(&self) -> usize {
        self.inner.lock().0.containers.len()
    }

    /// Terminates the socket thread.  The socket thread will drain the sockets before terminating.
    pub async fn terminate(&self) {
        // USER_0 tells the socket thread to shut down.  Join it via `unblock` so we don't block
        // the async executor.
        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
        .await;
    }

    // Services all registered log sockets: waits on the port for readable sockets, drains them
    // into the buffer, and wakes any cursors waiting for new messages.  Runs until `terminate`
    // signals USER_0 on `event`, after which the sockets are drained one last time.
    fn socket_thread(&self, ehandle: fasync::EHandle) {
        let mut sockets_ready = Vec::new();

        // Register event used for termination.
        const TERMINATE_KEY: u64 = u64::MAX;
        self.event
            .wait_async_handle(
                &self.port,
                TERMINATE_KEY,
                zx::Signals::USER_0,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

        let mut terminate = false;
        let mut finished = false;
        let mut on_inactive = OnInactiveNotifier::new(self);

        while !finished {
            let mut deadline = if terminate {
                // Run through the sockets one last time.
                finished = true;
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                // Wait for 200ms so that we're not constantly waking up for every log message that
                // is queued.  Ignore errors here.  We only do this on release builds.
                #[cfg(not(debug_assertions))]
                let _ = self.event.wait_handle(
                    zx::Signals::USER_0,
                    zx::MonotonicInstant::after(zx::Duration::from_millis(200)),
                );

                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == TERMINATE_KEY {
                            terminate = true;
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, only drain whatever else is already queued.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = self.inner.lock();

            {
                let InnerAndSockets(inner, sockets) = &mut *inner;

                for socket_id in sockets_ready.drain(..) {
                    let Some(socket) = sockets.get(socket_id.0) else {
                        // Spurious packet for a removed socket.
                        continue;
                    };

                    // `read_socket` returns true when the socket should be closed.
                    if inner.read_socket(socket, &mut on_inactive) {
                        let container_id = socket.container_id;
                        if let Some(container) = inner.containers.get_mut(container_id) {
                            container.remove_socket(socket_id, sockets);
                            if !container.is_active() {
                                if container.should_free() {
                                    inner.containers.free(container_id);
                                } else {
                                    on_inactive.push(&container.identity);
                                }
                            }
                        }
                    } else {
                        // Socket still open: re-register interest so the port delivers another
                        // packet when more data arrives (or the peer closes).
                        socket
                            .socket
                            .wait_async_handle(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    }
                }
            }

            let wake_tasks = || {
                for waker in inner.drain_wakers() {
                    waker.wake();
                }

                // Release the lock before running the on-inactive callbacks.
                std::mem::drop(inner);

                on_inactive.notify();
            };

            if cfg!(test) {
                // Tests don't always use a multi-threaded executor so we can't use poll_tasks.
                wake_tasks()
            } else {
                // This results in a performance win because we can typically poll woken tasks
                // without needing to wake up any other threads.
                ehandle.poll_tasks(wake_tasks);
            }
        }
    }
}
240
241impl Inner {
242    // Returns list of containers that no longer have any messages.
243    fn ingest(
244        &mut self,
245        msg: &[u8],
246        container_id: ContainerId,
247        on_inactive: &mut OnInactiveNotifier<'_>,
248    ) {
249        if msg.len() < std::mem::size_of::<Header>() {
250            debug!("message too short ({})", msg.len());
251            if let Some(container) = self.containers.get(container_id) {
252                container.stats.increment_invalid(msg.len());
253            }
254            return;
255        }
256
257        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
258
259        // NOTE: Some tests send messages that are bigger than the header indicates.  We ignore the
260        // tail of any such messages.
261        let msg_len = header.size_words() as usize * 8;
262
263        // Check the correct type and size.
264        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
265            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
266            if let Some(container) = self.containers.get(container_id) {
267                container.stats.increment_invalid(msg.len());
268            }
269            return;
270        }
271
272        // Make sure there's space.
273        let mut space = self.space();
274        let total_len = msg_len + FXT_HEADER_SIZE;
275
276        while space < total_len {
277            space += self.pop(on_inactive).expect("No messages in buffer!");
278        }
279
280        let Some(container_info) = self.containers.get_mut(container_id) else {
281            return;
282        };
283
284        if container_info.msg_ids.end == container_info.msg_ids.start {
285            container_info.first_index = self.head;
286        }
287
288        let msg_id = container_info.msg_ids.end;
289        container_info.msg_ids.end += 1;
290
291        let (container_msg_id, remaining) =
292            u64::mut_from_prefix(self.buffer.slice_from_index_mut(self.head)).unwrap();
293        *container_msg_id = ((container_id.0 as u64) << 32) | (msg_id & 0xffff_ffff);
294        remaining[..msg_len].copy_from_slice(&msg[..msg_len]);
295
296        self.head += total_len as u64;
297    }
298
299    fn space(&self) -> usize {
300        self.buffer.len() - (self.head - self.tail) as usize
301    }
302
303    fn ensure_space(&mut self, required: usize, on_inactive: &mut OnInactiveNotifier<'_>) {
304        let mut space = self.space();
305        while space < required {
306            space += self.pop(on_inactive).expect("No messages in buffer!");
307        }
308    }
309
310    /// Pops a message and returns its total size.
311    ///
312    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
313    /// necessarily the message with the *oldest* timestamp because we might not have received the
314    /// messages in perfect timestamp order.  This should be close enough for all use cases we care
315    /// about, and besides, the timestamps can't be trusted anyway.
316    fn pop(&mut self, on_inactive: &mut OnInactiveNotifier<'_>) -> Option<usize> {
317        if self.head == self.tail {
318            return None;
319        }
320        let (container_id, msg_id, record, timestamp) = self.parse_record(self.tail);
321        let total_len = record.len() + FXT_HEADER_SIZE;
322        self.tail += total_len as u64;
323        let container = self.containers.get_mut(container_id).unwrap();
324        container.stats.increment_rolled_out(total_len);
325        assert_eq!(container.msg_ids.start as u32, msg_id);
326        container.msg_ids.start += 1;
327        if let Some(timestamp) = timestamp {
328            container.last_rolled_out_timestamp = timestamp;
329        }
330        if !container.is_active() {
331            if container.should_free() {
332                self.containers.free(container_id);
333            } else {
334                on_inactive.push(&container.identity);
335            }
336        }
337        Some(total_len)
338    }
339
340    /// Parses the record and returns (container_id, msg_id, message, timestamp).
341    fn parse_record(&self, index: u64) -> (ContainerId, u32, &[u8], Option<zx::BootInstant>) {
342        let buf = self.buffer.slice_from_index(index);
343        let (container_msg_id, msg) = u64::read_from_prefix(buf).unwrap();
344        let (header, remainder) = u64::read_from_prefix(msg).unwrap();
345        let header = Header(header);
346        let record_len = header.size_words() as usize * 8;
347        (
348            ContainerId((container_msg_id >> 32) as u32),
349            container_msg_id as u32,
350            &msg[..record_len],
351            i64::read_from_prefix(remainder).map(|(t, _)| zx::BootInstant::from_nanos(t)).ok(),
352        )
353    }
354
355    /// Returns true if the socket should be closed.
356    fn read_socket(&mut self, socket: &Socket, on_inactive: &mut OnInactiveNotifier<'_>) -> bool {
357        loop {
358            self.ensure_space(FXT_HEADER_SIZE + MAX_DATAGRAM_LEN_BYTES as usize, on_inactive);
359
360            let dest = self.buffer.slice_from_index_mut(self.head);
361
362            // Read directly into the buffer leaving space for the header.
363            let amount_read = match socket
364                .socket
365                .read(&mut dest[FXT_HEADER_SIZE..FXT_HEADER_SIZE + MAX_DATAGRAM_LEN_BYTES as usize])
366            {
367                Ok(a) => a,
368                Err(zx::Status::SHOULD_WAIT) => return false,
369                Err(_) => return true,
370            };
371
372            let Some(container) = self.containers.get_mut(socket.container_id) else {
373                // Container no longer exists.
374                return true;
375            };
376
377            if amount_read < 8 {
378                container.stats.increment_invalid(amount_read);
379                continue;
380            }
381            let header = Header::read_from_bytes(
382                &dest[FXT_HEADER_SIZE..FXT_HEADER_SIZE + std::mem::size_of::<Header>()],
383            )
384            .unwrap();
385            let msg_len = header.size_words() as usize * 8;
386            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != amount_read {
387                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, amount_read);
388                container.stats.increment_invalid(amount_read);
389                continue;
390            }
391
392            container.stats.ingest_message(amount_read, header.severity().into());
393
394            if container.msg_ids.end == container.msg_ids.start {
395                container.first_index = self.head;
396            }
397
398            let msg_id = container.msg_ids.end;
399            container.msg_ids.end += 1;
400
401            // Every message in the shared buffer contains an 8 byte header consisting of a 32 bit
402            // container ID, followed by a the least significant 32 bits of the per-container
403            // message ID.
404            let (container_msg_id, _) = u64::mut_from_prefix(dest).unwrap();
405            *container_msg_id = ((socket.container_id.0 as u64) << 32) | (msg_id & 0xffff_ffff);
406
407            self.head += (FXT_HEADER_SIZE + amount_read) as u64;
408        }
409    }
410}
411
// The set of registered containers, allocated out of a slab and addressed by `ContainerId`.
#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}

// Index of a container within `Containers`' slab.
#[derive(Clone, Copy, Debug)]
struct ContainerId(u32);
419
420impl Containers {
421    #[cfg(test)]
422    fn len(&self) -> usize {
423        self.slab.len()
424    }
425
426    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
427        self.slab.get(id.0)
428    }
429
430    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
431        self.slab.get_mut(id.0)
432    }
433
434    fn new_container(
435        &mut self,
436        identity: Arc<ComponentIdentity>,
437        stats: Arc<LogStreamStats>,
438    ) -> ContainerId {
439        ContainerId(self.slab.insert(|_| ContainerInfo::new(identity, stats)))
440    }
441
442    fn free(&mut self, id: ContainerId) {
443        self.slab.free(id.0)
444    }
445}
446
// Per-container bookkeeping for messages and sockets held in the shared buffer.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The number of references that prevent this object from being freed.
    refs: usize,

    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first index in the shared buffer for a message for this container.  This index
    // might have been rolled out.  This is used as an optimisation to set the starting
    // cursor position.
    first_index: u64,

    // The first and last message IDs stored in the shared buffer.
    msg_ids: Range<u64>,

    // Whether the container is terminated.
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The timestamp of the message most recently rolled out.
    last_rolled_out_timestamp: zx::BootInstant,

    // The first socket ID for this container (head of a doubly-linked list of sockets).
    first_socket_id: SocketId,
}
477
478impl ContainerInfo {
479    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>) -> Self {
480        Self {
481            refs: 0,
482            identity,
483            first_index: 0,
484            msg_ids: 0..0,
485            terminated: false,
486            stats,
487            last_rolled_out_timestamp: zx::BootInstant::ZERO,
488            first_socket_id: SocketId::NULL,
489        }
490    }
491
492    fn should_free(&self) -> bool {
493        self.terminated && self.refs == 0 && !self.is_active()
494    }
495
496    fn is_active(&self) -> bool {
497        self.msg_ids.end != self.msg_ids.start || self.first_socket_id != SocketId::NULL
498    }
499
500    // # Panics
501    //
502    // This will panic if the socket isn't found.
503    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
504        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
505        if prev == SocketId::NULL {
506            self.first_socket_id = next;
507        } else {
508            sockets.get_mut(prev.0).unwrap().next = next;
509        }
510        if next != SocketId::NULL {
511            sockets
512                .get_mut(next.0)
513                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
514                .prev = prev;
515        }
516        sockets.free(socket_id.0);
517        debug!(identity:% = self.identity; "Socket closed.");
518    }
519}
520
/// Handle for a single container: used to ingest its messages and sockets into the shared buffer
/// and to create cursors over its stored messages.
pub struct ContainerBuffer {
    shared_buffer: Arc<SharedBuffer>,
    container_id: ContainerId,
}
525
impl ContainerBuffer {
    /// Ingests a new message.
    ///
    /// If the message is invalid, it is dropped.  Wakes any cursors waiting for new messages.
    pub fn push_back(&self, msg: &[u8]) {
        let mut on_inactive = OnInactiveNotifier::new(&self.shared_buffer);
        let mut inner = self.shared_buffer.inner.lock();

        inner.ingest(msg, self.container_id, &mut on_inactive);

        for waker in inner.drain_wakers() {
            waker.wake();
        }
    }

    /// Returns a cursor, or `None` if the container has already terminated.
    pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
        let mut inner = self.shared_buffer.inner.lock();
        let head = inner.head;
        let Some(container) = inner.containers.get_mut(self.container_id) else {
            // We've hit a race where the container has terminated.
            return None;
        };

        // The cursor holds a reference that keeps the container info alive.
        container.refs += 1;
        let stats = Arc::clone(&container.stats);

        // Pick the starting index, the first expected message ID and where the cursor ends,
        // based on the requested mode.
        let (index, next_id, end) = match mode {
            StreamMode::Snapshot => {
                (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
            }
            StreamMode::Subscribe => (head, container.msg_ids.end, CursorEnd::Stream),
            StreamMode::SnapshotThenSubscribe => {
                (container.first_index, container.msg_ids.start, CursorEnd::Stream)
            }
        };

        Some(Cursor {
            index,
            container_id: self.container_id,
            next_id,
            end,
            buffer: Arc::clone(&self.shared_buffer),
            waker_entry: WakerEntry::new(),
            stats,
        })
    }

    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
    /// The component's data will remain in the buffer until the messages are rolled out.  This will
    /// *not* drain sockets.
    pub fn terminate(&self) {
        let mut guard = self.shared_buffer.inner.lock();
        let InnerAndSockets(inner, sockets) = &mut *guard;
        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.terminated = true;
            // Close all of the container's sockets without draining them.
            if container.first_socket_id != SocketId::NULL {
                loop {
                    container.remove_socket(container.first_socket_id, sockets);
                    if container.first_socket_id == SocketId::NULL {
                        break;
                    }
                }
            }
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
            // Wake cursors so they can observe the termination.
            for waker in guard.drain_wakers() {
                waker.wake();
            }
        }
    }

    /// Returns true if the container has messages or sockets.
    pub fn is_active(&self) -> bool {
        self.shared_buffer.inner.lock().containers.get(self.container_id).is_some_and(|c| {
            c.msg_ids.start != c.msg_ids.end || c.first_socket_id != SocketId::NULL
        })
    }

    /// Adds a socket for this container.
    pub fn add_socket(&self, socket: zx::Socket) {
        let mut guard = self.shared_buffer.inner.lock();
        let InnerAndSockets(inner, sockets) = &mut *guard;
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        // Push the socket onto the front of the container's socket list and register it with the
        // port so the socket thread will service it.
        let next = container.first_socket_id;
        let socket_id = SocketId(sockets.insert(|socket_id| {
            socket
                .wait_async_handle(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
}
629
impl Drop for ContainerBuffer {
    fn drop(&mut self) {
        // Dropping the handle terminates the container; its messages remain in the buffer until
        // rolled out (see `terminate`).
        self.terminate();
    }
}
635
/// A stream over a single container's messages in the shared buffer.  Depending on `end`, the
/// cursor finishes after a snapshot or keeps yielding new messages until the container
/// terminates.
#[pin_project(PinnedDrop)]
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Cursor {
    // Index in the buffer that we should continue searching from.
    index: u64,

    // The container whose messages this cursor yields.
    container_id: ContainerId,

    // The next expected message ID.
    next_id: u64,

    // The ID, if any, that we should end at (exclusive).
    end: CursorEnd,

    #[derivative(Debug = "ignore")]
    buffer: Arc<SharedBuffer>,

    // waker_entry is used to register a waker for subscribing cursors.
    #[pin]
    #[derivative(Debug = "ignore")]
    waker_entry: WakerEntry<InnerAndSockets>,

    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,
}
662
/// The next element in the stream or a marker of the number of items rolled out since last polled.
#[derive(Debug, PartialEq)]
pub enum LazyItem<T> {
    /// The next item in the stream.
    Next(Arc<T>),
    /// A count of the items dropped between the last call to poll_next and this one, together
    /// with the timestamp of the most recently rolled out item.
    ItemsRolledOut(u64, zx::BootInstant),
}
671
672impl Stream for Cursor {
673    type Item = LazyItem<StoredMessage>;
674
675    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
676        let mut this = self.project();
677        let mut on_inactive = OnInactiveNotifier::new(this.buffer);
678        let mut guard = this.buffer.inner.lock();
679        let InnerAndSockets(inner, sockets) = &mut *guard;
680
681        let mut container = match inner.containers.get(*this.container_id) {
682            None => return Poll::Ready(None),
683            Some(container) => container,
684        };
685
686        let end_id = match &mut this.end {
687            CursorEnd::Snapshot(None) => {
688                // Drain the sockets associated with this container first so that we capture
689                // pending messages.  Some tests rely on this.
690                let mut socket_id = container.first_socket_id;
691                while socket_id != SocketId::NULL {
692                    let socket = sockets.get(socket_id.0).unwrap();
693                    let next = socket.next;
694                    if inner.read_socket(socket, &mut on_inactive) {
695                        let container = inner.containers.get_mut(*this.container_id).unwrap();
696                        container.remove_socket(socket_id, sockets);
697                        if !container.is_active() {
698                            on_inactive.push(&container.identity);
699                        }
700                    }
701                    socket_id = next;
702                }
703
704                container = inner.containers.get(*this.container_id).unwrap();
705                *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
706                container.msg_ids.end
707            }
708            CursorEnd::Snapshot(Some(end)) => *end,
709            CursorEnd::Stream => u64::MAX,
710        };
711
712        if *this.next_id == end_id {
713            return Poll::Ready(None);
714        }
715
716        // See if messages have been rolled out.
717        if container.msg_ids.start > *this.next_id {
718            let mut next_id = container.msg_ids.start;
719            if end_id < next_id {
720                next_id = end_id;
721            }
722            let rolled_out = next_id - *this.next_id;
723            *this.next_id = next_id;
724            return Poll::Ready(Some(LazyItem::ItemsRolledOut(
725                rolled_out,
726                container.last_rolled_out_timestamp,
727            )));
728        }
729
730        if inner.tail > *this.index {
731            *this.index = inner.tail;
732        }
733
734        // Some optimisations to reduce the amount of searching we might have to do.
735        if container.first_index > *this.index {
736            *this.index = container.first_index;
737        }
738        if *this.next_id == container.msg_ids.end {
739            *this.index = inner.head;
740        } else {
741            // Scan forward until we find a record matching this container.
742            while *this.index < inner.head {
743                let (container_id, msg_id, record, _timestamp) = inner.parse_record(*this.index);
744                let record_len = record.len();
745
746                // Move index to the next record.
747                *this.index += record_len as u64 + 8;
748                assert!(*this.index <= inner.head);
749
750                if container_id.0 == this.container_id.0 {
751                    assert_eq!(*this.next_id as u32, msg_id);
752                    *this.next_id += 1;
753                    if let Some(msg) = StoredMessage::new(record.into(), this.stats) {
754                        return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
755                    } else {
756                        // The message is corrupt.  Just skip it.
757                    }
758                }
759            }
760        }
761
762        if container.terminated {
763            Poll::Ready(None)
764        } else {
765            guard.add_waker(this.waker_entry, cx.waker().clone());
766            Poll::Pending
767        }
768    }
769}
770
#[pinned_drop]
impl PinnedDrop for Cursor {
    fn drop(self: Pin<&mut Self>) {
        // Release this cursor's reference on the container, freeing the container if this was the
        // last thing keeping it alive.
        let mut inner = self.buffer.inner.lock();
        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.refs -= 1;
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
        }
    }
}
783
// Where a cursor stops yielding messages.
#[derive(Debug)]
enum CursorEnd {
    // The id will be None when the cursor is first created and is set after the cursor
    // is first polled.
    Snapshot(Option<u64>),
    // The cursor keeps yielding new messages until the container terminates.
    Stream,
}
791
/// Buffer wraps a vmo and mapping for the VMO.
struct Buffer {
    // Held to keep the memory alive; the VMAR owns the mappings.
    _vmo: zx::Vmo,
    _vmar: zx::Vmar,
    // Base address of the mapping.
    base: usize,
    // Capacity in bytes (the wrap-around mapping at the end is not included).
    len: usize,
}
799
impl Buffer {
    /// Allocates the buffer, rounding `capacity` up to a whole number of pages with a floor of
    /// MIN_BUFFER_SIZE.
    fn new(capacity: usize) -> Self {
        let capacity = std::cmp::max(
            MIN_BUFFER_SIZE,
            capacity.next_multiple_of(zx::system_get_page_size() as usize),
        );

        let vmo = zx::Vmo::create(capacity as u64).unwrap();
        let name = zx::Name::new("LogBuffer").unwrap();
        vmo.set_name(&name).unwrap();
        let root_vmar = fuchsia_runtime::vmar_root_self();

        // Allocate a buffer but repeat the mapping for the first 64 KiB of the buffer at the end
        // which makes dealing with wrapping much easier.  NOTE: dropping the vmar will drop the
        // mappings.
        let (vmar, base) = root_vmar
            .allocate(
                0,
                capacity + 65536,
                zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE
                    | zx::VmarFlags::CAN_MAP_SPECIFIC,
            )
            .unwrap();
        vmar.map(
            /* vmar_offset */ 0,
            &vmo,
            /* vmo_offset */ 0,
            /* len */ capacity,
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
        )
        .unwrap();
        vmar.map(
            /* vmar_offset */ capacity,
            &vmo,
            /* vmo_offset */ 0,
            /* len */ 65536,
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
        )
        .unwrap();
        Self { _vmo: vmo, _vmar: vmar, base, len: capacity }
    }

    /// Returns the buffer's capacity in bytes.
    fn len(&self) -> usize {
        self.len
    }

    /// Returns a 64 KiB slice at the specified index.  The slice is valid even where the index
    /// involves wrapping.
    fn slice_from_index(&self, index: u64) -> &[u8] {
        let index = index as usize % self.len;
        // SAFETY: Safe because we mapped this range: `index < len` and the mapping repeats the
        // first 64 KiB after `len`, so `[base + index, base + index + 65536)` is always mapped.
        unsafe { std::slice::from_raw_parts((self.base + index) as *const u8, 65536) }
    }

    /// Returns a 64 KiB mutable slice at the specified index.  The slice is valid even where the
    /// index involves wrapping.
    fn slice_from_index_mut(&mut self, index: u64) -> &mut [u8] {
        let index = index as usize % self.len;
        // SAFETY: Safe because we mapped this range (see `slice_from_index`); `&mut self`
        // guarantees exclusive access to the mapping.
        unsafe { std::slice::from_raw_parts_mut((self.base + index) as *mut u8, 65536) }
    }
}
863
/// Implements a simple Slab allocator.
///
/// Free slots are threaded into a singly-linked free list through
/// `Slot::Free`, with `free_index == usize::MAX` acting as the
/// end-of-list sentinel.
struct Slab<T> {
    slab: Vec<Slot<T>>,
    free_index: usize,
}

impl<T> Default for Slab<T> {
    fn default() -> Self {
        // Start with no slots and an empty free list.
        Slab { free_index: usize::MAX, slab: Vec::new() }
    }
}

/// A slot either holds a live entry or the index of the next free slot.
enum Slot<T> {
    Used(T),
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of used entries.  This is not performant.
    #[cfg(test)]
    fn len(&self) -> usize {
        let mut used = 0;
        for slot in &self.slab {
            if let Slot::Used(_) = slot {
                used += 1;
            }
        }
        used
    }

    /// Releases the entry at `index` and pushes its slot onto the free list.
    /// Panics if the slot is not currently in use.
    fn free(&mut self, index: u32) {
        let slot = index as usize;
        let previous = std::mem::replace(&mut self.slab[slot], Slot::Free(self.free_index));
        assert!(matches!(previous, Slot::Used(_)));
        self.free_index = slot;
    }

    /// Returns the entry at `id`, or `None` if the slot is absent or free.
    fn get(&self, id: u32) -> Option<&T> {
        match self.slab.get(id as usize) {
            Some(Slot::Used(value)) => Some(value),
            _ => None,
        }
    }

    /// Mutable counterpart of `get`.
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        match self.slab.get_mut(id as usize) {
            Some(Slot::Used(value)) => Some(value),
            _ => None,
        }
    }

    /// Inserts a new entry, reusing a freed slot when one is available, and
    /// returns its index.  `value` is passed the index the entry will occupy.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        if self.free_index == usize::MAX {
            // No free slot: append a new one.
            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
            // (see SocketId::NULL below).
            assert!(self.slab.len() < u32::MAX as usize);
            let id = self.slab.len() as u32;
            self.slab.push(Slot::Used(value(id)));
            id
        } else {
            // Pop the head of the free list and occupy it.
            let id = self.free_index;
            let old = std::mem::replace(&mut self.slab[id], Slot::Used(value(id as u32)));
            self.free_index = match old {
                Slot::Free(next) => next,
                Slot::Used(_) => unreachable!(),
            };
            id as u32
        }
    }
}
931
/// Index of a socket's slot in the `Slab<Socket>`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    // Sentinel meaning "no socket" (used for list termination).  The slab never
    // hands out this index — see the assertion in `Slab::insert`.
    const NULL: Self = SocketId(0xffff_ffff);
}
938
/// A socket registered with the shared buffer, associated with one container.
struct Socket {
    // The kernel socket handle that log records arrive on.
    socket: zx::Socket,
    // The container this socket delivers messages to.
    container_id: ContainerId,
    // Sockets are stored as a linked list for each container.
    prev: SocketId,
    next: SocketId,
}
946
/// Sends on-inactive notifications when dropped.  This *must* be dropped when no locks are held.
struct OnInactiveNotifier<'a>(&'a SharedBuffer, Vec<Arc<ComponentIdentity>>);

impl<'a> OnInactiveNotifier<'a> {
    /// Creates a notifier with no pending notifications.
    fn new(buffer: &'a SharedBuffer) -> Self {
        Self(buffer, Vec::new())
    }

    /// Queues `identity` to be reported via the buffer's `on_inactive` callback.
    fn push(&mut self, identity: &Arc<ComponentIdentity>) {
        self.1.push(Arc::clone(identity));
    }

    /// Invokes the `on_inactive` callback for every queued identity, draining the queue.
    fn notify(&mut self) {
        for identity in self.1.drain(..) {
            (*self.0.on_inactive)(identity);
        }
    }
}

impl Drop for OnInactiveNotifier<'_> {
    fn drop(&mut self) {
        // Deliver any notifications that were queued but not explicitly flushed.
        self.notify();
    }
}
971
// Unit tests for the shared log buffer: message ingestion, buffer wrapping and
// roll-out, cursor stream modes, container termination, and socket-based input.
#[cfg(test)]
mod tests {
    use super::SharedBuffer;
    use crate::logs::shared_buffer::LazyItem;
    use crate::logs::stats::LogStreamStats;
    use crate::logs::testing::make_message;
    use assert_matches::assert_matches;
    use diagnostics_assertions::{assert_data_tree, AnyProperty};
    use fidl_fuchsia_diagnostics::StreamMode;
    use fuchsia_async as fasync;
    use fuchsia_inspect::{Inspector, InspectorConfig};
    use fuchsia_inspect_derive::WithInspect;
    use futures::channel::mpsc;
    use futures::poll;
    use futures::stream::{FuturesUnordered, StreamExt as _};
    use std::pin::pin;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // A single pushed message is returned by a snapshot cursor.
    #[fuchsia::test]
    async fn push_one_message() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg.bytes());

        // Make sure the cursor can find it.
        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
        assert_eq!(
            cursor
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );
    }

    // A record too short to hold a valid header is ignored.
    #[fuchsia::test]
    async fn message_too_short() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));

        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    // A record with an unrecognized type tag is ignored.
    #[fuchsia::test]
    async fn bad_type() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0x77; 16]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    // A record whose payload is shorter than its header claims is ignored.
    #[fuchsia::test]
    async fn message_truncated() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    // Fill the buffer up to just before the end, then push one more message so
    // the write wraps and the oldest message is rolled out.
    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        // Create one message so we can figure out what the metadata costs.
        let msg1 = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg1.bytes());

        // The maximum message size is 32760 because size words must fit in 11 bits, so that's 4095
        // * 8.
        let delta = 32760 - msg1.bytes().len();
        let msg2 = make_message(
            std::str::from_utf8(&vec![66; 1 + delta]).unwrap(),
            None,
            zx::BootInstant::from_nanos(2),
        );
        container_buffer.push_back(msg2.bytes());

        // And one last message to take us up to offset 65528 in the buffer.
        let delta = 65528 - 2 * (msg1.bytes().len() + 8) - (msg2.bytes().len() + 8);
        let msg3 = make_message(
            std::str::from_utf8(&vec![67; 1 + delta]).unwrap(),
            None,
            zx::BootInstant::from_nanos(3),
        );
        container_buffer.push_back(msg3.bytes());

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 3);

        // The next message we write should wrap, and will cause the first message to be dropped.
        let msg4 = make_message("d", None, zx::BootInstant::from_nanos(4));
        container_buffer.push_back(msg4.bytes());

        let mut expected = [msg2.bytes(), msg3.bytes(), msg4.bytes()].into_iter();
        assert_eq!(
            container_buffer
                .cursor(StreamMode::Snapshot)
                .unwrap()
                .map(|item| match item {
                    LazyItem::Next(item) => assert_eq!(item.bytes(), expected.next().unwrap()),
                    _ => panic!("Unexpected item {item:?}"),
                })
                .count()
                .await,
            3
        );
    }

    // The on_inactive callback fires once when a container's last message is
    // rolled out of the buffer.
    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                65536,
                Box::new(move |i| {
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Repeatedly write messages to b until a is rolled out.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }

    // A terminated container survives only as long as it still has logs in the
    // buffer.
    #[fuchsia::test]
    async fn terminate_drops_container() {
        // Silence a clippy warning; SharedBuffer needs an executor.
        async {}.await;

        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));

        // terminate when buffer has no logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        // terminate when buffer has logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        // The container should still be there because it has logs.
        assert_eq!(buffer.container_count(), 1);

        // Roll out the logs.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        // Repeatedly write messages to b until a's message is dropped and then the container will
        // be dropped.
        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());
        }

        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }

    // Subscribe cursors deliver newly pushed messages and end on terminate();
    // only SnapshotThenSubscribe also yields pre-existing messages.
    #[fuchsia::test(allow_stalls = false)]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            // Run the cursor in a separate task so that we can test it gets woken correctly.
            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            // The existing message should only be returned with SnapshotThenSubscribe
            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
                );
            }

            assert!(fasync::TestExecutor::poll_until_stalled(receiver.next()).await.is_pending());

            container.push_back(msg.bytes());

            // The message should arrive now.
            assert_matches!(
                receiver.next().await,
                Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
            );

            container.terminate();

            // The cursor should terminate now.
            assert!(receiver.next().await.is_none());
        }
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn cursor_rolled_out() {
        // On the first pass we roll out before the cursor has started.  On the second pass, we roll
        // out after the cursor has started.
        for pass in 0..2 {
            let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
            let container_a =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let container_b =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
            let container_c =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

            container_a.push_back(msg.bytes());
            container_a.push_back(msg.bytes());
            container_b.push_back(msg.bytes());
            container_a.push_back(msg.bytes());

            let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

            // Get the first stored message on the first pass.
            if pass == 0 {
                assert!(cursor.next().await.is_some());
            }

            // Roll out messages until container_b is rolled out.
            while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
                container_c.push_back(msg.bytes());
            }

            // We should have rolled out the second message in container a.
            assert_matches!(
                cursor.next().await,
                Some(LazyItem::ItemsRolledOut(rolled_out, t))
                    if t == zx::BootInstant::from_nanos(1) && rolled_out == pass + 1
            );

            // And there should be one more message remaining.
            assert_eq!(cursor.count().await, 1);
        }
    }

    // Cursors created before terminate() still drain messages that were in the
    // buffer at termination time.
    #[fuchsia::test]
    async fn drained_post_termination_cursors() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());

        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
        assert!(cursor_a.next().await.is_some());
        assert!(cursor_b.next().await.is_some());
        assert!(cursor_c.next().await.is_some());

        container.terminate();

        // All cursors should return the 4 remaining messages.
        assert_eq!(cursor_a.count().await, 4);
        assert_eq!(cursor_b.count().await, 4);
        assert_eq!(cursor_c.count().await, 4);
    }

    // Cursors on a container that never had messages finish immediately after
    // terminate().
    #[fuchsia::test]
    async fn empty_post_termination_cursors() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));

        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();

        container.terminate();

        assert_eq!(cursor_a.count().await, 0);
        assert_eq!(cursor_b.count().await, 0);
        assert_eq!(cursor_c.count().await, 0);
    }

    #[fuchsia::test]
    async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
        let buffer = SharedBuffer::new(65536, Box::new(|_| {}));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        container_a.push_back(msg.bytes());
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        // Roll out all the messages.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());
        }

        assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));

        assert!(poll!(cursor.next()).is_pending());

        container_a.terminate();
        assert_eq!(cursor.count().await, 0);
    }

    #[fuchsia::test]
    async fn recycled_container_slot() {
        let buffer = Arc::new(SharedBuffer::new(65536, Box::new(|_| {})));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));

        // Roll out all the messages.
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());
        }

        container_a.terminate();

        // This should create a new container that uses a new slot and shouldn't interfere with
        // container_a.
        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());

        // The original cursor should have finished.
        assert_matches!(cursor.next().await, None);
    }

    // Messages arriving via a socket are counted in the container's inspect
    // stats.
    #[fuchsia::test]
    async fn socket_increments_logstats() {
        let inspector = Inspector::new(InspectorConfig::default());
        let stats: Arc<LogStreamStats> =
            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
        let buffer = Arc::new(SharedBuffer::new(65536, Box::new(|_| {})));
        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
        // received (FuturesUnordered uses separate wakers for all the futures it manages).
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor_b
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        // If cursor_a wasn't woken, this will hang.
        next.await;
        // Validate logstats (must happen after the socket was handled)
        assert_data_tree!(
            inspector,
            root: contains {
                test: {
                    url: "",
                    last_timestamp: AnyProperty,
                    sockets_closed: 0u64,
                    sockets_opened: 0u64,
                    invalid: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    total: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    rolled_out: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    trace: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    debug: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    info: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    warn: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    error: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    fatal: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                }
            }
        );
    }

    // A message written to a registered socket reaches the container's buffer
    // and wakes subscribed cursors.
    #[fuchsia::test]
    async fn socket() {
        let buffer = Arc::new(SharedBuffer::new(65536, Box::new(|_| {})));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
        // received (FuturesUnordered uses separate wakers for all the futures it manages).
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor_b
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        // If cursor_a wasn't woken, this will hang.
        next.await;
    }

    // A container with an open socket stays active after its logs roll out;
    // on_inactive only fires once the socket closes too.
    #[fuchsia::test]
    async fn socket_on_inactive() {
        let on_inactive = Arc::new(AtomicU64::new(0));
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(65536, {
            let on_inactive = Arc::clone(&on_inactive);
            let a_identity = Arc::clone(&a_identity);
            Box::new(move |id| {
                assert_eq!(id, a_identity);
                on_inactive.fetch_add(1, Ordering::Relaxed);
            })
        }));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        local.write(msg.bytes()).unwrap();

        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(
            cursor
                .map(|item| {
                    match item {
                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
                        _ => panic!("Unexpected item {item:?}"),
                    }
                })
                .count()
                .await,
            1
        );

        // Now roll out a's messages.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);

        // Close the socket.
        std::mem::drop(local);

        // We don't know when the socket thread will run so we have to loop.
        while on_inactive.load(Ordering::Relaxed) != 1 {
            fasync::Timer::new(std::time::Duration::from_millis(50)).await;
        }
    }
}