1use fuchsia_async as fasync;
8use futures::{future, AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _};
9use std::num::NonZeroUsize;
10use thiserror::Error;
11use zx::HandleBased as _;
12
13const SOCKET_BUFFER_SIZE: usize = 2048;
15
16const MAX_LINE_BUFFER_LENGTH: usize = 4096;
19
20#[derive(Debug, PartialEq, Eq, Error, Clone)]
22pub enum LoggerError {
23 #[error("cannot create socket: {:?}", _0)]
24 CreateSocket(zx::Status),
25
26 #[error("cannot duplicate socket: {:?}", _0)]
27 DuplicateSocket(zx::Status),
28
29 #[error("invalid socket: {:?}", _0)]
30 InvalidSocket(zx::Status),
31}
32
33#[derive(Debug, Error)]
35pub enum LogError {
36 #[error("can't get logs: {:?}", _0)]
38 Read(std::io::Error),
39
40 #[error("can't write logs: {:?}", _0)]
42 Write(std::io::Error),
43}
44
45pub fn create_std_combined_log_stream(
48) -> Result<(LoggerStream, zx::Handle, zx::Handle), LoggerError> {
49 let (client, log) = zx::Socket::create_stream();
50
51 let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
52 let clone =
53 log.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(LoggerError::DuplicateSocket)?;
54
55 Ok((stream, log.into_handle(), clone.into_handle()))
56}
57
58pub fn create_log_stream() -> Result<(LoggerStream, zx::Handle), LoggerError> {
61 let (client, log) = zx::Socket::create_stream();
62
63 let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
64
65 Ok((stream, log.into_handle()))
66}
67pub struct LogStreamReader {
69 fut: future::RemoteHandle<Result<Vec<u8>, LogError>>,
70}
71
72impl LogStreamReader {
73 pub fn new(logger: LoggerStream) -> Self {
74 let (logger_handle, logger_fut) = logger.read_to_end().remote_handle();
75 fasync::Task::spawn(logger_handle).detach();
76 Self { fut: logger_fut }
77 }
78
79 pub async fn get_logs(self) -> Result<Vec<u8>, LogError> {
81 self.fut.await
82 }
83}
84
85pub struct LoggerStream {
89 socket: fasync::Socket,
90}
91
92impl Unpin for LoggerStream {}
93
94impl LoggerStream {
95 pub fn new(socket: zx::Socket) -> Result<LoggerStream, zx::Status> {
98 let l = LoggerStream { socket: fasync::Socket::from_socket(socket) };
99 Ok(l)
100 }
101
102 pub async fn read_to_end(mut self) -> Result<Vec<u8>, LogError> {
104 let mut buffer: Vec<u8> = Vec::new();
105 let _bytes_read = self.socket.read_to_end(&mut buffer).await.map_err(LogError::Read)?;
106 Ok(buffer)
107 }
108
109 pub async fn buffer_drain_and_peek(
114 mut self,
115 writer: &mut SocketLogWriter,
116 peek_fn: Option<impl Fn(&[u8])>,
117 ) -> Result<(), LogError> {
118 let mut line_buffer: Vec<u8> = Vec::with_capacity(MAX_LINE_BUFFER_LENGTH);
119 let mut socket_buffer: Vec<u8> = vec![0; SOCKET_BUFFER_SIZE];
120
121 while let Some(bytes_read) = NonZeroUsize::new(
122 self.socket.read(&mut socket_buffer[..]).await.map_err(LogError::Read)?,
123 ) {
124 let bytes_read = bytes_read.get();
125
126 let newline_iter =
127 socket_buffer[..bytes_read].iter().enumerate().filter_map(|(i, &b)| {
128 if b == b'\n' {
129 Some(i)
130 } else {
131 None
132 }
133 });
134
135 let mut prev_offset = 0;
136 for idx in newline_iter {
137 let line = &socket_buffer[prev_offset..idx + 1];
138 if !line_buffer.is_empty() {
139 writer.write(line_buffer.drain(..).as_slice()).await?;
140 }
141 if let Some(ref peek) = &peek_fn {
142 peek(line);
143 }
144 writer.write(line).await?;
145 prev_offset = idx + 1;
146 }
147 if prev_offset != bytes_read {
148 line_buffer.extend_from_slice(&socket_buffer[prev_offset..bytes_read]);
149 }
150
151 if line_buffer.len() > MAX_LINE_BUFFER_LENGTH {
152 let bytes = &line_buffer[..MAX_LINE_BUFFER_LENGTH];
153 if let Some(ref peek) = &peek_fn {
154 peek(bytes);
155 }
156 writer.write(bytes).await?;
157 line_buffer.drain(..MAX_LINE_BUFFER_LENGTH);
158 }
159 }
160
161 if !line_buffer.is_empty() {
162 let bytes = &line_buffer[..];
163 if let Some(ref peek) = &peek_fn {
164 peek(bytes);
165 }
166 writer.write(bytes).await?;
167 }
168
169 Ok(())
170 }
171
172 pub async fn buffer_and_drain(self, writer: &mut SocketLogWriter) -> Result<(), LogError> {
174 self.buffer_drain_and_peek(writer, None::<fn(&[u8])>).await
175 }
176
177 pub fn take_socket(self) -> fasync::Socket {
179 self.socket
180 }
181}
182
183pub struct SocketLogWriter {
185 logger: fasync::Socket,
186}
187
188impl SocketLogWriter {
189 pub fn new(logger: fasync::Socket) -> Self {
190 Self { logger }
191 }
192
193 pub async fn write_str(&mut self, s: &str) -> Result<(), LogError> {
194 self.write(s.as_bytes()).await
195 }
196
197 pub async fn write(&mut self, bytes: &[u8]) -> Result<(), LogError> {
198 self.logger.write_all(bytes).await.map_err(LogError::Write)
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use anyhow::{format_err, Context as _, Error};
206 use assert_matches::assert_matches;
207 use futures::{try_join, TryStreamExt as _};
208 use rand::distributions::{Alphanumeric, DistString as _};
209 use rand::thread_rng;
210 use std::sync::mpsc;
211 use test_case::test_case;
212
213 #[fuchsia_async::run_singlethreaded(test)]
214 async fn log_writer_reader_work() {
215 let (sock1, sock2) = zx::Socket::create_stream();
216 let mut log_writer = SocketLogWriter::new(fasync::Socket::from_socket(sock1));
217
218 let reader = LoggerStream::new(sock2).unwrap();
219 let reader = LogStreamReader::new(reader);
220
221 log_writer.write_str("this is string one.").await.unwrap();
222 log_writer.write_str("this is string two.").await.unwrap();
223 drop(log_writer);
224
225 let actual = reader.get_logs().await.unwrap();
226 let actual = std::str::from_utf8(&actual).unwrap();
227 assert_eq!(actual, "this is string one.this is string two.".to_owned());
228 }
229
230 #[test_case(String::from("Hello World!") ; "consumes_simple_msg")]
231 #[test_case(get_random_string(10000) ; "consumes_large_msg")]
232 #[fasync::run_singlethreaded(test)]
233 async fn logger_stream_read_to_end(msg: String) -> Result<(), Error> {
234 let (stream, tx) = create_logger_stream()?;
235
236 let () = take_and_write_to_socket(tx, &msg)?;
237 let result = stream.read_to_end().await.context("Failed to read from socket")?;
238 let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();
239
240 assert_eq!(actual, msg);
241 Ok(())
242 }
243
244 #[fasync::run_singlethreaded(test)]
245 async fn logger_stream_read_to_end_consumes_concat_msgs() -> Result<(), Error> {
246 let (stream, tx) = create_logger_stream()?;
247 let msgs =
248 vec!["Hello World!".to_owned(), "Hola Mundo!".to_owned(), "你好,世界!".to_owned()];
249
250 for msg in msgs.iter() {
251 let () = write_to_socket(&tx, &msg)?;
252 }
253 std::mem::drop(tx);
254 let result = stream.read_to_end().await.context("Failed to read from socket")?;
255 let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();
256
257 assert_eq!(actual, msgs.join(""));
258 Ok(())
259 }
260
261 #[fasync::run_singlethreaded(test)]
262 async fn buffer_and_drain_reads_each_line_as_a_new_message() -> Result<(), Error> {
263 let (stream, tx) = create_logger_stream()?;
264 let (mut logger, rx) = create_datagram_logger()?;
265 let msg = "Hello World\nHola Mundo!\n你好,世界!";
266
267 let (tx_peeks, rx_peeks) = mpsc::channel();
268
269 let () = take_and_write_to_socket(tx, msg)?;
270 let (actual, ()) = try_join!(read_all_messages(rx), async move {
271 stream
272 .buffer_drain_and_peek(
273 &mut logger,
274 Some(move |line: &[u8]| tx_peeks.send(line.len()).unwrap()),
275 )
276 .await
277 .context("Failed to drain stream")
278 },)?;
279
280 let expected = vec![
281 "Hello World\n".to_string(),
282 "Hola Mundo!\n".to_string(),
283 "你好,世界!".to_string(),
284 ];
285 assert_eq!(actual, expected);
286
287 let lengths = rx_peeks.iter().collect::<Vec<_>>();
288
289 assert_eq!(lengths, expected.iter().map(|v| v.len()).collect::<Vec<_>>());
290
291 Ok(())
292 }
293
294 #[fasync::run_singlethreaded(test)]
295 async fn buffer_and_drain_does_not_buffer_past_maximum_size() -> Result<(), Error> {
296 let msg = get_random_string(MAX_LINE_BUFFER_LENGTH + 10);
297 let (stream, tx) = create_logger_stream()?;
298 let (mut logger, rx) = create_datagram_logger()?;
299
300 let (tx_peeks, rx_peeks) = mpsc::channel();
301
302 let () = take_and_write_to_socket(tx, &msg)?;
303 let (actual, ()) = try_join!(read_all_messages(rx), async move {
304 stream
305 .buffer_drain_and_peek(
306 &mut logger,
307 Some(move |line: &[u8]| {
308 tx_peeks.send(line.len()).unwrap();
309 }),
310 )
311 .await
312 .context("Failed to drain stream")
313 },)?;
314
315 let lengths = rx_peeks.iter().collect::<Vec<_>>();
316
317 assert_eq!(actual.len(), 2);
318 assert_eq!(actual[0], msg[0..MAX_LINE_BUFFER_LENGTH]);
319 assert_eq!(actual[1], msg[MAX_LINE_BUFFER_LENGTH..]);
320
321 assert_eq!(lengths, vec![MAX_LINE_BUFFER_LENGTH, 10]);
322
323 Ok(())
324 }
325
326 #[fasync::run_singlethreaded(test)]
327 async fn buffer_and_drain_dumps_full_buffer_if_no_newline_seen() -> Result<(), Error> {
328 let (stream, tx) = create_logger_stream()?;
329 let (mut logger, rx) = create_datagram_logger()?;
330
331 let ((), ()) = try_join!(
332 async move {
333 let msg = get_random_string(SOCKET_BUFFER_SIZE);
334 let () = write_to_socket(&tx, &msg[..SOCKET_BUFFER_SIZE - 1])?;
337
338 let rx = rx.into_zx_socket();
341 let mut buffer = vec![0u8; SOCKET_BUFFER_SIZE];
342 let maybe_bytes_read = rx.read(&mut buffer);
343 assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));
344
345 let () = write_to_socket(&tx, &msg[SOCKET_BUFFER_SIZE - 1..SOCKET_BUFFER_SIZE])?;
347
348 let maybe_bytes_read = rx.read(&mut buffer);
350 assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));
351
352 std::mem::drop(tx);
354
355 let mut rx = fasync::Socket::from_socket(rx);
357 let bytes_read =
358 rx.read(&mut buffer).await.context("Failed to read from socket")?;
359 let msg_written = std::str::from_utf8(&buffer).context("Failed to parse bytes")?;
360
361 assert_eq!(bytes_read, SOCKET_BUFFER_SIZE);
362 assert_eq!(msg_written, msg);
363
364 Ok(())
365 },
366 async move { stream.buffer_and_drain(&mut logger).await.context("Failed to drain stream") },
367 )?;
368
369 Ok(())
370 }
371
372 #[fasync::run_singlethreaded(test)]
373 async fn buffer_and_drain_return_error_if_stream_polls_err() -> Result<(), Error> {
374 let (tx, rx) = zx::Socket::create_stream();
375 let () = rx.half_close()?;
377 let () = tx.half_close()?;
378 let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
379 let (mut logger, _rx) = create_datagram_logger()?;
380
381 let result = stream.buffer_and_drain(&mut logger).await;
382
383 assert_matches!(result, Err(LogError::Read(_)));
384 Ok(())
385 }
386
387 async fn read_all_messages(socket: fasync::Socket) -> Result<Vec<String>, Error> {
388 let mut results = Vec::new();
389 let mut stream = socket.into_datagram_stream();
390 while let Some(bytes) = stream.try_next().await.context("Failed to read socket stream")? {
391 results.push(
392 std::str::from_utf8(&bytes).context("Failed to parse bytes into utf8")?.to_owned(),
393 );
394 }
395
396 Ok(results)
397 }
398
399 fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
400 write_to_socket(&socket, &message)
401 }
402
403 fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
404 let bytes_written =
405 socket.write(message.as_bytes()).context("Failed to write to socket")?;
406 match bytes_written == message.len() {
407 true => Ok(()),
408 false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
409 }
410 }
411
412 fn create_datagram_logger() -> Result<(SocketLogWriter, fasync::Socket), Error> {
413 let (tx, rx) = zx::Socket::create_datagram();
414 let logger = SocketLogWriter::new(fasync::Socket::from_socket(tx));
415 let rx = fasync::Socket::from_socket(rx);
416 Ok((logger, rx))
417 }
418
419 fn create_logger_stream() -> Result<(LoggerStream, zx::Socket), Error> {
420 let (tx, rx) = zx::Socket::create_stream();
421 let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
422 Ok((stream, tx))
423 }
424
425 fn get_random_string(size: usize) -> String {
426 Alphanumeric.sample_string(&mut thread_rng(), size)
427 }
428}