validating_log_listener/
lib.rs1#![warn(missing_docs)]
8
9use fidl_fuchsia_logger::{
10 LogFilterOptions, LogListenerSafeMarker, LogListenerSafeRequest, LogListenerSafeRequestStream,
11 LogMessage, LogProxy,
12};
13use fuchsia_async as fasync;
14use futures::channel::mpsc::{channel, Receiver, Sender};
15use futures::sink::SinkExt;
16use futures::stream::StreamExt;
17
18pub async fn validate_log_stream(
25 expected: impl IntoIterator<Item = LogMessage>,
26 proxy: LogProxy,
27 filter_options: Option<LogFilterOptions>,
28) {
29 ValidatingListener::new(expected).run(proxy, filter_options, false).await;
30}
31
32pub async fn validate_log_dump(
40 expected: impl IntoIterator<Item = LogMessage>,
41 proxy: LogProxy,
42 filter_options: Option<LogFilterOptions>,
43) {
44 ValidatingListener::new(expected).run(proxy, filter_options, true).await;
45}
46
47enum Outcome {
48 AllExpectedReceived,
49 LogSentDone,
50 UnexpectedMessage(LogMessage),
51}
52
53struct ValidatingListener {
55 expected: Vec<LogMessage>,
56 outcomes: Option<Receiver<Outcome>>,
57 send_outcomes: Sender<Outcome>,
58}
59
60impl ValidatingListener {
61 fn new(expected: impl IntoIterator<Item = LogMessage>) -> Self {
62 let (send_outcomes, outcomes) = channel(3);
63 Self { expected: expected.into_iter().collect(), send_outcomes, outcomes: Some(outcomes) }
64 }
65
66 async fn run(
69 mut self,
70 proxy: LogProxy,
71 filter_options: Option<LogFilterOptions>,
72 dump_logs: bool,
73 ) {
74 let (client_end, stream) =
75 fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
76 let filter_options = filter_options.as_ref();
77
78 if dump_logs {
79 proxy.dump_logs_safe(client_end, filter_options).expect("failed to register listener");
80 } else {
81 proxy.listen_safe(client_end, filter_options).expect("failed to register listener");
82 }
83
84 let mut sink_says_done = false;
85 let mut all_expected = false;
86 let mut outcomes = self.outcomes.take().unwrap();
87 fasync::Task::spawn(self.handle_stream(stream)).detach();
88
89 'observe_outcomes: while let Some(outcome) = outcomes.next().await {
90 match outcome {
91 Outcome::AllExpectedReceived => all_expected = true,
92 Outcome::LogSentDone => sink_says_done = true,
93 Outcome::UnexpectedMessage(msg) => panic!("unexpected log message {:?}", msg),
94 }
95
96 if all_expected && (!dump_logs || sink_says_done) {
97 break 'observe_outcomes;
101 }
102 }
103
104 if dump_logs {
105 assert!(sink_says_done, "must have received all expected messages");
106 } else {
107 assert!(all_expected, "must have received all expected messages");
109 }
110 }
111
112 async fn handle_stream(mut self, mut stream: LogListenerSafeRequestStream) {
113 while let Some(Ok(req)) = stream.next().await {
114 self.handle_request(req).await;
115 }
116 }
117
118 async fn handle_request(&mut self, req: LogListenerSafeRequest) {
119 match req {
120 LogListenerSafeRequest::Log { log, responder } => {
121 self.log(log).await;
122 responder.send().ok();
123 }
124 LogListenerSafeRequest::LogMany { log, responder } => {
125 for msg in log {
126 self.log(msg).await;
127 }
128 responder.send().ok();
129 }
130 LogListenerSafeRequest::Done { .. } => {
131 self.send_outcomes.send(Outcome::LogSentDone).await.unwrap();
132 }
133 }
134 }
135
136 async fn log(&mut self, received: LogMessage) {
137 if let Some((i, _)) = self.expected.iter().enumerate().find(|(_, expected)| {
138 expected.msg == received.msg
139 && expected.tags == received.tags
140 && expected.severity == received.severity
141 }) {
142 self.expected.remove(i);
143 if self.expected.is_empty() {
144 self.send_outcomes.send(Outcome::AllExpectedReceived).await.unwrap();
145 }
146 } else {
147 self.send_outcomes.send(Outcome::UnexpectedMessage(received)).await.unwrap();
148 }
149 }
150}