archivist_lib/logs/
multiplex.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::container::{CursorItem, LogsArtifactsContainer};
7use derivative::Derivative;
8use diagnostics_data::{Data, Logs};
9use fidl_fuchsia_diagnostics::{Selector, StreamMode};
10use fuchsia_trace as ftrace;
11use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
12use futures::{Stream, StreamExt};
13use log::trace;
14use selectors::SelectorExt;
15use std::cmp::Ordering;
16use std::pin::Pin;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21pub type PinStream<I> = Pin<Box<dyn Stream<Item = I> + Send + 'static>>;
22
23static MULTIPLEXER_ID: std::sync::atomic::AtomicUsize = AtomicUsize::new(0);
24
25/// Stream-ordering multiplexer
26///
27/// A Multiplexer takes multiple possibly-ordered streams and attempts to impose
28/// a sensible ordering over the yielded items without risking starvation. New
29/// streams can be added to the multiplexer by sending them on a channel.
30pub struct Multiplexer<I> {
31    // TODO(https://fxbug.dev/42147260) explore using a BinaryHeap for sorting substreams
32    current: Vec<SubStream<I>>,
33    incoming: UnboundedReceiver<IncomingStream<PinStream<I>>>,
34    incoming_is_live: bool,
35    selectors: Option<Vec<Selector>>,
36    id: usize,
37
38    /// The multiplexer id will be sent through this channel when the Multiplexer is dropped. This
39    /// is used to clean up MultiplexerHandles in the MultiplexerBroker.
40    on_drop_id_sender: Option<UnboundedSender<usize>>,
41}
42
43impl<I> Multiplexer<I> {
44    pub fn new(
45        parent_trace_id: ftrace::Id,
46        selectors: Option<Vec<Selector>>,
47        substreams: impl Iterator<Item = (Arc<ComponentIdentity>, PinStream<I>)>,
48    ) -> (Self, MultiplexerHandle<I>) {
49        let (sender, incoming) = futures::channel::mpsc::unbounded();
50        let id = MULTIPLEXER_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
51        let current = substreams
52            .filter(|(identity, _)| Self::is_identity_allowed(&selectors, identity))
53            .map(|(identity, stream)| SubStream::new(identity, stream))
54            .collect();
55        (
56            Self {
57                current,
58                incoming,
59                incoming_is_live: true,
60                selectors,
61                id,
62                on_drop_id_sender: None,
63            },
64            MultiplexerHandle { sender, id, trace_id: parent_trace_id },
65        )
66    }
67
68    pub fn id(&self) -> usize {
69        self.id
70    }
71
72    pub fn set_on_drop_id_sender(&mut self, snd: UnboundedSender<usize>) {
73        self.on_drop_id_sender = Some(snd);
74    }
75
76    /// Drain the incoming channel to be sure we have all live sub-streams available when
77    /// considering ordering.
78    fn integrate_incoming_sub_streams(&mut self, cx: &mut Context<'_>) {
79        if self.incoming_is_live {
80            loop {
81                match self.incoming.poll_next_unpin(cx) {
82                    // incoming has more for us right now
83                    Poll::Ready(Some(IncomingStream::Next { identity, stream })) => {
84                        if self.selectors_allow(&identity) {
85                            self.current.push(SubStream::new(Arc::clone(&identity), stream));
86                        }
87                    }
88
89                    // incoming has no more for us
90                    Poll::Ready(Some(IncomingStream::Done)) | Poll::Ready(None) => {
91                        self.incoming_is_live = false;
92                        break;
93                    }
94
95                    // incoming has more for us, but not now
96                    Poll::Pending => break,
97                }
98            }
99        }
100    }
101
102    fn selectors_allow(&self, identity: &ComponentIdentity) -> bool {
103        Self::is_identity_allowed(&self.selectors, identity)
104    }
105
106    fn is_identity_allowed(
107        selectors: &Option<Vec<Selector>>,
108        identity: &ComponentIdentity,
109    ) -> bool {
110        let component_selectors = selectors
111            .as_ref()
112            .map(|ss| ss.iter().filter_map(|s| s.component_selector.as_ref()).collect::<Vec<_>>());
113        match &component_selectors {
114            None => true,
115            Some(selectors) => identity
116                .moniker
117                .match_against_component_selectors(selectors)
118                .map(|matched_selectors| !matched_selectors.is_empty())
119                .unwrap_or(false),
120        }
121    }
122}
123
124impl<I> Drop for Multiplexer<I> {
125    fn drop(&mut self) {
126        if let Some(snd) = &self.on_drop_id_sender {
127            let _ = snd.unbounded_send(self.id());
128        }
129    }
130}
131
132impl<I: Ord + Unpin> Stream for Multiplexer<I> {
133    type Item = I;
134
135    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        // ensure the incoming channel is empty so that we're considering all streams here
137        self.integrate_incoming_sub_streams(cx);
138
139        // ensure we have the latest item cached from each stream that has results
140        self.current.iter_mut().for_each(|s| s.poll_cache(cx));
141
142        // ensure we only have substreams which can still yield values
143        self.current.retain(SubStream::is_live);
144
145        // sort by the cached latest item from each sub-stream
146        self.current.sort_unstable_by(compare_sub_streams);
147
148        if self.current.is_empty() && !self.incoming_is_live {
149            // we don't have any live sub-streams and we're not getting any more
150            Poll::Ready(None)
151        } else if let Some(next) = self.current.get_mut(0).and_then(|c| c.cached.take()) {
152            // get the front item among our substreams and return it if available
153            Poll::Ready(Some(next))
154        } else {
155            // we're out of cached values but have sub-streams that claim to be pending
156            Poll::Pending
157        }
158    }
159}
160
161pub trait MultiplexerHandleAction {
162    fn send_cursor_from(&self, mode: StreamMode, container: &Arc<LogsArtifactsContainer>) -> bool;
163    /// Notify the multiplexer that no new sub-streams will be arriving.
164    fn close(&self);
165    fn multiplexer_id(&self) -> usize;
166}
167
168enum IncomingStream<S> {
169    Next { identity: Arc<ComponentIdentity>, stream: S },
170    Done,
171}
172
173pub struct MultiplexerHandle<I> {
174    id: usize,
175    trace_id: ftrace::Id,
176    sender: UnboundedSender<IncomingStream<PinStream<I>>>,
177}
178
179impl<I> MultiplexerHandle<I> {
180    fn send(&self, identity: Arc<ComponentIdentity>, stream: PinStream<I>) -> bool {
181        self.sender.unbounded_send(IncomingStream::Next { identity, stream }).is_ok()
182    }
183}
184
185impl MultiplexerHandleAction for MultiplexerHandle<Arc<Data<Logs>>> {
186    fn send_cursor_from(&self, mode: StreamMode, container: &Arc<LogsArtifactsContainer>) -> bool {
187        let stream = container.cursor(mode, self.trace_id);
188        self.send(Arc::clone(&container.identity), stream)
189    }
190
191    fn multiplexer_id(&self) -> usize {
192        self.id
193    }
194
195    fn close(&self) {
196        self.sender.unbounded_send(IncomingStream::Done).ok();
197    }
198}
199
200impl MultiplexerHandleAction for MultiplexerHandle<CursorItem> {
201    fn send_cursor_from(&self, mode: StreamMode, container: &Arc<LogsArtifactsContainer>) -> bool {
202        let stream = container.cursor_raw(mode);
203        self.send(Arc::clone(&container.identity), stream)
204    }
205
206    fn multiplexer_id(&self) -> usize {
207        self.id
208    }
209
210    fn close(&self) {
211        self.sender.unbounded_send(IncomingStream::Done).ok();
212    }
213}
214
/// Test-only implementation so that unit tests can drive a `Multiplexer<i32>`.
#[cfg(test)]
impl MultiplexerHandleAction for MultiplexerHandle<i32> {
    /// Never called by the tests; panics if it is.
    fn send_cursor_from(
        &self,
        _mode: StreamMode,
        _container: &Arc<LogsArtifactsContainer>,
    ) -> bool {
        unreachable!("Not used in the tests");
    }

