1use anyhow::{Error, Result};
6use diagnostics_data::LogsData;
7use flex_fuchsia_test_manager as ftest_manager;
8use futures::stream::BoxStream;
9use futures::{Stream, StreamExt};
10#[cfg(not(feature = "fdomain"))]
11use log_command::LogsDataStream;
12#[cfg(feature = "fdomain")]
13use log_command_fdomain::LogsDataStream;
14use pin_project::pin_project;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17
18#[cfg(target_os = "fuchsia")]
19use crate::diagnostics::fuchsia::BatchLogStream;
20
21#[pin_project]
22pub struct LogStream {
23 #[pin]
24 stream: BoxStream<'static, Result<LogsData, Error>>,
25}
26
27impl LogStream {
28 fn new<S>(stream: S) -> Self
29 where
30 S: Stream<Item = Result<LogsData, Error>> + Send + 'static,
31 {
32 Self { stream: stream.boxed() }
33 }
34
35 pub fn from_syslog(syslog: ftest_manager::Syslog) -> Result<LogStream> {
36 get_log_stream(syslog)
37 }
38}
39
40impl Stream for LogStream {
41 type Item = Result<LogsData, Error>;
42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 let this = self.project();
44 this.stream.poll_next(cx)
45 }
46}
47
/// Builds a [`LogStream`] from a `Syslog` handle when compiled for Fuchsia.
///
/// * `Syslog::Batch` is read through a [`BatchLogStream`].
/// * `Syslog::Stream` is read from the socket through `LogsDataStream`, with its errors
///   converted into `anyhow::Error`.
///
/// # Errors
///
/// Returns an error if the batch stream cannot be created from the client end, or if
/// `syslog` is a variant this code does not recognise.
#[cfg(target_os = "fuchsia")]
fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream> {
    match syslog {
        ftest_manager::Syslog::Batch(client_end) => {
            Ok(LogStream::new(BatchLogStream::from_client_end(client_end)?))
        }
        ftest_manager::Syslog::Stream(client_end) => Ok(LogStream::new(
            LogsDataStream::new(flex_client::socket_to_async(client_end))
                .map(|result| result.map_err(Error::from)),
        )),
        // The variant is chosen by the peer, so an unrecognised one is reported through the
        // `Result` this function already returns instead of aborting the whole process.
        _ => Err(Error::msg("syslog variant not supported")),
    }
}
63
64#[cfg(not(target_os = "fuchsia"))]
65fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream> {
66 match syslog {
67 ftest_manager::Syslog::Stream(client_end) => Ok(LogStream::new(
68 LogsDataStream::new(flex_client::socket_to_async(client_end))
69 .map(|result| result.map_err(Error::from)),
70 )),
71 ftest_manager::Syslog::Batch(_) => panic!("batch iterator not supported on host"),
72 _ => {
73 panic!("not supported")
74 }
75 }
76}
77
#[cfg(target_os = "fuchsia")]
mod fuchsia {
    use super::*;
    use diagnostics_reader::Subscription;
    use fidl::endpoints::ClientEnd;
    use fidl_fuchsia_diagnostics::BatchIteratorMarker;

    /// Stream of log entries backed by a `Subscription` over a `BatchIterator` channel.
    #[pin_project]
    pub struct BatchLogStream {
        /// The subscription that is polled for the next log entry.
        #[pin]
        subscription: Subscription,
    }

    impl BatchLogStream {
        /// Test-only constructor: creates a fresh `BatchIterator` channel and returns the
        /// stream reading from its proxy together with the server end, wrapped as
        /// `LogsIterator::Batch`, for a fake server to serve.
        #[cfg(test)]
        pub fn new() -> Result<(Self, ftest_manager::LogsIterator), fidl::Error> {
            let (proxy, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
            let stream = Self { subscription: Subscription::new(proxy) };
            Ok((stream, ftest_manager::LogsIterator::Batch(server_end)))
        }

        /// Creates a stream that reads from an existing `BatchIterator` client end.
        pub fn from_client_end(
            client_end: ClientEnd<BatchIteratorMarker>,
        ) -> Result<Self, fidl::Error> {
            let proxy = client_end.into_proxy();
            Ok(Self { subscription: Subscription::new(proxy) })
        }
    }

