wait_timeout/
unix.rs

1//! Unix implementation of waiting for children with timeouts
2//!
3//! On unix, wait() and its friends have no timeout parameters, so there is
4//! no way to time out a thread in wait(). From some googling and some
5//! thinking, it appears that there are a few ways to handle timeouts in
6//! wait(), but the only real reasonable one for a multi-threaded program is
7//! to listen for SIGCHLD.
8//!
9//! With this in mind, the waiting mechanism with a timeout only uses
10//! waitpid() with WNOHANG, but otherwise all the necessary blocking is done by
11//! waiting for a SIGCHLD to arrive (and that blocking has a timeout). Note,
12//! however, that waitpid() is still used to actually reap the child.
13//!
14//! Signal handling is super tricky in general, and this is no exception. Due
15//! to the async nature of SIGCHLD, we use the self-pipe trick to transmit
16//! data out of the signal handler to the rest of the application.
17
18#![allow(bad_style)]
19
20use std::cmp;
21use std::collections::HashMap;
22use std::io::{self, Read, Write};
23use std::mem;
24use std::os::unix::net::UnixStream;
25use std::os::unix::prelude::*;
26use std::process::{Child, ExitStatus};
27use std::sync::{Mutex, Once};
28use std::time::{Duration, Instant};
29
30use libc::{self, c_int};
31
32static INIT: Once = Once::new();
33static mut STATE: *mut State = 0 as *mut _;
34
/// Global bookkeeping shared between waiting threads and the SIGCHLD handler.
struct State {
    /// The SIGCHLD disposition that was in place before `State::init`
    /// installed `sigchld_handler`; the handler chains to it.
    prev: libc::sigaction,
    /// Write end of the "self pipe"; the signal handler writes one byte here
    /// for each SIGCHLD it receives.
    write: UnixStream,
    /// Read end of the "self pipe"; waiting threads poll and drain it.
    read: UnixStream,
    /// Children that some thread is currently waiting on.
    map: Mutex<StateMap>,
}
41
/// Registry of in-progress waits, keyed by the address of the `Child` being
/// waited on. Each value holds the write end of that waiter's private
/// notification socket and the exit status once some thread has reaped the
/// child (`None` while it is still running).
type StateMap = HashMap<*mut Child, (UnixStream, Option<ExitStatus>)>;
43
44pub fn wait_timeout(child: &mut Child, dur: Duration) -> io::Result<Option<ExitStatus>> {
45    INIT.call_once(State::init);
46    unsafe { (*STATE).wait_timeout(child, dur) }
47}
48
49impl State {
50    #[allow(unused_assignments)]
51    fn init() {
52        unsafe {
53            // Create our "self pipe" and then set both ends to nonblocking
54            // mode.
55            let (read, write) = UnixStream::pair().unwrap();
56            read.set_nonblocking(true).unwrap();
57            write.set_nonblocking(true).unwrap();
58
59            let state = Box::new(State {
60                prev: mem::zeroed(),
61                write: write,
62                read: read,
63                map: Mutex::new(HashMap::new()),
64            });
65
66            // Register our sigchld handler
67            let mut new: libc::sigaction = mem::zeroed();
68            new.sa_sigaction = sigchld_handler as usize;
69            new.sa_flags = libc::SA_NOCLDSTOP | libc::SA_RESTART | libc::SA_SIGINFO;
70
71            STATE = Box::into_raw(state);
72
73            assert_eq!(libc::sigaction(libc::SIGCHLD, &new, &mut (*STATE).prev), 0);
74        }
75    }
76
77    fn wait_timeout(&self, child: &mut Child, dur: Duration) -> io::Result<Option<ExitStatus>> {
78        // First up, prep our notification pipe which will tell us when our
79        // child has been reaped (other threads may signal this pipe).
80        let (read, write) = UnixStream::pair()?;
81        read.set_nonblocking(true)?;
82        write.set_nonblocking(true)?;
83
84        // Next, take a lock on the map of children currently waiting. Right
85        // after this, **before** we add ourselves to the map, we check to see
86        // if our child has actually already exited via a `try_wait`. If the
87        // child has exited then we return immediately as we'll never otherwise
88        // receive a SIGCHLD notification.
89        //
90        // If the wait reports the child is still running, however, we add
91        // ourselves to the map and then block in `select` waiting for something
92        // to happen.
93        let mut map = self.map.lock().unwrap();
94        if let Some(status) = child.try_wait()? {
95            return Ok(Some(status));
96        }
97        assert!(map.insert(child, (write, None)).is_none());
98        drop(map);
99
100        // Make sure that no matter what when we exit our pointer is removed
101        // from the map.
102        struct Remove<'a> {
103            state: &'a State,
104            child: &'a mut Child,
105        }
106        impl<'a> Drop for Remove<'a> {
107            fn drop(&mut self) {
108                let mut map = self.state.map.lock().unwrap();
109                drop(map.remove(&(self.child as *mut Child)));
110            }
111        }
112        let remove = Remove { state: self, child };
113
114        // Alright, we're guaranteed that we'll eventually get a SIGCHLD due
115        // to our `try_wait` failing, and we're also guaranteed that we'll
116        // get notified about this because we're in the map. Next up wait
117        // for an event.
118        //
119        // Note that this happens in a loop for two reasons; we could
120        // receive EINTR or we could pick up a SIGCHLD for other threads but not
121        // actually be ready oureslves.
122        let start = Instant::now();
123        let mut fds = [
124            libc::pollfd {
125                fd: self.read.as_raw_fd(),
126                events: libc::POLLIN,
127
128                revents: 0,
129            },
130            libc::pollfd {
131                fd: read.as_raw_fd(),
132                events: libc::POLLIN,
133                revents: 0,
134            },
135        ];
136        loop {
137            let elapsed = start.elapsed();
138            if elapsed >= dur {
139                break;
140            }
141            let timeout = dur - elapsed;
142            let timeout = timeout
143                .as_secs()
144                .checked_mul(1_000)
145                .and_then(|amt| amt.checked_add(timeout.subsec_nanos() as u64 / 1_000_000))
146                .unwrap_or(u64::max_value());
147            let timeout = cmp::min(<c_int>::max_value() as u64, timeout) as c_int;
148            let r = unsafe { libc::poll(fds.as_mut_ptr(), 2, timeout) };
149            let timeout = match r {
150                0 => true,
151                n if n > 0 => false,
152                n => {
153                    let err = io::Error::last_os_error();
154                    if err.kind() == io::ErrorKind::Interrupted {
155                        continue;
156                    } else {
157                        panic!("error in select = {}: {}", n, err)
158                    }
159                }
160            };
161
162            // Now that something has happened, we need to process what actually
163            // happened. There's are three reasons we could have woken up:
164            //
165            // 1. The file descriptor in our SIGCHLD handler was written to.
166            //    This means that a SIGCHLD was received and we need to poll the
167            //    entire list of waiting processes to figure out which ones
168            //    actually exited.
169            // 2. Our file descriptor was written to. This means that another
170            //    thread reaped our child and listed the exit status in the
171            //    local map.
172            // 3. We timed out. This means we need to remove ourselves from the
173            //    map and simply carry on.
174            //
175            // In the case that a SIGCHLD signal was received, we do that
176            // processing and keep going. If our fd was written to or a timeout
177            // was received then we break out of the loop and return from this
178            // call.
179            let mut map = self.map.lock().unwrap();
180            if drain(&self.read) {
181                self.process_sigchlds(&mut map);
182            }
183
184            if drain(&read) || timeout {
185                break;
186            }
187        }
188
189        let mut map = self.map.lock().unwrap();
190        let (_write, ret) = map.remove(&(remove.child as *mut Child)).unwrap();
191        drop(map);
192        Ok(ret)
193    }
194
195    fn process_sigchlds(&self, map: &mut StateMap) {
196        for (&k, &mut (ref write, ref mut status)) in map {
197            // Already reaped, nothing to do here
198            if status.is_some() {
199                continue;
200            }
201
202            *status = unsafe { (*k).try_wait().unwrap() };
203            if status.is_some() {
204                notify(write);
205            }
206        }
207    }
208}
209
/// Empties a nonblocking socket and reports whether anything was pending.
///
/// Returns `true` if at least one byte was read or end-of-file was reached,
/// and `false` if the very first read would have blocked.
///
/// # Panics
///
/// Panics on any read error other than `WouldBlock`.
fn drain(mut file: &UnixStream) -> bool {
    let mut saw_activity = false;
    let mut scratch = [0u8; 16];
    loop {
        match file.read(&mut scratch) {
            // EOF counts as "something happened".
            Ok(0) => return true,
            // Got data; remember that and keep reading until empty.
            Ok(_) => saw_activity = true,
            // Nothing left to read right now.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return saw_activity,
            Err(e) => panic!("bad read: {}", e),
        }
    }
}
227
/// Writes a single wake-up byte to `file`.
///
/// A `WouldBlock` error is ignored: a full socket already holds data that
/// will wake the reader.
///
/// # Panics
///
/// Panics on any write error other than `WouldBlock`.
fn notify(mut file: &UnixStream) {
    if let Err(e) = file.write(&[1]) {
        if e.kind() != io::ErrorKind::WouldBlock {
            panic!("bad error on write fd: {}", e)
        }
    }
}
238
239// Signal handler for SIGCHLD signals, must be async-signal-safe!
240//
241// This function will write to the writing half of the "self pipe" to wake
242// up the helper thread if it's waiting. Note that this write must be
243// nonblocking because if it blocks and the reader is the thread we
244// interrupted, then we'll deadlock.
245//
246// When writing, if the write returns EWOULDBLOCK then we choose to ignore
247// it. At that point we're guaranteed that there's something in the pipe
248// which will wake up the other end at some point, so we just allow this
249// signal to be coalesced with the pending signals on the pipe.
250#[allow(unused_assignments)]
251extern "C" fn sigchld_handler(signum: c_int, info: *mut libc::siginfo_t, ptr: *mut libc::c_void) {
252    type FnSigaction = extern "C" fn(c_int, *mut libc::siginfo_t, *mut libc::c_void);
253    type FnHandler = extern "C" fn(c_int);
254
255    unsafe {
256        let state = &*STATE;
257        notify(&state.write);
258
259        let fnptr = state.prev.sa_sigaction;
260        if fnptr == 0 {
261            return;
262        }
263        if state.prev.sa_flags & libc::SA_SIGINFO == 0 {
264            let action = mem::transmute::<usize, FnHandler>(fnptr);
265            action(signum)
266        } else {
267            let action = mem::transmute::<usize, FnSigaction>(fnptr);
268            action(signum, info, ptr)
269        }
270    }
271}