    /// Returns the id of the multiplexer this handle belongs to.
    fn multiplexer_id(&self) -> usize {
        self.id
    }

    /// Sends the `Done` marker; a failed send is ignored.
    fn close(&self) {
        let _ = self.sender.unbounded_send(IncomingStream::Done);
    }
}
233
234/// A `SubStream` wraps an inner stream and keeps its latest value cached inline for comparison
235/// with the cached values of other `SubStream`s, allowing for semi-ordered merging of streams.
236#[derive(Derivative)]
237#[derivative(Debug)]
238pub struct SubStream<I> {
239    identity: Arc<ComponentIdentity>,
240    cached: Option<I>,
241    inner_is_live: bool,
242    #[derivative(Debug = "ignore")]
243    inner: PinStream<I>,
244}
245
246impl<I> SubStream<I> {
247    pub fn new(identity: Arc<ComponentIdentity>, inner: PinStream<I>) -> Self {
248        Self { identity, cached: None, inner_is_live: true, inner }
249    }
250}
251
252impl<I> SubStream<I> {
253    /// Attempts to populate the inline cache of the latest stream value, if needed.
254    fn poll_cache(&mut self, cx: &mut Context<'_>) {
255        if self.cached.is_none() && self.inner_is_live {
256            match self.inner.as_mut().poll_next(cx) {
257                Poll::Ready(Some(item)) => self.cached = Some(item),
258                Poll::Ready(None) => self.inner_is_live = false,
259                Poll::Pending => (),
260            }
261        }
262    }
263
264    fn is_live(&self) -> bool {
265        self.inner_is_live || self.cached.is_some()
266    }
267}
268
269impl<I> Drop for SubStream<I> {
270    fn drop(&mut self) {
271        trace!(identity:% = self.identity; "substream terminated");
272    }
273}
274
275/// Compare two SubStreams so that streams with cached values come before those without cached
276/// values, deferring to `I`'s `Ord` impl for those SubStreams with cached values.
277fn compare_sub_streams<I: Ord>(a: &SubStream<I>, b: &SubStream<I>) -> Ordering {
278    match (&a.cached, &b.cached) {
279        (Some(a), Some(b)) => a.cmp(b),
280        (None, Some(_)) => Ordering::Greater,
281        (Some(_), None) => Ordering::Less,
282        (None, None) => Ordering::Equal,
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;
    use futures::stream::iter as iter2stream;
    use selectors::FastError;

    /// A multiplexer with no sub-streams ends as soon as the handle is closed.
    #[fuchsia::test]
    async fn empty_multiplexer_terminates() {
        let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        handle.close();
        let observed: Vec<i32> = mux.collect().await;
        let expected: Vec<i32> = vec![];
        assert_eq!(observed, expected);
    }

    /// Sub-streams that yield nothing do not keep the multiplexer open.
    #[fuchsia::test]
    async fn empty_input_streams_terminate() {
        let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());

        handle
            .send(Arc::new(vec!["empty1"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
        handle
            .send(Arc::new(vec!["empty2"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
        handle
            .send(Arc::new(vec!["empty3"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);

        handle.close();
        let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
        let expected: Vec<i32> = vec![];
        assert_eq!(observed, expected);
    }

    /// Individually ordered sub-streams are merged into one fully ordered output.
    #[fuchsia::test]
    async fn outputs_are_ordered() {
        let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        handle.send(
            Arc::new(vec!["first"].into()),
            Box::pin(iter2stream(vec![1, 3, 5, 7])) as PinStream<i32>,
        );
        handle.send(
            Arc::new(vec!["second"].into()),
            Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>,
        );
        handle.send(
            Arc::new(vec!["third"].into()),
            Box::pin(iter2stream(vec![9, 10, 11])) as PinStream<i32>,
        );

        handle.close();
        let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
        let expected: Vec<i32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
        assert_eq!(observed, expected);
    }

    /// An unordered sub-stream yields a best-effort, only partially ordered output.
    #[fuchsia::test]
    async fn semi_sorted_substream_semi_sorted() {
        let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        handle.send(
            Arc::new(vec!["unordered"].into()),
            Box::pin(iter2stream(vec![1, 7, 3, 5])) as PinStream<i32>,
        );
        handle.send(
            Arc::new(vec!["ordered"].into()),
            Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>,
        );

        handle.close();
        let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
        // we get the stream in a weird order because `unordered`'s 3 & 5 are held up behind 7
        let expected: Vec<i32> = vec![1, 2, 4, 6, 7, 3, 5, 8];
        assert_eq!(observed, expected);
    }

    /// A single live sub-stream is passed through and the multiplexer pends while it is idle.
    #[fuchsia::test]
    async fn single_stream() {
        let (mut send, recv) = futures::channel::mpsc::unbounded();
        let (mut mux, handle) =
            Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        handle.send(Arc::new(vec!["recv"].into()), Box::pin(recv) as PinStream<i32>);

        assert!(mux.next().now_or_never().is_none());
        send.unbounded_send(1).unwrap();
        assert_eq!(mux.next().await.unwrap(), 1);
        assert!(mux.next().now_or_never().is_none());

        send.unbounded_send(2).unwrap();
        send.unbounded_send(3).unwrap();
        send.unbounded_send(4).unwrap();
        send.unbounded_send(5).unwrap();
        send.unbounded_send(6).unwrap();
        send.disconnect();
        handle.close();

        let observed: Vec<i32> = mux.collect().await;
        assert_eq!(observed, vec![2, 3, 4, 5, 6]);
    }

    /// Two live sub-streams are merged, and the multiplexer stays open until the handle closes.
    #[fuchsia::test]
    async fn two_streams_merged() {
        let (mut send1, recv1) = futures::channel::mpsc::unbounded();
        let (mut send2, recv2) = futures::channel::mpsc::unbounded();
        let (mut mux, handle) =
            Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
        handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);

        assert!(mux.next().now_or_never().is_none());
        send1.unbounded_send(2).unwrap();
        send2.unbounded_send(1).unwrap();
        assert_eq!(mux.next().await.unwrap(), 1);
        assert_eq!(mux.next().await.unwrap(), 2);
        assert!(mux.next().now_or_never().is_none());

        send1.unbounded_send(2).unwrap();
        send2.unbounded_send(3).unwrap();
        send1.disconnect();
        assert_eq!(mux.next().await.unwrap(), 2);
        assert_eq!(mux.next().await.unwrap(), 3);
        assert!(mux.next().now_or_never().is_none());

        send2.unbounded_send(4).unwrap();
        send2.unbounded_send(5).unwrap();
        send2.disconnect();
        assert_eq!(mux.next().await.unwrap(), 4);
        assert_eq!(mux.next().await.unwrap(), 5);
        assert!(
            mux.next().now_or_never().is_none(),
            "multiplexer stays open even with current streams terminated"
        );

        handle.close();
        assert!(mux.next().await.is_none());
    }

    /// A sub-stream attached mid-flight takes part in ordering from the next poll onwards.
    #[fuchsia::test]
    async fn new_sub_streams_are_merged() {
        let (mut send1, recv1) = futures::channel::mpsc::unbounded();
        let (mut send2, recv2) = futures::channel::mpsc::unbounded();
        let (mut send3, recv3) = futures::channel::mpsc::unbounded();
        let (mut mux, handle) =
            Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
        handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);

        send3.unbounded_send(0).unwrap(); // this shouldn't show up until we add it to the mux below

        assert!(mux.next().now_or_never().is_none());
        send1.unbounded_send(2).unwrap();
        send2.unbounded_send(1).unwrap();
        assert_eq!(mux.next().await.unwrap(), 1);
        assert_eq!(mux.next().await.unwrap(), 2);
        assert!(mux.next().now_or_never().is_none());

        send1.unbounded_send(3).unwrap();
        handle.send(Arc::new(vec!["recv3"].into()), Box::pin(recv3) as PinStream<i32>);
        assert_eq!(mux.next().await.unwrap(), 0);
        assert_eq!(mux.next().await.unwrap(), 3);
        assert!(mux.next().now_or_never().is_none());

        handle.close();
        assert!(mux.next().now_or_never().is_none(), "open substreams hold the multiplexer open");

        send1.disconnect();
        send2.disconnect();
        send3.disconnect();
        assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
    }

    /// A sub-stream that already ended still delivers its buffered item before being removed.
    #[fuchsia::test]
    async fn snapshot_with_stopped_substream() {
        let (mut send1, recv1) = futures::channel::mpsc::unbounded();
        let (mut send2, recv2) = futures::channel::mpsc::unbounded();
        let (mut mux, handle) =
            Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
        send1.unbounded_send(1).unwrap();
        send1.disconnect();
        handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1));

        send2.unbounded_send(2).unwrap();
        handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2));
        handle.close();

        assert_eq!(mux.next().await.unwrap(), 1);
        assert_eq!(mux.next().await.unwrap(), 2);
        assert!(mux.next().now_or_never().is_none());

        send2.unbounded_send(3).unwrap();
        assert_eq!(mux.next().await.unwrap(), 3);
        assert!(mux.next().now_or_never().is_none());
        send2.disconnect();
        assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
    }

    /// Sub-streams whose identity does not match the selectors are dropped without being polled.
    #[fuchsia::test]
    async fn multiplexer_selectors() {
        let (mut send1, recv1) = futures::channel::mpsc::unbounded();
        let (send2, recv2) = futures::channel::mpsc::unbounded();
        let (mut mux, handle) = Multiplexer::<i32>::new(
            ftrace::Id::random(),
            Some(vec![selectors::parse_selector::<FastError>("recv1:root").unwrap()]),
            std::iter::empty(),
        );

        handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
        handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);

        // Verify we never see recv2 messages and we didn't even connect it.
        assert!(mux.next().now_or_never().is_none());
        send1.unbounded_send(1).unwrap();
        assert!(send2.unbounded_send(2).unwrap_err().is_disconnected());
        assert_eq!(mux.next().await.unwrap(), 1);
        assert!(mux.next().now_or_never().is_none());

        send1.disconnect();
        handle.close();
        assert!(mux.next().await.is_none());
    }
}