    impl Stream for BatchLogStream {
        type Item = Result<LogsData, Error>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Items, end-of-stream and `Pending` pass through untouched; only the error type
            // is converted into `anyhow::Error`.
            self.project().subscription.poll_next(cx).map_err(Error::from)
        }
    }
}
119
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_data::{ExtendedMoniker, Severity, Timestamp};
    use fuchsia_async as fasync;

    /// Tests for the `BatchIterator`-backed stream, which only exists on Fuchsia.
    #[cfg(target_os = "fuchsia")]
    mod fuchsia {
        use super::*;
        use assert_matches::assert_matches;
        use fidl::endpoints::ServerEnd;
        use fidl_fuchsia_diagnostics::{
            BatchIteratorMarker, BatchIteratorRequest, FormattedContent, ReaderError,
        };
        use fidl_fuchsia_mem as fmem;
        use futures::TryStreamExt;

        /// Returns a `LogStream` over a new batch iterator channel plus the server side of
        /// that channel.
        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
            let (stream, iterator) = BatchLogStream::new()?;
            Ok((LogStream::new(stream), iterator))
        }

        /// Behaviour switches for the fake batch iterator server.
        struct BatchIteratorOpts {
            /// When set, every `GetNext` that still has a value pending is answered with
            /// `ReaderError::Io` instead of data.
            with_error: bool,
        }

        /// Serves a fake `BatchIterator` that hands out the messages "1", "2" and "3", one
        /// per `GetNext` call, followed by empty batches.
        async fn spawn_batch_iterator_server(
            server_end: ServerEnd<BatchIteratorMarker>,
            opts: BatchIteratorOpts,
        ) {
            let mut request_stream = server_end.into_stream();
            let mut values = vec![1i64, 2, 3].into_iter();
            while let Some(request) = request_stream.try_next().await.expect("get next request") {
                match request {
                    BatchIteratorRequest::WaitForReady { responder } => {
                        responder.send().expect("sent response")
                    }
                    BatchIteratorRequest::GetNext { responder } => match values.next() {
                        None => {
                            // An empty batch is sent once the values run out; the tests
                            // expect the client stream to end after it.
                            responder.send(Ok(vec![])).expect("send empty response");
                        }
                        Some(value) => {
                            if opts.with_error {
                                responder.send(Err(ReaderError::Io)).expect("send error");
                                continue;
                            }
                            // Each entry is delivered as JSON inside a VMO sized to fit it.
                            let content = get_json_data(value);
                            let size = content.len() as u64;
                            let vmo = zx::Vmo::create(size).expect("create vmo");
                            vmo.write(content.as_bytes(), 0).expect("write vmo");
                            let result = FormattedContent::Json(fmem::Buffer { vmo, size });
                            responder.send(Ok(vec![result])).expect("send response");
                        }
                    },
                    BatchIteratorRequest::_UnknownMethod { .. } => {
                        unreachable!("We aren't expecting any other call");
                    }
                }
            }
        }

        /// The stream yields every entry served by the iterator, in order, and then ends.
        #[fasync::run_singlethreaded(test)]
        async fn log_stream_returns_logs() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Batch(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_batch_iterator_server(
                server_end,
                BatchIteratorOpts { with_error: false },
            ))
            .detach();
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
            assert_matches!(log_stream.next().await, None);
        }

        /// An error reported by the iterator surfaces as an `Err` item on the stream.
        #[fasync::run_singlethreaded(test)]
        async fn log_stream_can_return_errors() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Batch(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_batch_iterator_server(
                server_end,
                BatchIteratorOpts { with_error: true },
            ))
            .detach();
            assert_matches!(log_stream.next().await, Some(Err(_)));
        }
    }

    /// Tests for the socket-backed stream used when running on a non-Fuchsia host.
    #[cfg(not(target_os = "fuchsia"))]
    mod host {
        use super::*;
        use assert_matches::assert_matches;
        use futures::AsyncWriteExt;

        /// Returns a `LogStream` reading from a new stream socket plus the other end of
        /// that socket, wrapped as `LogsIterator::Stream`.
        fn create_log_stream(
            client: flex_client::ClientArg,
        ) -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
            let (client_end, server_end) = client.create_stream_socket();
            // Wrap the socket reader exactly once; wrapping the resulting `LogStream` again
            // would only add a second layer of boxing.
            let stream = LogStream::new(
                LogsDataStream::new(flex_client::socket_to_async(client_end))
                    .map(|result| result.map_err(Error::from)),
            );
            Ok((stream, ftest_manager::LogsIterator::Stream(server_end)))
        }

        /// Writes three log entries ("1", "2", "3") to `socket`, or, when `with_error` is
        /// set, writes the bare text "5" three times instead.
        async fn spawn_archive_iterator_server(socket: flex_client::Socket, with_error: bool) {
            let mut socket = flex_client::socket_to_async(socket);
            for value in 1..=3 {
                if with_error {
                    // NOTE(review): "5" is presumably not a decodable log entry, which is
                    // what makes the reader yield an error — confirm against
                    // `LogsDataStream`. The write result is deliberately ignored.
                    let _ = socket.write_all("5".as_bytes()).await;
                    continue;
                }
                socket.write_all(get_json_data(value).as_bytes()).await.unwrap();
            }
        }

        /// The stream yields every entry written to the socket, in order, and then ends.
        async fn archive_stream_returns_logs(client: flex_client::ClientArg) {
            let (mut log_stream, iterator) = create_log_stream(client).expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Stream(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_archive_iterator_server(server_end, false)).detach();
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
            assert_matches!(log_stream.next().await, None);
        }

        #[fasync::run_singlethreaded(test)]
        async fn archive_stream_returns_logs_inline() {
            let client = fdomain_local::local_client_empty();
            archive_stream_returns_logs(client.clone()).await;
        }

        /// The data written in error mode surfaces as an `Err` item on the stream.
        async fn archive_stream_can_return_errors(client: flex_client::ClientArg) {
            let (mut log_stream, iterator) = create_log_stream(client).expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Stream(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_archive_iterator_server(server_end, true)).detach();
            assert_matches!(log_stream.next().await, Some(Err(_)));
        }

        #[fasync::run_singlethreaded(test)]
        async fn archive_stream_can_return_errors_inline() {
            let client = fdomain_local::local_client_empty();
            archive_stream_can_return_errors(client.clone()).await;
        }
    }

    /// Serializes to JSON an info-severity log entry whose message is `value` rendered as
    /// text, using a fixed timestamp, component URL and moniker.
    fn get_json_data(value: i64) -> String {
        let data = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
            timestamp: Timestamp::from_nanos(0),
            component_url: Some("fake-url".into()),
            moniker: ExtendedMoniker::parse_str("test/moniker").unwrap(),
            severity: Severity::Info,
        })
        .set_message(value.to_string())
        .build();

        serde_json::to_string(&data).expect("serialize to json")
    }
}