1use anyhow::Error;
6use diagnostics_data::LogsData;
7use fidl_fuchsia_test_manager as ftest_manager;
8use futures::stream::BoxStream;
9use futures::{Stream, StreamExt};
10use log_command::LogsDataStream;
11use pin_project::pin_project;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15#[cfg(target_os = "fuchsia")]
16use crate::diagnostics::fuchsia::BatchLogStream;
17
18#[pin_project]
19pub struct LogStream {
20 #[pin]
21 stream: BoxStream<'static, Result<LogsData, Error>>,
22}
23
24impl LogStream {
25 fn new<S>(stream: S) -> Self
26 where
27 S: Stream<Item = Result<LogsData, Error>> + Send + 'static,
28 {
29 Self { stream: stream.boxed() }
30 }
31
32 pub fn from_syslog(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
33 get_log_stream(syslog)
34 }
35}
36
37impl Stream for LogStream {
38 type Item = Result<LogsData, Error>;
39 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40 let this = self.project();
41 this.stream.poll_next(cx)
42 }
43}
44
/// Builds a [`LogStream`] from `syslog` when compiled for Fuchsia.
///
/// * `Syslog::Batch` is read through a `BatchLogStream`.
/// * `Syslog::Stream` is read from the socket through a `LogsDataStream`.
///
/// # Panics
///
/// Panics with "not supported" for any other `Syslog` variant.
#[cfg(target_os = "fuchsia")]
fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
    let log_stream = match syslog {
        ftest_manager::Syslog::Batch(client_end) => {
            LogStream::new(BatchLogStream::from_client_end(client_end)?)
        }
        ftest_manager::Syslog::Stream(socket) => {
            let socket = fuchsia_async::Socket::from_socket(socket);
            // `LogsDataStream` items are lifted into `Ok` to match `LogStream`'s item type.
            LogStream::new(LogsDataStream::new(socket).map(Ok))
        }
        _ => panic!("not supported"),
    };
    Ok(log_stream)
}
60
61#[cfg(not(target_os = "fuchsia"))]
62fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
63 match syslog {
64 ftest_manager::Syslog::Stream(client_end) => Ok(LogStream::new(
65 LogsDataStream::new(fuchsia_async::Socket::from_socket(client_end))
66 .map(|result| Ok(result)),
67 )),
68 ftest_manager::Syslog::Batch(_) => panic!("batch iterator not supported on host"),
69 _ => {
70 panic!("not supported")
71 }
72 }
73}
74
#[cfg(target_os = "fuchsia")]
mod fuchsia {
    use super::*;
    use diagnostics_reader::Subscription;
    use fidl::endpoints::ClientEnd;
    use fidl_fuchsia_diagnostics::BatchIteratorMarker;

    /// A log stream that reads its entries from a `BatchIterator` connection through a
    /// [`Subscription`].
    #[pin_project]
    pub struct BatchLogStream {
        /// The subscription that is polled for log entries.
        #[pin]
        subscription: Subscription,
    }

    impl BatchLogStream {
        /// Test-only constructor: creates a fresh `BatchIterator` proxy/server pair and
        /// returns the stream together with the server end wrapped in
        /// `LogsIterator::Batch`.
        #[cfg(test)]
        pub fn new() -> Result<(Self, ftest_manager::LogsIterator), fidl::Error> {
            let (proxy, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
            let stream = Self { subscription: Subscription::new(proxy) };
            Ok((stream, ftest_manager::LogsIterator::Batch(server_end)))
        }

        /// Builds a stream that subscribes through the proxy obtained from `client_end`.
        pub fn from_client_end(
            client_end: ClientEnd<BatchIteratorMarker>,
        ) -> Result<Self, fidl::Error> {
            let proxy = client_end.into_proxy();
            Ok(Self { subscription: Subscription::new(proxy) })
        }
    }

    impl Stream for BatchLogStream {
        type Item = Result<LogsData, Error>;

