socket_to_stdio/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use fidl_fuchsia_io as fio;
7use futures::future::Either;
8use futures::stream::StreamExt as _;
9use futures::{AsyncReadExt as _, AsyncWriteExt as _};
10use std::io::StdoutLock;
11use termion::raw::IntoRawMode as _;
12
13/// Abstracts stdout for `connect_socket_to_stdio`. Allows callers to determine if stdout should be
14/// exclusively owned for the duration of the call.
15pub enum Stdout<'a> {
16    /// Exclusive ownership of stdout (nothing else can write to stdout while this exists),
17    /// put into raw mode.
18    Raw(termion::raw::RawTerminal<StdoutLock<'a>>),
19    /// Shared ownership of stdout (output may be interleaved with output from other sources).
20    Buffered,
21}
22
23impl std::io::Write for Stdout<'_> {
24    fn flush(&mut self) -> Result<(), std::io::Error> {
25        match self {
26            Self::Raw(r) => r.flush(),
27            Self::Buffered => std::io::stdout().flush(),
28        }
29    }
30    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
31        match self {
32            Self::Raw(r) => r.write(buf),
33            Self::Buffered => std::io::stdout().write(buf),
34        }
35    }
36}
37
38impl Stdout<'_> {
39    pub fn raw() -> anyhow::Result<Self> {
40        let stdout = std::io::stdout();
41
42        if !termion::is_tty(&stdout) {
43            anyhow::bail!("interactive mode does not support piping");
44        }
45
46        // Put the host terminal into raw mode, so input characters are not echoed, streams are not
47        // buffered and newlines are not changed.
48        let term_out =
49            stdout.lock().into_raw_mode().context("could not set raw mode on terminal")?;
50
51        Ok(Self::Raw(term_out))
52    }
53
54    pub fn buffered() -> Self {
55        Self::Buffered
56    }
57}
58
59/// Concurrently:
60///   1. locks stdin and copies the input to `socket`
61///   2. reads data from `socket` and writes it to `stdout`
62/// Finishes when the remote end of the socket closes (when (2) completes).
63pub async fn connect_socket_to_stdio(
64    socket: fidl::Socket,
65    stdout: Stdout<'_>,
66) -> anyhow::Result<()> {
67    #[allow(clippy::large_futures)]
68    connect_socket_to_stdio_impl(
69        fuchsia_async::Socket::from_socket(socket),
70        || std::io::stdin().lock(),
71        stdout,
72    )?
73    .await
74}
75
76/// Same as `connect_socket_to_stdio` but operates on an FDomain socket.
77pub async fn connect_fdomain_socket_to_stdio(
78    socket: fdomain_client::Socket,
79    stdout: Stdout<'_>,
80) -> anyhow::Result<()> {
81    #[allow(clippy::large_futures)]
82    connect_socket_to_stdio_impl(socket, || std::io::stdin().lock(), stdout)?.await
83}
84
85fn connect_socket_to_stdio_impl<R>(
86    socket: impl futures::AsyncRead + futures::AsyncWrite,
87    stdin: impl FnOnce() -> R + Send + 'static,
88    mut stdout: impl std::io::Write,
89) -> anyhow::Result<impl futures::Future<Output = anyhow::Result<()>>>
90where
91    R: std::io::Read,
92{
93    // Use a separate thread to read from stdin without blocking the executor.
94    let (stdin_send, mut stdin_recv) = futures::channel::mpsc::unbounded();
95    let _: std::thread::JoinHandle<_> = std::thread::Builder::new()
96        .name("connect_socket_to_stdio stdin thread".into())
97        .spawn(move || {
98            let mut stdin = stdin();
99            let mut buf = [0u8; fio::MAX_BUF as usize];
100            loop {
101                let bytes_read = stdin.read(&mut buf)?;
102                if bytes_read == 0 {
103                    return Ok::<(), anyhow::Error>(());
104                }
105                let () = stdin_send.unbounded_send(buf[..bytes_read].to_vec())?;
106            }
107        })
108        .context("spawning stdin thread")?;
109
110    let (mut socket_in, mut socket_out) = socket.split();
111
112    let stdin_to_socket = async move {
113        while let Some(stdin) = stdin_recv.next().await {
114            socket_out.write_all(&stdin).await.context("writing to socket")?;
115            socket_out.flush().await.context("flushing socket")?;
116        }
117        Ok::<(), anyhow::Error>(())
118    };
119
120    let socket_to_stdout = async move {
121        loop {
122            let mut buf = [0u8; fio::MAX_BUF as usize];
123            let bytes_read = socket_in.read(&mut buf).await.context("reading from socket")?;
124            if bytes_read == 0 {
125                break;
126            }
127            stdout.write_all(&buf[..bytes_read]).context("writing to stdout")?;
128            stdout.flush().context("flushing stdout")?;
129        }
130        Ok::<(), anyhow::Error>(())
131    };
132
133    Ok(async move {
134        futures::pin_mut!(stdin_to_socket);
135        futures::pin_mut!(socket_to_stdout);
136        Ok(match futures::future::select(stdin_to_socket, socket_to_stdout).await {
137            Either::Left((stdin_to_socket, socket_to_stdout)) => {
138                let () = stdin_to_socket?;
139                // Wait for output even after stdin closes. The remote may be responding to the
140                // final input, or the remote may not be reading from stdin at all (consider
141                // "bash -c $CMD").
142                let () = socket_to_stdout.await?;
143            }
144            Either::Right((socket_to_stdout, _)) => {
145                let () = socket_to_stdout?;
146                // No reason to wait for stdin because the socket is closed so writing stdin to it
147                // would fail.
148            }
149        })
150    })
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156
157    #[fuchsia::test]
158    async fn stdin_to_socket() {
159        let (socket, socket_remote) = fidl::Socket::create_stream();
160        let socket_remote = fuchsia_async::Socket::from_socket(socket_remote);
161
162        let connect_fut =
163            connect_socket_to_stdio_impl(socket_remote, || &b"test input"[..], vec![]).unwrap();
164
165        let (connect_res, bytes_from_socket) = futures::join!(connect_fut, async move {
166            let mut socket = fuchsia_async::Socket::from_socket(socket);
167            let mut out = vec![0u8; 100];
168            let bytes_read = socket.read(&mut out).await.unwrap();
169            drop(socket);
170            out.resize(bytes_read, 0);
171            out
172        });
173        let () = connect_res.unwrap();
174
175        assert_eq!(bytes_from_socket, &b"test input"[..]);
176    }
177
178    #[fuchsia::test]
179    async fn socket_to_stdout() {
180        let (socket, socket_remote) = fidl::Socket::create_stream();
181        assert_eq!(socket.write(&b"test input"[..]).unwrap(), 10);
182        drop(socket);
183        let mut stdout = vec![];
184        let (unblocker, block_until) = std::sync::mpsc::channel();
185
186        let socket_remote = fuchsia_async::Socket::from_socket(socket_remote);
187        #[allow(clippy::large_futures)]
188        let () = connect_socket_to_stdio_impl(
189            socket_remote,
190            move || {
191                let () = block_until.recv().unwrap();
192                &[][..]
193            },
194            &mut stdout,
195        )
196        .unwrap()
197        .await
198        .unwrap();
199
200        // let the stdin_to_socket thread finish before test cleanup
201        unblocker.send(()).unwrap();
202
203        assert_eq!(&stdout[..], &b"test input"[..]);
204    }
205}