parking/lib.rs
1//! Thread parking and unparking.
2//!
3//! A [`Parker`] is in either the notified or unnotified state. The [`park()`][`Parker::park()`] method blocks
4//! the current thread until the [`Parker`] becomes notified and then puts it back into the unnotified
5//! state. The [`unpark()`][`Unparker::unpark()`] method puts it into the notified state.
6//!
7//! This API is similar to [`thread::park()`] and [`Thread::unpark()`] from the standard library.
8//! The difference is that the state "token" managed by those functions is shared across an entire
9//! thread, and anyone can call [`thread::current()`] to access it. If you use `park` and `unpark`,
10//! but you also call a function that uses `park` and `unpark` internally, that function could
11//! cause a deadlock by consuming a wakeup that was intended for you. The [`Parker`] object in this
12//! crate avoids that problem by managing its own state, which isn't shared with unrelated callers.
13//!
14//! [`thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html
15//! [`Thread::unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark
16//! [`thread::current()`]: https://doc.rust-lang.org/std/thread/fn.current.html
17//!
18//! # Examples
19//!
20//! ```
21//! use std::thread;
22//! use std::time::Duration;
23//! use parking::Parker;
24//!
25//! let p = Parker::new();
26//! let u = p.unparker();
27//!
28//! // Notify the parker.
29//! u.unpark();
30//!
31//! // Wakes up immediately because the parker is notified.
32//! p.park();
33//!
34//! thread::spawn(move || {
35//! thread::sleep(Duration::from_millis(500));
36//! u.unpark();
37//! });
38//!
39//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
40//! p.park();
41//! ```
42
43#![forbid(unsafe_code)]
44#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
45#![doc(
46 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
47)]
48#![doc(
49 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
50)]
51
52#[cfg(not(all(loom, feature = "loom")))]
53use std::sync;
54
55#[cfg(all(loom, feature = "loom"))]
56use loom::sync;
57
58use std::cell::Cell;
59use std::fmt;
60use std::marker::PhantomData;
61use std::sync::Arc;
62use std::task::{Wake, Waker};
63use std::time::Duration;
64
65#[cfg(not(all(loom, feature = "loom")))]
66use std::time::Instant;
67
68use sync::atomic::AtomicUsize;
69use sync::atomic::Ordering::SeqCst;
70use sync::{Condvar, Mutex};
71
72/// Creates a parker and an associated unparker.
73///
74/// # Examples
75///
76/// ```
77/// let (p, u) = parking::pair();
78/// ```
79pub fn pair() -> (Parker, Unparker) {
80 let p = Parker::new();
81 let u = p.unparker();
82 (p, u)
83}
84
85/// Waits for a notification.
86pub struct Parker {
87 unparker: Unparker,
88 _marker: PhantomData<Cell<()>>,
89}
90
91impl Parker {
92 /// Creates a new parker.
93 ///
94 /// # Examples
95 ///
96 /// ```
97 /// use parking::Parker;
98 ///
99 /// let p = Parker::new();
100 /// ```
101 ///
102 pub fn new() -> Parker {
103 Parker {
104 unparker: Unparker {
105 inner: Arc::new(Inner {
106 state: AtomicUsize::new(EMPTY),
107 lock: Mutex::new(()),
108 cvar: Condvar::new(),
109 }),
110 },
111 _marker: PhantomData,
112 }
113 }
114
115 /// Blocks until notified and then goes back into unnotified state.
116 ///
117 /// # Examples
118 ///
119 /// ```
120 /// use parking::Parker;
121 ///
122 /// let p = Parker::new();
123 /// let u = p.unparker();
124 ///
125 /// // Notify the parker.
126 /// u.unpark();
127 ///
128 /// // Wakes up immediately because the parker is notified.
129 /// p.park();
130 /// ```
131 pub fn park(&self) {
132 self.unparker.inner.park(None);
133 }
134
135 /// Blocks until notified and then goes back into unnotified state, or times out after
136 /// `duration`.
137 ///
138 /// Returns `true` if notified before the timeout.
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// use std::time::Duration;
144 /// use parking::Parker;
145 ///
146 /// let p = Parker::new();
147 ///
148 /// // Wait for a notification, or time out after 500 ms.
149 /// p.park_timeout(Duration::from_millis(500));
150 /// ```
151 #[cfg(not(loom))]
152 pub fn park_timeout(&self, duration: Duration) -> bool {
153 self.unparker.inner.park(Some(duration))
154 }
155
156 /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
157 ///
158 /// Returns `true` if notified before the deadline.
159 ///
160 /// # Examples
161 ///
162 /// ```
163 /// use std::time::{Duration, Instant};
164 /// use parking::Parker;
165 ///
166 /// let p = Parker::new();
167 ///
168 /// // Wait for a notification, or time out after 500 ms.
169 /// p.park_deadline(Instant::now() + Duration::from_millis(500));
170 /// ```
171 #[cfg(not(loom))]
172 pub fn park_deadline(&self, instant: Instant) -> bool {
173 self.unparker
174 .inner
175 .park(Some(instant.saturating_duration_since(Instant::now())))
176 }
177
178 /// Notifies the parker.
179 ///
180 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
181 /// was already notified.
182 ///
183 /// # Examples
184 ///
185 /// ```
186 /// use std::thread;
187 /// use std::time::Duration;
188 /// use parking::Parker;
189 ///
190 /// let p = Parker::new();
191 ///
192 /// assert_eq!(p.unpark(), true);
193 /// assert_eq!(p.unpark(), false);
194 ///
195 /// // Wakes up immediately.
196 /// p.park();
197 /// ```
198 pub fn unpark(&self) -> bool {
199 self.unparker.unpark()
200 }
201
202 /// Returns a handle for unparking.
203 ///
204 /// The returned [`Unparker`] can be cloned and shared among threads.
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use parking::Parker;
210 ///
211 /// let p = Parker::new();
212 /// let u = p.unparker();
213 ///
214 /// // Notify the parker.
215 /// u.unpark();
216 ///
217 /// // Wakes up immediately because the parker is notified.
218 /// p.park();
219 /// ```
220 pub fn unparker(&self) -> Unparker {
221 self.unparker.clone()
222 }
223}
224
225impl Default for Parker {
226 fn default() -> Parker {
227 Parker::new()
228 }
229}
230
231impl fmt::Debug for Parker {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 f.pad("Parker { .. }")
234 }
235}
236
237/// Notifies a parker.
238pub struct Unparker {
239 inner: Arc<Inner>,
240}
241
242impl Unparker {
243 /// Notifies the associated parker.
244 ///
245 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
246 /// was already notified.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use std::thread;
252 /// use std::time::Duration;
253 /// use parking::Parker;
254 ///
255 /// let p = Parker::new();
256 /// let u = p.unparker();
257 ///
258 /// thread::spawn(move || {
259 /// thread::sleep(Duration::from_millis(500));
260 /// u.unpark();
261 /// });
262 ///
263 /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
264 /// p.park();
265 /// ```
266 pub fn unpark(&self) -> bool {
267 self.inner.unpark()
268 }
269
270 /// Indicates whether this unparker will unpark the associated parker.
271 ///
272 /// This can be used to avoid unnecessary work before calling `unpark()`.
273 ///
274 /// # Examples
275 ///
276 /// ```
277 /// use parking::Parker;
278 ///
279 /// let p = Parker::new();
280 /// let u = p.unparker();
281 ///
282 /// assert!(u.will_unpark(&p));
283 /// ```
284 pub fn will_unpark(&self, parker: &Parker) -> bool {
285 Arc::ptr_eq(&self.inner, &parker.unparker.inner)
286 }
287
288 /// Indicates whether two unparkers will unpark the same parker.
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use parking::Parker;
294 ///
295 /// let p = Parker::new();
296 /// let u1 = p.unparker();
297 /// let u2 = p.unparker();
298 ///
299 /// assert!(u1.same_parker(&u2));
300 /// ```
301 pub fn same_parker(&self, other: &Unparker) -> bool {
302 Arc::ptr_eq(&self.inner, &other.inner)
303 }
304}
305
306impl fmt::Debug for Unparker {
307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308 f.pad("Unparker { .. }")
309 }
310}
311
312impl Clone for Unparker {
313 fn clone(&self) -> Unparker {
314 Unparker {
315 inner: self.inner.clone(),
316 }
317 }
318}
319
320impl From<Unparker> for Waker {
321 fn from(up: Unparker) -> Self {
322 Waker::from(up.inner)
323 }
324}
325
// Values stored in `Inner::state`.
//
// No notification is pending and no thread has flagged itself as parked.
const EMPTY: usize = 0;
// A thread has flagged itself as parked and is about to block (or is blocked) on `cvar`.
const PARKED: usize = 1;
// `unpark` has been called and the notification has not been consumed yet.
const NOTIFIED: usize = 2;

