1use anyhow::Context as _;
6use fidl_fuchsia_io as fio;
7use futures::future::Either;
8use futures::stream::StreamExt as _;
9use futures::{AsyncReadExt as _, AsyncWriteExt as _};
10use std::io::StdoutLock;
11use termion::raw::IntoRawMode as _;
12
13pub enum Stdout<'a> {
16 Raw(termion::raw::RawTerminal<StdoutLock<'a>>),
19 Buffered,
21}
22
23impl std::io::Write for Stdout<'_> {
24 fn flush(&mut self) -> Result<(), std::io::Error> {
25 match self {
26 Self::Raw(r) => r.flush(),
27 Self::Buffered => std::io::stdout().flush(),
28 }
29 }
30 fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
31 match self {
32 Self::Raw(r) => r.write(buf),
33 Self::Buffered => std::io::stdout().write(buf),
34 }
35 }
36}
37
38impl Stdout<'_> {
39 pub fn raw() -> anyhow::Result<Self> {
40 let stdout = std::io::stdout();
41
42 if !termion::is_tty(&stdout) {
43 anyhow::bail!("interactive mode does not support piping");
44 }
45
46 let term_out =
49 stdout.lock().into_raw_mode().context("could not set raw mode on terminal")?;
50
51 Ok(Self::Raw(term_out))
52 }
53
54 pub fn buffered() -> Self {
55 Self::Buffered
56 }
57}
58
59pub async fn connect_socket_to_stdio(
64 socket: fidl::Socket,
65 stdout: Stdout<'_>,
66) -> anyhow::Result<()> {
67 #[allow(clippy::large_futures)]
68 connect_socket_to_stdio_impl(
69 fuchsia_async::Socket::from_socket(socket),
70 || std::io::stdin().lock(),
71 stdout,
72 )?
73 .await
74}
75
76pub async fn connect_fdomain_socket_to_stdio(
78 socket: fdomain_client::Socket,
79 stdout: Stdout<'_>,
80) -> anyhow::Result<()> {
81 #[allow(clippy::large_futures)]
82 connect_socket_to_stdio_impl(socket, || std::io::stdin().lock(), stdout)?.await
83}
84
85fn connect_socket_to_stdio_impl<R>(
86 socket: impl futures::AsyncRead + futures::AsyncWrite,
87 stdin: impl FnOnce() -> R + Send + 'static,
88 mut stdout: impl std::io::Write,
89) -> anyhow::Result<impl futures::Future<Output = anyhow::Result<()>>>
90where
91 R: std::io::Read,
92{
93 let (stdin_send, mut stdin_recv) = futures::channel::mpsc::unbounded();
95 let _: std::thread::JoinHandle<_> = std::thread::Builder::new()
96 .name("connect_socket_to_stdio stdin thread".into())
97 .spawn(move || {
98 let mut stdin = stdin();
99 let mut buf = [0u8; fio::MAX_BUF as usize];
100 loop {
101 let bytes_read = stdin.read(&mut buf)?;
102 if bytes_read == 0 {
103 return Ok::<(), anyhow::Error>(());
104 }
105 let () = stdin_send.unbounded_send(buf[..bytes_read].to_vec())?;
106 }
107 })
108 .context("spawning stdin thread")?;
109
110 let (mut socket_in, mut socket_out) = socket.split();
111
112 let stdin_to_socket = async move {
113 while let Some(stdin) = stdin_recv.next().await {
114 socket_out.write_all(&stdin).await.context("writing to socket")?;
115 socket_out.flush().await.context("flushing socket")?;
116 }
117 Ok::<(), anyhow::Error>(())
118 };
119
120 let socket_to_stdout = async move {
121 loop {
122 let mut buf = [0u8; fio::MAX_BUF as usize];
123 let bytes_read = socket_in.read(&mut buf).await.context("reading from socket")?;
124 if bytes_read == 0 {
125 break;
126 }
127 stdout.write_all(&buf[..bytes_read]).context("writing to stdout")?;
128 stdout.flush().context("flushing stdout")?;
129 }
130 Ok::<(), anyhow::Error>(())
131 };
132
133 Ok(async move {
134 futures::pin_mut!(stdin_to_socket);
135 futures::pin_mut!(socket_to_stdout);
136 Ok(match futures::future::select(stdin_to_socket, socket_to_stdout).await {
137 Either::Left((stdin_to_socket, socket_to_stdout)) => {
138 let () = stdin_to_socket?;
139 let () = socket_to_stdout.await?;
143 }
144 Either::Right((socket_to_stdout, _)) => {
145 let () = socket_to_stdout?;
146 }
149 })
150 })
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[fuchsia::test]
158 async fn stdin_to_socket() {
159 let (socket, socket_remote) = fidl::Socket::create_stream();
160 let socket_remote = fuchsia_async::Socket::from_socket(socket_remote);
161
162 let connect_fut =
163 connect_socket_to_stdio_impl(socket_remote, || &b"test input"[..], vec![]).unwrap();
164
165 let (connect_res, bytes_from_socket) = futures::join!(connect_fut, async move {
166 let mut socket = fuchsia_async::Socket::from_socket(socket);
167 let mut out = vec![0u8; 100];
168 let bytes_read = socket.read(&mut out).await.unwrap();
169 drop(socket);
170 out.resize(bytes_read, 0);
171 out
172 });
173 let () = connect_res.unwrap();
174
175 assert_eq!(bytes_from_socket, &b"test input"[..]);
176 }
177
178 #[fuchsia::test]
179 async fn socket_to_stdout() {
180 let (socket, socket_remote) = fidl::Socket::create_stream();
181 assert_eq!(socket.write(&b"test input"[..]).unwrap(), 10);
182 drop(socket);
183 let mut stdout = vec![];
184 let (unblocker, block_until) = std::sync::mpsc::channel();
185
186 let socket_remote = fuchsia_async::Socket::from_socket(socket_remote);
187 #[allow(clippy::large_futures)]
188 let () = connect_socket_to_stdio_impl(
189 socket_remote,
190 move || {
191 let () = block_until.recv().unwrap();
192 &[][..]
193 },
194 &mut stdout,
195 )
196 .unwrap()
197 .await
198 .unwrap();
199
200 unblocker.send(()).unwrap();
202
203 assert_eq!(&stdout[..], &b"test input"[..]);
204 }
205}