wait_timeout/unix.rs
1//! Unix implementation of waiting for children with timeouts
2//!
3//! On unix, wait() and its friends have no timeout parameters, so there is
4//! no way to time out a thread in wait(). From some googling and some
5//! thinking, it appears that there are a few ways to handle timeouts in
6//! wait(), but the only real reasonable one for a multi-threaded program is
7//! to listen for SIGCHLD.
8//!
9//! With this in mind, the waiting mechanism with a timeout only uses
10//! waitpid() with WNOHANG, but otherwise all the necessary blocking is done by
11//! waiting for a SIGCHLD to arrive (and that blocking has a timeout). Note,
12//! however, that waitpid() is still used to actually reap the child.
13//!
14//! Signal handling is super tricky in general, and this is no exception. Due
15//! to the async nature of SIGCHLD, we use the self-pipe trick to transmit
16//! data out of the signal handler to the rest of the application.
17
18#![allow(bad_style)]
19
20use std::cmp;
21use std::collections::HashMap;
22use std::io::{self, Read, Write};
23use std::mem;
24use std::os::unix::net::UnixStream;
25use std::os::unix::prelude::*;
26use std::process::{Child, ExitStatus};
27use std::sync::{Mutex, Once};
28use std::time::{Duration, Instant};
29
30use libc::{self, c_int};
31
32static INIT: Once = Once::new();
33static mut STATE: *mut State = 0 as *mut _;
34
/// Global bookkeeping shared between every caller of `wait_timeout` and the
/// SIGCHLD signal handler.
struct State {
    /// The SIGCHLD disposition that was installed before ours; filled in by
    /// `sigaction` during `init` and chained to from `sigchld_handler`.
    prev: libc::sigaction,
    /// Write end of the "self pipe"; the signal handler writes one byte here
    /// for each SIGCHLD it receives.
    write: UnixStream,
    /// Read end of the "self pipe"; waiting threads poll and drain it.
    read: UnixStream,
    /// Children that currently have a thread blocked in `wait_timeout`.
    map: Mutex<StateMap>,
}
41
/// Registry of in-progress waits, keyed by the address of the caller's
/// `Child`. Each entry holds the write end of that waiter's private
/// notification stream and the child's exit status, which stays `None` until
/// some thread reaps the child in `process_sigchlds`.
type StateMap = HashMap<*mut Child, (UnixStream, Option<ExitStatus>)>;
43
44pub fn wait_timeout(child: &mut Child, dur: Duration) -> io::Result<Option<ExitStatus>> {
45 INIT.call_once(State::init);
46 unsafe { (*STATE).wait_timeout(child, dur) }
47}
48
49impl State {
50 #[allow(unused_assignments)]
51 fn init() {
52 unsafe {
53 // Create our "self pipe" and then set both ends to nonblocking
54 // mode.
55 let (read, write) = UnixStream::pair().unwrap();
56 read.set_nonblocking(true).unwrap();
57 write.set_nonblocking(true).unwrap();
58
59 let state = Box::new(State {
60 prev: mem::zeroed(),
61 write: write,
62 read: read,
63 map: Mutex::new(HashMap::new()),
64 });
65
66 // Register our sigchld handler
67 let mut new: libc::sigaction = mem::zeroed();
68 new.sa_sigaction = sigchld_handler as usize;
69 new.sa_flags = libc::SA_NOCLDSTOP | libc::SA_RESTART | libc::SA_SIGINFO;
70
71 STATE = Box::into_raw(state);
72
73 assert_eq!(libc::sigaction(libc::SIGCHLD, &new, &mut (*STATE).prev), 0);
74 }
75 }
76
77 fn wait_timeout(&self, child: &mut Child, dur: Duration) -> io::Result<Option<ExitStatus>> {
78 // First up, prep our notification pipe which will tell us when our
79 // child has been reaped (other threads may signal this pipe).
80 let (read, write) = UnixStream::pair()?;
81 read.set_nonblocking(true)?;
82 write.set_nonblocking(true)?;
83
84 // Next, take a lock on the map of children currently waiting. Right
85 // after this, **before** we add ourselves to the map, we check to see
86 // if our child has actually already exited via a `try_wait`. If the
87 // child has exited then we return immediately as we'll never otherwise
88 // receive a SIGCHLD notification.
89 //
90 // If the wait reports the child is still running, however, we add
91 // ourselves to the map and then block in `select` waiting for something
92 // to happen.
93 let mut map = self.map.lock().unwrap();
94 if let Some(status) = child.try_wait()? {
95 return Ok(Some(status));
96 }
97 assert!(map.insert(child, (write, None)).is_none());
98 drop(map);
99
100 // Make sure that no matter what when we exit our pointer is removed
101 // from the map.
102 struct Remove<'a> {
103 state: &'a State,
104 child: &'a mut Child,
105 }
106 impl<'a> Drop for Remove<'a> {
107 fn drop(&mut self) {
108 let mut map = self.state.map.lock().unwrap();
109 drop(map.remove(&(self.child as *mut Child)));
110 }
111 }
112 let remove = Remove { state: self, child };
113
114 // Alright, we're guaranteed that we'll eventually get a SIGCHLD due
115 // to our `try_wait` failing, and we're also guaranteed that we'll
116 // get notified about this because we're in the map. Next up wait
117 // for an event.
118 //
119 // Note that this happens in a loop for two reasons; we could
120 // receive EINTR or we could pick up a SIGCHLD for other threads but not
121 // actually be ready oureslves.
122 let start = Instant::now();
123 let mut fds = [
124 libc::pollfd {
125 fd: self.read.as_raw_fd(),
126 events: libc::POLLIN,
127
128 revents: 0,
129 },
130 libc::pollfd {
131 fd: read.as_raw_fd(),
132 events: libc::POLLIN,
133 revents: 0,
134 },
135 ];
136 loop {
137 let elapsed = start.elapsed();
138 if elapsed >= dur {
139 break;
140 }
141 let timeout = dur - elapsed;
142 let timeout = timeout
143 .as_secs()
144 .checked_mul(1_000)
145 .and_then(|amt| amt.checked_add(timeout.subsec_nanos() as u64 / 1_000_000))
146 .unwrap_or(u64::max_value());
147 let timeout = cmp::min(<c_int>::max_value() as u64, timeout) as c_int;
148 let r = unsafe { libc::poll(fds.as_mut_ptr(), 2, timeout) };
149 let timeout = match r {
150 0 => true,
151 n if n > 0 => false,
152 n => {
153 let err = io::Error::last_os_error();
154 if err.kind() == io::ErrorKind::Interrupted {
155 continue;
156 } else {
157 panic!("error in select = {}: {}", n, err)
158 }
159 }
160 };
161
162 // Now that something has happened, we need to process what actually
163 // happened. There's are three reasons we could have woken up:
164 //
165 // 1. The file descriptor in our SIGCHLD handler was written to.
166 // This means that a SIGCHLD was received and we need to poll the
167 // entire list of waiting processes to figure out which ones
168 // actually exited.
169 // 2. Our file descriptor was written to. This means that another
170 // thread reaped our child and listed the exit status in the
171 // local map.
172 // 3. We timed out. This means we need to remove ourselves from the
173 // map and simply carry on.
174 //
175 // In the case that a SIGCHLD signal was received, we do that
176 // processing and keep going. If our fd was written to or a timeout
177 // was received then we break out of the loop and return from this
178 // call.
179 let mut map = self.map.lock().unwrap();
180 if drain(&self.read) {
181 self.process_sigchlds(&mut map);
182 }
183
184 if drain(&read) || timeout {
185 break;
186 }
187 }
188
189 let mut map = self.map.lock().unwrap();
190 let (_write, ret) = map.remove(&(remove.child as *mut Child)).unwrap();
191 drop(map);
192 Ok(ret)
193 }
194
195 fn process_sigchlds(&self, map: &mut StateMap) {
196 for (&k, &mut (ref write, ref mut status)) in map {
197 // Already reaped, nothing to do here
198 if status.is_some() {
199 continue;
200 }
201
202 *status = unsafe { (*k).try_wait().unwrap() };
203 if status.is_some() {
204 notify(write);
205 }
206 }
207 }
208}
209
/// Reads and discards everything currently buffered on the nonblocking
/// stream `file`.
///
/// Returns `true` if at least one byte was consumed or the peer has closed
/// its end (EOF), and `false` if nothing was pending.
///
/// # Panics
///
/// Panics on any read error other than `WouldBlock`.
fn drain(mut file: &UnixStream) -> bool {
    let mut scratch = [0u8; 16];
    let mut saw_data = false;
    loop {
        match file.read(&mut scratch) {
            // EOF == something happened
            Ok(0) => return true,
            // data read, but keep draining
            Ok(_) => saw_data = true,
            // Nothing left to read right now.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return saw_data,
            Err(e) => panic!("bad read: {}", e),
        }
    }
}
227
/// Writes a single wake-up byte to the nonblocking stream `file`.
///
/// A `WouldBlock` error is deliberately ignored: a full buffer already
/// contains data that will wake the reader.
///
/// # Panics
///
/// Panics on any write error other than `WouldBlock`.
fn notify(mut file: &UnixStream) {
    if let Err(e) = file.write(&[1]) {
        if e.kind() != io::ErrorKind::WouldBlock {
            panic!("bad error on write fd: {}", e)
        }
    }
}
238
239// Signal handler for SIGCHLD signals, must be async-signal-safe!
240//
241// This function will write to the writing half of the "self pipe" to wake
242// up the helper thread if it's waiting. Note that this write must be
243// nonblocking because if it blocks and the reader is the thread we
244// interrupted, then we'll deadlock.
245//
246// When writing, if the write returns EWOULDBLOCK then we choose to ignore
247// it. At that point we're guaranteed that there's something in the pipe
248// which will wake up the other end at some point, so we just allow this
249// signal to be coalesced with the pending signals on the pipe.
250#[allow(unused_assignments)]
251extern "C" fn sigchld_handler(signum: c_int, info: *mut libc::siginfo_t, ptr: *mut libc::c_void) {
252 type FnSigaction = extern "C" fn(c_int, *mut libc::siginfo_t, *mut libc::c_void);
253 type FnHandler = extern "C" fn(c_int);
254
255 unsafe {
256 let state = &*STATE;
257 notify(&state.write);
258
259 let fnptr = state.prev.sa_sigaction;
260 if fnptr == 0 {
261 return;
262 }
263 if state.prev.sa_flags & libc::SA_SIGINFO == 0 {
264 let action = mem::transmute::<usize, FnHandler>(fnptr);
265 action(signum)
266 } else {
267 let action = mem::transmute::<usize, FnSigaction>(fnptr);
268 action(signum, info, ptr)
269 }
270 }
271}