// State shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
    // One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    // Held by the parking thread from before it sets `PARKED` until it blocks on `cvar`.
    lock: Mutex<()>,
    // Signalled by `unpark` when it finds the state set to `PARKED`.
    cvar: Condvar,
}

impl Inner {
    // Tries to move `state` from `NOTIFIED` to `EMPTY`; returns whether a notification was
    // consumed.
    fn take_notification(&self) -> bool {
        self.state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
    }

    // Blocks until a notification arrives or, if `timeout` is given, until the timed wait on the
    // condition variable returns. Returns `true` if a notification was consumed and `false`
    // otherwise; `state` is `EMPTY` again on return.
    //
    // Panics if `state` holds an unexpected value, or if `lock` is poisoned.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // Fast path: a notification is already pending, so consume it without taking the lock.
        if self.take_notification() {
            return true;
        }

        // A zero timeout can never block, so report "not notified" right away.
        if timeout == Some(Duration::from_millis(0)) {
            return false;
        }

        // Slow path: the lock must be held before flagging ourselves as parked so that `unpark`
        // cannot signal `cvar` in the window before we start waiting on it.
        let mut guard = self.lock.lock().unwrap();

        if let Err(observed) = self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            if observed != NOTIFIED {
                panic!("inconsistent park_timeout state: {}", observed);
            }

