parking/
lib.rs

1//! Thread parking and unparking.
2//!
3//! A [`Parker`] is in either the notified or unnotified state. The [`park()`][`Parker::park()`] method blocks
4//! the current thread until the [`Parker`] becomes notified and then puts it back into the unnotified
5//! state. The [`unpark()`][`Unparker::unpark()`] method puts it into the notified state.
6//!
7//! This API is similar to [`thread::park()`] and [`Thread::unpark()`] from the standard library.
8//! The difference is that the state "token" managed by those functions is shared across an entire
9//! thread, and anyone can call [`thread::current()`] to access it. If you use `park` and `unpark`,
10//! but you also call a function that uses `park` and `unpark` internally, that function could
11//! cause a deadlock by consuming a wakeup that was intended for you. The [`Parker`] object in this
12//! crate avoids that problem by managing its own state, which isn't shared with unrelated callers.
13//!
14//! [`thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html
15//! [`Thread::unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark
16//! [`thread::current()`]: https://doc.rust-lang.org/std/thread/fn.current.html
17//!
18//! # Examples
19//!
20//! ```
21//! use std::thread;
22//! use std::time::Duration;
23//! use parking::Parker;
24//!
25//! let p = Parker::new();
26//! let u = p.unparker();
27//!
28//! // Notify the parker.
29//! u.unpark();
30//!
31//! // Wakes up immediately because the parker is notified.
32//! p.park();
33//!
34//! thread::spawn(move || {
35//!     thread::sleep(Duration::from_millis(500));
36//!     u.unpark();
37//! });
38//!
39//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
40//! p.park();
41//! ```
42
43#![forbid(unsafe_code)]
44#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
45#![doc(
46    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
47)]
48#![doc(
49    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
50)]
51
52#[cfg(not(all(loom, feature = "loom")))]
53use std::sync;
54
55#[cfg(all(loom, feature = "loom"))]
56use loom::sync;
57
58use std::cell::Cell;
59use std::fmt;
60use std::marker::PhantomData;
61use std::sync::Arc;
62use std::task::{Wake, Waker};
63use std::time::Duration;
64
65#[cfg(not(all(loom, feature = "loom")))]
66use std::time::Instant;
67
68use sync::atomic::AtomicUsize;
69use sync::atomic::Ordering::SeqCst;
70use sync::{Condvar, Mutex};
71
72/// Creates a parker and an associated unparker.
73///
74/// # Examples
75///
76/// ```
77/// let (p, u) = parking::pair();
78/// ```
79pub fn pair() -> (Parker, Unparker) {
80    let p = Parker::new();
81    let u = p.unparker();
82    (p, u)
83}
84
85/// Waits for a notification.
86pub struct Parker {
87    unparker: Unparker,
88    _marker: PhantomData<Cell<()>>,
89}
90
91impl Parker {
92    /// Creates a new parker.
93    ///
94    /// # Examples
95    ///
96    /// ```
97    /// use parking::Parker;
98    ///
99    /// let p = Parker::new();
100    /// ```
101    ///
102    pub fn new() -> Parker {
103        Parker {
104            unparker: Unparker {
105                inner: Arc::new(Inner {
106                    state: AtomicUsize::new(EMPTY),
107                    lock: Mutex::new(()),
108                    cvar: Condvar::new(),
109                }),
110            },
111            _marker: PhantomData,
112        }
113    }
114
115    /// Blocks until notified and then goes back into unnotified state.
116    ///
117    /// # Examples
118    ///
119    /// ```
120    /// use parking::Parker;
121    ///
122    /// let p = Parker::new();
123    /// let u = p.unparker();
124    ///
125    /// // Notify the parker.
126    /// u.unpark();
127    ///
128    /// // Wakes up immediately because the parker is notified.
129    /// p.park();
130    /// ```
131    pub fn park(&self) {
132        self.unparker.inner.park(None);
133    }
134
135    /// Blocks until notified and then goes back into unnotified state, or times out after
136    /// `duration`.
137    ///
138    /// Returns `true` if notified before the timeout.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use std::time::Duration;
144    /// use parking::Parker;
145    ///
146    /// let p = Parker::new();
147    ///
148    /// // Wait for a notification, or time out after 500 ms.
149    /// p.park_timeout(Duration::from_millis(500));
150    /// ```
151    #[cfg(not(loom))]
152    pub fn park_timeout(&self, duration: Duration) -> bool {
153        self.unparker.inner.park(Some(duration))
154    }
155
156    /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
157    ///
158    /// Returns `true` if notified before the deadline.
159    ///
160    /// # Examples
161    ///
162    /// ```
163    /// use std::time::{Duration, Instant};
164    /// use parking::Parker;
165    ///
166    /// let p = Parker::new();
167    ///
168    /// // Wait for a notification, or time out after 500 ms.
169    /// p.park_deadline(Instant::now() + Duration::from_millis(500));
170    /// ```
171    #[cfg(not(loom))]
172    pub fn park_deadline(&self, instant: Instant) -> bool {
173        self.unparker
174            .inner
175            .park(Some(instant.saturating_duration_since(Instant::now())))
176    }
177
178    /// Notifies the parker.
179    ///
180    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
181    /// was already notified.
182    ///
183    /// # Examples
184    ///
185    /// ```
186    /// use std::thread;
187    /// use std::time::Duration;
188    /// use parking::Parker;
189    ///
190    /// let p = Parker::new();
191    ///
192    /// assert_eq!(p.unpark(), true);
193    /// assert_eq!(p.unpark(), false);
194    ///
195    /// // Wakes up immediately.
196    /// p.park();
197    /// ```
198    pub fn unpark(&self) -> bool {
199        self.unparker.unpark()
200    }
201
202    /// Returns a handle for unparking.
203    ///
204    /// The returned [`Unparker`] can be cloned and shared among threads.
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use parking::Parker;
210    ///
211    /// let p = Parker::new();
212    /// let u = p.unparker();
213    ///
214    /// // Notify the parker.
215    /// u.unpark();
216    ///
217    /// // Wakes up immediately because the parker is notified.
218    /// p.park();
219    /// ```
220    pub fn unparker(&self) -> Unparker {
221        self.unparker.clone()
222    }
223}
224
225impl Default for Parker {
226    fn default() -> Parker {
227        Parker::new()
228    }
229}
230
231impl fmt::Debug for Parker {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        f.pad("Parker { .. }")
234    }
235}
236
237/// Notifies a parker.
238pub struct Unparker {
239    inner: Arc<Inner>,
240}
241
242impl Unparker {
243    /// Notifies the associated parker.
244    ///
245    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
246    /// was already notified.
247    ///
248    /// # Examples
249    ///
250    /// ```
251    /// use std::thread;
252    /// use std::time::Duration;
253    /// use parking::Parker;
254    ///
255    /// let p = Parker::new();
256    /// let u = p.unparker();
257    ///
258    /// thread::spawn(move || {
259    ///     thread::sleep(Duration::from_millis(500));
260    ///     u.unpark();
261    /// });
262    ///
263    /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
264    /// p.park();
265    /// ```
266    pub fn unpark(&self) -> bool {
267        self.inner.unpark()
268    }
269
270    /// Indicates whether this unparker will unpark the associated parker.
271    ///
272    /// This can be used to avoid unnecessary work before calling `unpark()`.
273    ///
274    /// # Examples
275    ///
276    /// ```
277    /// use parking::Parker;
278    ///
279    /// let p = Parker::new();
280    /// let u = p.unparker();
281    ///
282    /// assert!(u.will_unpark(&p));
283    /// ```
284    pub fn will_unpark(&self, parker: &Parker) -> bool {
285        Arc::ptr_eq(&self.inner, &parker.unparker.inner)
286    }
287
288    /// Indicates whether two unparkers will unpark the same parker.
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// use parking::Parker;
294    ///
295    /// let p = Parker::new();
296    /// let u1 = p.unparker();
297    /// let u2 = p.unparker();
298    ///
299    /// assert!(u1.same_parker(&u2));
300    /// ```
301    pub fn same_parker(&self, other: &Unparker) -> bool {
302        Arc::ptr_eq(&self.inner, &other.inner)
303    }
304}
305
306impl fmt::Debug for Unparker {
307    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308        f.pad("Unparker { .. }")
309    }
310}
311
312impl Clone for Unparker {
313    fn clone(&self) -> Unparker {
314        Unparker {
315            inner: self.inner.clone(),
316        }
317    }
318}
319
320impl From<Unparker> for Waker {
321    fn from(up: Unparker) -> Self {
322        Waker::from(up.inner)
323    }
324}
325
326const EMPTY: usize = 0;
327const PARKED: usize = 1;
328const NOTIFIED: usize = 2;
329
330struct Inner {
331    state: AtomicUsize,
332    lock: Mutex<()>,
333    cvar: Condvar,
334}
335
336impl Inner {
337    fn park(&self, timeout: Option<Duration>) -> bool {
338        // If we were previously notified then we consume this notification and return quickly.
339        if self
340            .state
341            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
342            .is_ok()
343        {
344            return true;
345        }
346
347        // If the timeout is zero, then there is no need to actually block.
348        if let Some(dur) = timeout {
349            if dur == Duration::from_millis(0) {
350                return false;
351            }
352        }
353
354        // Otherwise we need to coordinate going to sleep.
355        let mut m = self.lock.lock().unwrap();
356
357        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
358            Ok(_) => {}
359            // Consume this notification to avoid spurious wakeups in the next park.
360            Err(NOTIFIED) => {
361                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
362                // because `unpark` may have been called again since we read `NOTIFIED` in the
363                // `compare_exchange` above. We must perform an acquire operation that synchronizes
364                // with that `unpark` to observe any writes it made before the call to `unpark`. To
365                // do that we must read from the write it made to `state`.
366                let old = self.state.swap(EMPTY, SeqCst);
367                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
368                return true;
369            }
370            Err(n) => panic!("inconsistent park_timeout state: {}", n),
371        }
372
373        match timeout {
374            None => {
375                loop {
376                    // Block the current thread on the conditional variable.
377                    m = self.cvar.wait(m).unwrap();
378
379                    if self
380                        .state
381                        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
382                        .is_ok()
383                    {
384                        // got a notification
385                        return true;
386                    }
387                }
388            }
389            Some(timeout) => {
390                #[cfg(not(loom))]
391                {
392                    // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
393                    // notification we just want to unconditionally set `state` back to `EMPTY`, either
394                    // consuming a notification or un-flagging ourselves as parked.
395                    let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
396
397                    match self.state.swap(EMPTY, SeqCst) {
398                        NOTIFIED => true, // got a notification
399                        PARKED => false,  // no notification
400                        n => panic!("inconsistent park_timeout state: {}", n),
401                    }
402                }
403
404                #[cfg(loom)]
405                {
406                    let _ = timeout;
407                    panic!("park_timeout is not supported under loom");
408                }
409            }
410        }
411    }
412
413    pub fn unpark(&self) -> bool {
414        // To ensure the unparked thread will observe any writes we made before this call, we must
415        // perform a release operation that `park` can synchronize with. To do that we must write
416        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
417        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
418        match self.state.swap(NOTIFIED, SeqCst) {
419            EMPTY => return true,     // no one was waiting
420            NOTIFIED => return false, // already unparked
421            PARKED => {}              // gotta go wake someone up
422            _ => panic!("inconsistent state in unpark"),
423        }
424
425        // There is a period between when the parked thread sets `state` to `PARKED` (or last
426        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
427        // If we were to notify during this period it would be ignored and then when the parked
428        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
429        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
430        //
431        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
432        // it doesn't get woken only to have to wait for us to release `lock`.
433        drop(self.lock.lock().unwrap());
434        self.cvar.notify_one();
435        true
436    }
437}
438
439impl Wake for Inner {
440    #[inline]
441    fn wake(self: Arc<Self>) {
442        self.unpark();
443    }
444
445    #[inline]
446    fn wake_by_ref(self: &Arc<Self>) {
447        self.unpark();
448    }
449}