        /// Polls the subscription, passing values through unchanged and converting any
        /// error it yields into [`Error`].
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            self.project().subscription.poll_next(cx).map_err(Error::from)
        }
    }
}
116
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_data::{ExtendedMoniker, Severity, Timestamp};
    use fuchsia_async as fasync;

    /// Tests for the `BatchIterator`-backed stream; only built for Fuchsia targets.
    #[cfg(target_os = "fuchsia")]
    mod fuchsia {
        use super::*;
        use assert_matches::assert_matches;
        use fidl::endpoints::ServerEnd;
        use fidl_fuchsia_diagnostics::{
            BatchIteratorMarker, BatchIteratorRequest, FormattedContent, ReaderError,
        };
        use fidl_fuchsia_mem as fmem;
        use futures::TryStreamExt;

        /// Creates a `LogStream` over a fresh `BatchLogStream`, plus the iterator whose
        /// server end a fake server must serve.
        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
            let (stream, iterator) = BatchLogStream::new()?;
            Ok((LogStream::new(stream), iterator))
        }

        /// Options for the fake `BatchIterator` server.
        struct BatchIteratorOpts {
            /// When true, a `GetNext` that still has a value pending is answered with
            /// `ReaderError::Io` instead of data.
            with_error: bool,
        }

        /// Serves `BatchIterator` requests on `server_end`.
        ///
        /// Each `GetNext` consumes one of the values 1, 2, 3 and replies with a single
        /// JSON log entry for it (or an I/O error when `opts.with_error` is set). Once
        /// the values run out, `GetNext` replies with an empty batch.
        async fn spawn_batch_iterator_server(
            server_end: ServerEnd<BatchIteratorMarker>,
            opts: BatchIteratorOpts,
        ) {
            let mut request_stream = server_end.into_stream();
            let mut values = vec![1i64, 2, 3].into_iter();
            while let Some(request) = request_stream.try_next().await.expect("get next request") {
                match request {
                    BatchIteratorRequest::WaitForReady { responder } => {
                        responder.send().expect("sent response")
                    }
                    BatchIteratorRequest::GetNext { responder } => match values.next() {
                        None => {
                            responder.send(Ok(vec![])).expect("send empty response");
                        }
                        // The pending value is still consumed in the error case.
                        Some(_) if opts.with_error => {
                            responder.send(Err(ReaderError::Io)).expect("send error");
                        }
                        Some(value) => {
                            let content = get_json_data(value);
                            let size = content.len() as u64;
                            let vmo = zx::Vmo::create(size).expect("create vmo");
                            vmo.write(content.as_bytes(), 0).expect("write vmo");
                            let result = FormattedContent::Json(fmem::Buffer { vmo, size });
                            responder.send(Ok(vec![result])).expect("send response");
                        }
                    },
                    BatchIteratorRequest::_UnknownMethod { .. } => {
                        unreachable!("We aren't expecting any other call");
                    }
                }
            }
        }

        /// The stream yields the three served messages in order and then ends.
        #[fasync::run_singlethreaded(test)]
        async fn log_stream_returns_logs() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Batch(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_batch_iterator_server(
                server_end,
                BatchIteratorOpts { with_error: false },
            ))
            .detach();
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
            assert_matches!(log_stream.next().await, None);
        }

        /// A server-side error surfaces as an `Err` item on the stream.
        #[fasync::run_singlethreaded(test)]
        async fn log_stream_can_return_errors() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Batch(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_batch_iterator_server(
                server_end,
                BatchIteratorOpts { with_error: true },
            ))
            .detach();
            assert_matches!(log_stream.next().await, Some(Err(_)));
        }
    }

    /// Tests for the socket-backed stream; only built for non-Fuchsia (host) targets.
    #[cfg(not(target_os = "fuchsia"))]
    mod host {
        use super::*;
        use assert_matches::assert_matches;
        use futures::AsyncWriteExt;

        /// Creates a socket pair and returns a `LogStream` reading from one end, plus
        /// the other end wrapped in `LogsIterator::Stream` for a fake server to write to.
        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
            let (client_end, server_end) = fidl::Socket::create_stream();
            let logs = LogsDataStream::new(fuchsia_async::Socket::from_socket(client_end));
            // Wrap exactly once: passing an existing `LogStream` back into
            // `LogStream::new` would box an already boxed stream.
            let stream = LogStream::new(logs.map(Ok));
            Ok((stream, ftest_manager::LogsIterator::Stream(server_end)))
        }

        /// Writes to `socket` once for each of the values 1, 2, 3 and then returns,
        /// dropping the socket.
        ///
        /// Normally each write is the JSON log entry for the value. With `with_error`
        /// set, each write is the single byte `5` instead, and write failures are
        /// ignored.
        async fn spawn_archive_iterator_server(socket: fidl::Socket, with_error: bool) {
            let mut socket = fuchsia_async::Socket::from_socket(socket);
            for value in 1..=3 {
                if with_error {
                    // NOTE(review): presumably `5` is not decodable as a log entry - confirm.
                    let _ = socket.write_all("5".as_bytes()).await;
                    continue;
                }
                socket.write_all(get_json_data(value).as_bytes()).await.unwrap();
            }
        }

        /// The stream yields the three written messages in order and then ends.
        async fn archive_stream_returns_logs() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Stream(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_archive_iterator_server(server_end, false)).detach();
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
            assert_matches!(log_stream.next().await, None);
        }

        #[fasync::run_singlethreaded(test)]
        async fn archive_stream_returns_logs_inline() {
            archive_stream_returns_logs().await;
        }

        /// When the server writes only `5` bytes, the stream ends without yielding
        /// any item.
        async fn archive_stream_can_return_errors() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Stream(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_archive_iterator_server(server_end, true)).detach();
            assert_matches!(log_stream.next().await, None);
        }

        #[fasync::run_singlethreaded(test)]
        async fn archive_stream_can_return_errors_inline() {
            archive_stream_can_return_errors().await;
        }
    }

    /// Returns the JSON serialization of an INFO-severity log entry whose message is
    /// `value` formatted as a decimal string.
    fn get_json_data(value: i64) -> String {
        let data = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
            timestamp: Timestamp::from_nanos(0),
            component_url: Some("fake-url".into()),
            moniker: ExtendedMoniker::parse_str("test/moniker").unwrap(),
            severity: Severity::Info,
        })
        .set_message(value.to_string())
        .build();

        serde_json::to_string(&data).expect("serialize to json")
    }
}