1use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::channel::mpsc;
10use futures::prelude::*;
11use futures::ready;
12use futures::task::{Context, Poll};
13use std::cell::RefCell;
14use std::io::Write;
15use std::pin::Pin;
16use std::sync::Arc;
17
18pub use crate::diagnostics::LogStream;
19
20mod diagnostics;
21pub mod zstd_compress;
22
23thread_local! {
24 static BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0; 1024*1024*2]);
25}
26
27pub struct SocketReadFut<'a, T, F>
29where
30 F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
31{
32 socket: &'a mut fidl::AsyncSocket,
33 on_read_fn: F,
34}
35
36impl<'a, T, F> SocketReadFut<'a, T, F>
37where
38 F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
39{
40 pub fn new(socket: &'a mut fidl::AsyncSocket, on_read_fn: F) -> Self {
41 Self { socket, on_read_fn }
42 }
43}
44
45impl<'a, T, F> Future for SocketReadFut<'a, T, F>
46where
47 F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
48{
49 type Output = Result<T, std::io::Error>;
50 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51 BUFFER.with(|b| {
52 let mut b = b.borrow_mut();
53 let mut_self = self.get_mut();
54 let len = ready!(Pin::new(&mut mut_self.socket).poll_read(cx, &mut *b)?);
55 match len {
56 0 => Poll::Ready((mut_self.on_read_fn)(None)),
57 l => Poll::Ready((mut_self.on_read_fn)(Some(&b[..l]))),
58 }
59 })
60 }
61}
62
63pub async fn collect_string_from_socket(socket: fidl::Socket) -> Result<String, anyhow::Error> {
64 let (s, mut r) = mpsc::channel(1024);
65 let task = fasync::Task::spawn(collect_and_send_string_output(socket, s));
66 let mut ret = String::new();
67 while let Some(content) = r.next().await {
68 ret.push_str(&content);
69 }
70 task.await?;
71 Ok(ret)
72}
73
74pub async fn collect_and_send_string_output(
75 socket: fidl::Socket,
76 mut sender: mpsc::Sender<String>,
77) -> Result<(), anyhow::Error> {
78 let mut async_socket = fidl::AsyncSocket::from_socket(socket);
79 loop {
80 let maybe_string = SocketReadFut::new(&mut async_socket, |maybe_buf| {
81 Ok(maybe_buf.map(|buf| String::from_utf8_lossy(buf).into()))
82 })
83 .await?;
84 match maybe_string {
85 Some(string) => sender.send(string).await?,
86 None => return Ok(()),
87 }
88 }
89}
90
91pub struct StdoutBuffer<W: Write + Send + 'static> {
98 inner: Arc<Mutex<StdoutBufferInner<W>>>,
99 _timer: fuchsia_async::Task<()>,
100}
101
102impl<W: Write + Send + 'static> StdoutBuffer<W> {
103 pub fn new(duration: std::time::Duration, writer: W, max_capacity: usize) -> Self {
109 let (inner, timer) = StdoutBufferInner::new(duration, writer, max_capacity);
110 Self { inner, _timer: timer }
111 }
112}
113
114impl<W: Write + Send + 'static> Write for StdoutBuffer<W> {
115 fn flush(&mut self) -> Result<(), std::io::Error> {
116 self.inner.lock().flush()
117 }
118
119 fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
120 self.inner.lock().write(bytes)
121 }
122}
123
124struct StdoutBufferInner<W: Write + Send + 'static> {
125 writer: W,
126 buffer: Option<Vec<u8>>,
128 stop_buffer_error: Option<std::io::Error>,
129 max_capacity: usize,
130}
131
132impl<W: Write + Send + 'static> StdoutBufferInner<W> {
133 fn new(
134 duration: std::time::Duration,
135 writer: W,
136 max_capacity: usize,
137 ) -> (Arc<Mutex<Self>>, fuchsia_async::Task<()>) {
138 let new_self = Arc::new(Mutex::new(StdoutBufferInner {
139 writer,
140 buffer: Some(Vec::with_capacity(max_capacity)),
141 stop_buffer_error: None,
142 max_capacity,
143 }));
144
145 let timer = fuchsia_async::Timer::new(duration);
146 let log_buffer = Arc::downgrade(&new_self);
147 let f = async move {
148 timer.await;
149 if let Some(log_buffer) = log_buffer.upgrade() {
150 log_buffer.lock().stop_buffering();
151 }
152 };
153
154 (new_self, fuchsia_async::Task::spawn(f))
155 }
156
157 fn stop_buffering(&mut self) {
158 if let Some(buf) = self.buffer.take() {
159 if let Err(e) = self.writer.write_all(&buf) {
160 self.stop_buffer_error = Some(e);
161 }
162 }
163 }
164}
165
166impl<W: Write + Send + 'static> Write for StdoutBufferInner<W> {
167 fn flush(&mut self) -> Result<(), std::io::Error> {
168 self.stop_buffering();
169 match self.stop_buffer_error.take() {
170 Some(e) => Err(e),
171 None => self.writer.flush(),
172 }
173 }
174
175 fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
176 if let Some(e) = self.stop_buffer_error.take() {
177 return Err(e);
178 }
179 match self.buffer.as_mut() {
180 None => self.writer.write(bytes),
181 Some(buf) if buf.len() + bytes.len() > self.max_capacity => {
182 self.writer.write_all(&buf)?;
183 buf.truncate(0);
184 self.writer.write(bytes)
185 }
186 Some(buf) => Write::write(buf, bytes),
187 }
188 }
189}
190
191impl<W: Write + Send + 'static> Drop for StdoutBufferInner<W> {
192 fn drop(&mut self) {
193 let _ = self.flush();
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200 use fidl::HandleBased;
201 use futures::StreamExt;
202 use pretty_assertions::assert_eq;
203
204 async fn collect_until_eq<S: Stream<Item = String> + Unpin>(mut stream: S, target: &str) {
205 let mut accumulator = "".to_string();
206 while accumulator.len() < target.len() {
207 match stream.next().await {
208 Some(string) => accumulator.push_str(&string),
209 None => panic!(
210 "Expected string '{}' but stream terminated after '{}'",
211 target, accumulator
212 ),
213 }
214 }
215 assert_eq!(target, accumulator);
216 }
217
218 #[fuchsia_async::run_singlethreaded(test)]
219 async fn collect_test_stdout() {
220 let (sock_server, sock_client) = fidl::Socket::create_stream();
221
222 let (sender, mut recv) = mpsc::channel(1);
223
224 let fut =
225 fuchsia_async::Task::spawn(collect_and_send_string_output(sock_client, sender.into()));
226
227 sock_server.write(b"test message 1").expect("Can't write msg to socket");
228 sock_server.write(b"test message 2").expect("Can't write msg to socket");
229 sock_server.write(b"test message 3").expect("Can't write msg to socket");
230
231 collect_until_eq(&mut recv, "test message 1test message 2test message 3").await;
232
233 sock_server.write(b"test message 4").expect("Can't write msg to socket");
235 collect_until_eq(&mut recv, "test message 4").await;
236
237 sock_server.write(b"test message 5").expect("Can't write msg to socket");
239 sock_server.into_handle(); fut.await.expect("log collection should not fail");
241 collect_until_eq(&mut recv, "test message 5").await;
242
243 let msg = recv.next().await;
245 assert_eq!(msg, None);
246 }
247
248 #[cfg(target_os = "fuchsia")]
250 mod stdout {
251 use super::*;
252 use fuchsia_async::TestExecutor;
253
254 use pretty_assertions::assert_eq;
255 use std::ops::Add;
256
257 struct MutexBytes(Arc<Mutex<Vec<u8>>>);
258
259 impl Write for MutexBytes {
260 fn flush(&mut self) -> Result<(), std::io::Error> {
261 Write::flush(&mut *self.0.lock())
262 }
263
264 fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
265 Write::write(&mut *self.0.lock(), bytes)
266 }
267 }
268
269 #[test]
270 fn log_buffer_without_timeout() {
271 let mut executor = TestExecutor::new_with_fake_time();
272 let output = Arc::new(Mutex::new(vec![]));
273 let writer = MutexBytes(output.clone());
274 let (log_buffer, mut timeout_task) =
275 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
276
277 write!(log_buffer.lock(), "message1").expect("write message");
278 assert_eq!(*output.lock(), b"");
279 write!(log_buffer.lock(), "message2").expect("write message");
280 assert_eq!(*output.lock(), b"");
281
282 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
283 assert_eq!(*output.lock(), b"");
284
285 log_buffer.lock().flush().expect("flush buffer");
286 assert_eq!(*output.lock(), b"message1message2");
287 }
288
289 #[test]
290 fn log_buffer_flush_on_drop() {
291 let mut executor = TestExecutor::new_with_fake_time();
292 let output = Arc::new(Mutex::new(vec![]));
293 let writer = MutexBytes(output.clone());
294 let (log_buffer, mut timeout_task) =
295 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
296
297 write!(log_buffer.lock(), "message1").expect("write message");
298 assert_eq!(*output.lock(), b"");
299 write!(log_buffer.lock(), "message2").expect("write message");
300 assert_eq!(*output.lock(), b"");
301
302 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
303 assert_eq!(*output.lock(), b"");
304
305 drop(log_buffer);
306 assert_eq!(*output.lock(), b"message1message2");
307 }
308
309 #[test]
310 fn log_buffer_with_timeout() {
311 let mut executor = TestExecutor::new_with_fake_time();
312 let output = Arc::new(Mutex::new(vec![]));
313 let writer = MutexBytes(output.clone());
314 let (log_buffer, mut timeout_task) =
315 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
316
317 write!(log_buffer.lock(), "message1").expect("write message");
318 assert_eq!(*output.lock(), b"");
319 write!(log_buffer.lock(), "message2").expect("write message");
320 assert_eq!(*output.lock(), b"");
321
322 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
323 assert_eq!(*output.lock(), b"");
324
325 executor.set_fake_time(executor.now().add(zx::MonotonicDuration::from_seconds(6)));
326 executor.wake_next_timer();
327 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Ready(()));
328 assert_eq!(*output.lock(), b"message1message2");
329 }
330
331 #[test]
332 fn log_buffer_capacity_reached() {
333 let _executor = TestExecutor::new_with_fake_time();
334 let output = Arc::new(Mutex::new(vec![]));
335 let writer = MutexBytes(output.clone());
336 let (log_buffer, _timeout_task) =
337 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 10);
338
339 write!(log_buffer.lock(), "message1").expect("write message");
340 assert_eq!(*output.lock(), b"");
341 write!(log_buffer.lock(), "message2").expect("write message");
342 assert_eq!(*output.lock(), b"message1message2");
343
344 write!(log_buffer.lock(), "message1").expect("write message");
346 assert_eq!(*output.lock(), b"message1message2");
347 write!(log_buffer.lock(), "message2").expect("write message");
348 assert_eq!(*output.lock(), b"message1message2message1message2");
349 }
350 }
351}