            // A notification slipped in after the fast path. It is consumed here so it does not
            // cause a spurious wakeup in the next park. The swap is required even though the
            // value is known: `unpark` may have run again since the failed `compare_exchange`,
            // and only reading from its write to `state` synchronizes with it and makes the
            // writes that preceded that `unpark` visible to this thread.
            let previous = self.state.swap(EMPTY, SeqCst);
            assert_eq!(previous, NOTIFIED, "park state changed unexpectedly");
            return true;
        }

        let timeout = match timeout {
            Some(limit) => limit,
            // No timeout: keep waiting until a notification can actually be consumed. A wakeup
            // that finds no notification just goes back to waiting.
            None => loop {
                guard = self.cvar.wait(guard).unwrap();

                if self.take_notification() {
                    return true;
                }
            },
        };

        #[cfg(not(loom))]
        {
            // Timed wait: whatever ended the wait (notification, timeout, or a wakeup without a
            // notification), `state` is unconditionally reset to `EMPTY`. The previous value
            // tells us whether a notification was consumed or we merely un-flagged ourselves.
            let (_guard, _wait_result) = self.cvar.wait_timeout(guard, timeout).unwrap();

            match self.state.swap(EMPTY, SeqCst) {
                NOTIFIED => true,
                PARKED => false,
                n => panic!("inconsistent park_timeout state: {}", n),
            }
        }

        #[cfg(loom)]
        {
            let _ = timeout;
            panic!("park_timeout is not supported under loom");
        }
    }

    // Sets `state` to `NOTIFIED` and wakes the parked thread if there is one.
    //
    // Returns `false` if a notification was already pending and `true` otherwise. Panics if
    // `state` holds an unexpected value, or if `lock` is poisoned.
    pub fn unpark(&self) -> bool {
        // This has to be an unconditional swap rather than a compare-and-swap that gives up on
        // seeing `NOTIFIED`: writing `NOTIFIED` even when it is already there is the release
        // operation that `park` synchronizes with to observe the writes made before this call.
        match self.state.swap(NOTIFIED, SeqCst) {
            // Nobody was waiting; the next `park` will pick the notification up.
            EMPTY => true,
            // A notification was already pending.
            NOTIFIED => false,
            PARKED => {
                // The parked thread holds `lock` from before it sets `PARKED` (or last checked
                // `state` after a wakeup) until it is actually waiting on `cvar`. A signal sent
                // inside that window would be lost and the thread would sleep forever, so take
                // the lock to wait the window out.
                //
                // The lock is released before signalling so the woken thread does not
                // immediately block on it again.
                drop(self.lock.lock().unwrap());
                self.cvar.notify_one();
                true
            }
            _ => panic!("inconsistent state in unpark"),
        }
    }
}
438
439impl Wake for Inner {
440 #[inline]
441 fn wake(self: Arc<Self>) {
442 self.unpark();
443 }
444
445 #[inline]
446 fn wake_by_ref(self: &Arc<Self>) {
447 self.unpark();
448 }
449}