1use crate::identity::ComponentIdentity;
6use crate::logs::container::{CursorItem, LogsArtifactsContainer};
7use derivative::Derivative;
8use diagnostics_data::{Data, Logs};
9use fidl_fuchsia_diagnostics::{Selector, StreamMode};
10use fuchsia_trace as ftrace;
11use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
12use futures::{Stream, StreamExt};
13use log::trace;
14use selectors::SelectorExt;
15use std::cmp::Ordering;
16use std::pin::Pin;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21pub type PinStream<I> = Pin<Box<dyn Stream<Item = I> + Send + 'static>>;
22
23static MULTIPLEXER_ID: std::sync::atomic::AtomicUsize = AtomicUsize::new(0);
24
25pub struct Multiplexer<I> {
31 current: Vec<SubStream<I>>,
33 incoming: UnboundedReceiver<IncomingStream<PinStream<I>>>,
34 incoming_is_live: bool,
35 selectors: Option<Vec<Selector>>,
36 id: usize,
37
38 on_drop_id_sender: Option<UnboundedSender<usize>>,
41}
42
43impl<I> Multiplexer<I> {
44 pub fn new(
45 parent_trace_id: ftrace::Id,
46 selectors: Option<Vec<Selector>>,
47 substreams: impl Iterator<Item = (Arc<ComponentIdentity>, PinStream<I>)>,
48 ) -> (Self, MultiplexerHandle<I>) {
49 let (sender, incoming) = futures::channel::mpsc::unbounded();
50 let id = MULTIPLEXER_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
51 let current = substreams
52 .filter(|(identity, _)| Self::is_identity_allowed(&selectors, identity))
53 .map(|(identity, stream)| SubStream::new(identity, stream))
54 .collect();
55 (
56 Self {
57 current,
58 incoming,
59 incoming_is_live: true,
60 selectors,
61 id,
62 on_drop_id_sender: None,
63 },
64 MultiplexerHandle { sender, id, trace_id: parent_trace_id },
65 )
66 }
67
68 pub fn id(&self) -> usize {
69 self.id
70 }
71
72 pub fn set_on_drop_id_sender(&mut self, snd: UnboundedSender<usize>) {
73 self.on_drop_id_sender = Some(snd);
74 }
75
76 fn integrate_incoming_sub_streams(&mut self, cx: &mut Context<'_>) {
79 if self.incoming_is_live {
80 loop {
81 match self.incoming.poll_next_unpin(cx) {
82 Poll::Ready(Some(IncomingStream::Next { identity, stream })) => {
84 if self.selectors_allow(&identity) {
85 self.current.push(SubStream::new(Arc::clone(&identity), stream));
86 }
87 }
88
89 Poll::Ready(Some(IncomingStream::Done)) | Poll::Ready(None) => {
91 self.incoming_is_live = false;
92 break;
93 }
94
95 Poll::Pending => break,
97 }
98 }
99 }
100 }
101
102 fn selectors_allow(&self, identity: &ComponentIdentity) -> bool {
103 Self::is_identity_allowed(&self.selectors, identity)
104 }
105
106 fn is_identity_allowed(
107 selectors: &Option<Vec<Selector>>,
108 identity: &ComponentIdentity,
109 ) -> bool {
110 let component_selectors = selectors
111 .as_ref()
112 .map(|ss| ss.iter().filter_map(|s| s.component_selector.as_ref()).collect::<Vec<_>>());
113 match &component_selectors {
114 None => true,
115 Some(selectors) => identity
116 .moniker
117 .match_against_component_selectors(selectors)
118 .map(|matched_selectors| !matched_selectors.is_empty())
119 .unwrap_or(false),
120 }
121 }
122}
123
124impl<I> Drop for Multiplexer<I> {
125 fn drop(&mut self) {
126 if let Some(snd) = &self.on_drop_id_sender {
127 let _ = snd.unbounded_send(self.id());
128 }
129 }
130}
131
132impl<I: Ord + Unpin> Stream for Multiplexer<I> {
133 type Item = I;
134
135 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 self.integrate_incoming_sub_streams(cx);
138
139 self.current.iter_mut().for_each(|s| s.poll_cache(cx));
141
142 self.current.retain(SubStream::is_live);
144
145 self.current.sort_unstable_by(compare_sub_streams);
147
148 if self.current.is_empty() && !self.incoming_is_live {
149 Poll::Ready(None)
151 } else if let Some(next) = self.current.get_mut(0).and_then(|c| c.cached.take()) {
152 Poll::Ready(Some(next))
154 } else {
155 Poll::Pending
157 }
158 }
159}
160
161pub trait MultiplexerHandleAction {
162 fn send_cursor_from(&self, mode: StreamMode, container: &Arc<LogsArtifactsContainer>) -> bool;
163 fn close(&self);
165 fn multiplexer_id(&self) -> usize;
166}
167
168enum IncomingStream<S> {
169 Next { identity: Arc<ComponentIdentity>, stream: S },
170 Done,
171}
172
173pub struct MultiplexerHandle<I> {
174 id: usize,
175 trace_id: ftrace::Id,
176 sender: UnboundedSender<IncomingStream<PinStream<I>>>,
177}
178
179impl<I> MultiplexerHandle<I> {
180 fn send(&self, identity: Arc<ComponentIdentity>, stream: PinStream<I>) -> bool {
181 self.sender.unbounded_send(IncomingStream::Next { identity, stream }).is_ok()
182 }
183}
184
185impl MultiplexerHandleAction for MultiplexerHandle<Arc<Data<Logs>>> {
186 fn send_cursor_from(&self, mode: StreamMode, container: &Arc<LogsArtifactsContainer>) -> bool {
187 let stream = container.cursor(mode, self.trace_id);
188 self.send(Arc::clone(&container.identity), stream)
189 }
190
191 fn multiplexer_id(&self) -> usize {
192 self.id
193 }
194
195 fn close(&self) {
196 self.sender.unbounded_send(IncomingStream::Done).ok();
197 }
198}
199
200impl MultiplexerHandleAction for MultiplexerHandle<CursorItem> {
201 fn send_cursor_from(&self, mode: StreamMode, container: &Arc<LogsArtifactsContainer>) -> bool {
202 let stream = container.cursor_raw(mode);
203 self.send(Arc::clone(&container.identity), stream)
204 }
205
206 fn multiplexer_id(&self) -> usize {
207 self.id
208 }
209
210 fn close(&self) {
211 self.sender.unbounded_send(IncomingStream::Done).ok();
212 }
213}
214
215#[cfg(test)]
216impl MultiplexerHandleAction for MultiplexerHandle<i32> {
217 fn send_cursor_from(
218 &self,
219 _mode: StreamMode,
220 _container: &Arc<LogsArtifactsContainer>,
221 ) -> bool {
222 unreachable!("Not used in the tests");
223 }
224
225 fn multiplexer_id(&self) -> usize {
226 self.id
227 }
228
229 fn close(&self) {
230 self.sender.unbounded_send(IncomingStream::Done).ok();
231 }
232}
233
234#[derive(Derivative)]
237#[derivative(Debug)]
238pub struct SubStream<I> {
239 identity: Arc<ComponentIdentity>,
240 cached: Option<I>,
241 inner_is_live: bool,
242 #[derivative(Debug = "ignore")]
243 inner: PinStream<I>,
244}
245
246impl<I> SubStream<I> {
247 pub fn new(identity: Arc<ComponentIdentity>, inner: PinStream<I>) -> Self {
248 Self { identity, cached: None, inner_is_live: true, inner }
249 }
250}
251
252impl<I> SubStream<I> {
253 fn poll_cache(&mut self, cx: &mut Context<'_>) {
255 if self.cached.is_none() && self.inner_is_live {
256 match self.inner.as_mut().poll_next(cx) {
257 Poll::Ready(Some(item)) => self.cached = Some(item),
258 Poll::Ready(None) => self.inner_is_live = false,
259 Poll::Pending => (),
260 }
261 }
262 }
263
264 fn is_live(&self) -> bool {
265 self.inner_is_live || self.cached.is_some()
266 }
267}
268
269impl<I> Drop for SubStream<I> {
270 fn drop(&mut self) {
271 trace!(identity:% = self.identity; "substream terminated");
272 }
273}
274
275fn compare_sub_streams<I: Ord>(a: &SubStream<I>, b: &SubStream<I>) -> Ordering {
278 match (&a.cached, &b.cached) {
279 (Some(a), Some(b)) => a.cmp(b),
280 (None, Some(_)) => Ordering::Greater,
281 (Some(_), None) => Ordering::Less,
282 (None, None) => Ordering::Equal,
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289 use futures::prelude::*;
290 use futures::stream::iter as iter2stream;
291 use selectors::FastError;
292
293 #[fuchsia::test]
294 async fn empty_multiplexer_terminates() {
295 let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
296 handle.close();
297 let observed: Vec<i32> = mux.collect().await;
298 let expected: Vec<i32> = vec![];
299 assert_eq!(observed, expected);
300 }
301
302 #[fuchsia::test]
303 async fn empty_input_streams_terminate() {
304 let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
305
306 handle
307 .send(Arc::new(vec!["empty1"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
308 handle
309 .send(Arc::new(vec!["empty2"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
310 handle
311 .send(Arc::new(vec!["empty3"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
312
313 handle.close();
314 let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
315 let expected: Vec<i32> = vec![];
316 assert_eq!(observed, expected);
317 }
318
319 #[fuchsia::test]
320 async fn outputs_are_ordered() {
321 let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
322 handle.send(
323 Arc::new(vec!["first"].into()),
324 Box::pin(iter2stream(vec![1, 3, 5, 7])) as PinStream<i32>,
325 );
326 handle.send(
327 Arc::new(vec!["second"].into()),
328 Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>,
329 );
330 handle.send(
331 Arc::new(vec!["third"].into()),
332 Box::pin(iter2stream(vec![9, 10, 11])) as PinStream<i32>,
333 );
334
335 handle.close();
336 let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
337 let expected: Vec<i32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
338 assert_eq!(observed, expected);
339 }
340
341 #[fuchsia::test]
342 async fn semi_sorted_substream_semi_sorted() {
343 let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
344 handle.send(
345 Arc::new(vec!["unordered"].into()),
346 Box::pin(iter2stream(vec![1, 7, 3, 5])) as PinStream<i32>,
347 );
348 handle.send(
349 Arc::new(vec!["ordered"].into()),
350 Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>,
351 );
352
353 handle.close();
354 let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
355 let expected: Vec<i32> = vec![1, 2, 4, 6, 7, 3, 5, 8];
357 assert_eq!(observed, expected);
358 }
359
360 #[fuchsia::test]
361 async fn single_stream() {
362 let (mut send, recv) = futures::channel::mpsc::unbounded();
363 let (mut mux, handle) =
364 Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
365 handle.send(Arc::new(vec!["recv"].into()), Box::pin(recv) as PinStream<i32>);
366
367 assert!(mux.next().now_or_never().is_none());
368 send.unbounded_send(1).unwrap();
369 assert_eq!(mux.next().await.unwrap(), 1);
370 assert!(mux.next().now_or_never().is_none());
371
372 send.unbounded_send(2).unwrap();
373 send.unbounded_send(3).unwrap();
374 send.unbounded_send(4).unwrap();
375 send.unbounded_send(5).unwrap();
376 send.unbounded_send(6).unwrap();
377 send.disconnect();
378 handle.close();
379
380 let observed: Vec<i32> = mux.collect().await;
381 assert_eq!(observed, vec![2, 3, 4, 5, 6]);
382 }
383
384 #[fuchsia::test]
385 async fn two_streams_merged() {
386 let (mut send1, recv1) = futures::channel::mpsc::unbounded();
387 let (mut send2, recv2) = futures::channel::mpsc::unbounded();
388 let (mut mux, handle) =
389 Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
390 handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
391 handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);
392
393 assert!(mux.next().now_or_never().is_none());
394 send1.unbounded_send(2).unwrap();
395 send2.unbounded_send(1).unwrap();
396 assert_eq!(mux.next().await.unwrap(), 1);
397 assert_eq!(mux.next().await.unwrap(), 2);
398 assert!(mux.next().now_or_never().is_none());
399
400 send1.unbounded_send(2).unwrap();
401 send2.unbounded_send(3).unwrap();
402 send1.disconnect();
403 assert_eq!(mux.next().await.unwrap(), 2);
404 assert_eq!(mux.next().await.unwrap(), 3);
405 assert!(mux.next().now_or_never().is_none());
406
407 send2.unbounded_send(4).unwrap();
408 send2.unbounded_send(5).unwrap();
409 send2.disconnect();
410 assert_eq!(mux.next().await.unwrap(), 4);
411 assert_eq!(mux.next().await.unwrap(), 5);
412 assert!(
413 mux.next().now_or_never().is_none(),
414 "multiplexer stays open even with current streams terminated"
415 );
416
417 handle.close();
418 assert!(mux.next().await.is_none());
419 }
420
421 #[fuchsia::test]
422 async fn new_sub_streams_are_merged() {
423 let (mut send1, recv1) = futures::channel::mpsc::unbounded();
424 let (mut send2, recv2) = futures::channel::mpsc::unbounded();
425 let (mut send3, recv3) = futures::channel::mpsc::unbounded();
426 let (mut mux, handle) =
427 Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
428 handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
429 handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);
430
431 send3.unbounded_send(0).unwrap(); assert!(mux.next().now_or_never().is_none());
434 send1.unbounded_send(2).unwrap();
435 send2.unbounded_send(1).unwrap();
436 assert_eq!(mux.next().await.unwrap(), 1);
437 assert_eq!(mux.next().await.unwrap(), 2);
438 assert!(mux.next().now_or_never().is_none());
439
440 send1.unbounded_send(3).unwrap();
441 handle.send(Arc::new(vec!["recv3"].into()), Box::pin(recv3) as PinStream<i32>);
442 assert_eq!(mux.next().await.unwrap(), 0);
443 assert_eq!(mux.next().await.unwrap(), 3);
444 assert!(mux.next().now_or_never().is_none());
445
446 handle.close();
447 assert!(mux.next().now_or_never().is_none(), "open substreams hold the multiplexer open");
448
449 send1.disconnect();
450 send2.disconnect();
451 send3.disconnect();
452 assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
453 }
454
455 #[fuchsia::test]
456 async fn snapshot_with_stopped_substream() {
457 let (mut send1, recv1) = futures::channel::mpsc::unbounded();
458 let (mut send2, recv2) = futures::channel::mpsc::unbounded();
459 let (mut mux, handle) =
460 Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
461 send1.unbounded_send(1).unwrap();
462 send1.disconnect();
463 handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1));
464
465 send2.unbounded_send(2).unwrap();
466 handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2));
467 handle.close();
468
469 assert_eq!(mux.next().await.unwrap(), 1);
470 assert_eq!(mux.next().await.unwrap(), 2);
471 assert!(mux.next().now_or_never().is_none());
472
473 send2.unbounded_send(3).unwrap();
474 assert_eq!(mux.next().await.unwrap(), 3);
475 assert!(mux.next().now_or_never().is_none());
476 send2.disconnect();
477 assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
478 }
479
480 #[fuchsia_async::run_singlethreaded(test)]
481 async fn multiplexer_selectors() {
482 let (mut send1, recv1) = futures::channel::mpsc::unbounded();
483 let (send2, recv2) = futures::channel::mpsc::unbounded();
484 let (mut mux, handle) = Multiplexer::<i32>::new(
485 ftrace::Id::random(),
486 Some(vec![selectors::parse_selector::<FastError>("recv1:root").unwrap()]),
487 std::iter::empty(),
488 );
489
490 handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
491 handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);
492
493 assert!(mux.next().now_or_never().is_none());
495 send1.unbounded_send(1).unwrap();
496 assert!(send2.unbounded_send(2).unwrap_err().is_disconnected());
497 assert_eq!(mux.next().await.unwrap(), 1);
498 assert!(mux.next().now_or_never().is_none());
499
500 send1.disconnect();
501 handle.close();
502 assert!(mux.next().await.is_none());
503 }
504}