fuchsia_sync/rwlock.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::sync::atomic::Ordering;

pub struct RawSyncRwLock {
    /// Holds the primary state of the RwLock.
    ///
    /// See the constants declared below for the semantics of this value.
    ///
    /// Readers will zx_futex_wait on this address.
    ///
    /// Ordering rules:
    ///
    ///  * Any store operation which may release the lock must use Ordering::Release on state to
    ///    establish a happens-before relationship with the next lock acquisition.
    ///  * Any load operation which may acquire the lock must use Ordering::Acquire on state to
    ///    establish a happens-before relationship with the previous lock release.
    state: zx::Futex,

    /// The queue of writers waiting to obtain this lock.
    ///
    /// The value of this field is just a generation counter for this queue.
    ///
    /// Writers will zx_futex_wait on this address with the current generation number.
    ///
    /// Ordering rules:
    ///
    ///  * Stores to writer_queue must be preceded by a store to state and use Ordering::Release.
    ///  * Loads from writer_queue must use Ordering::Acquire and be followed by a load of state.
    writer_queue: zx::Futex,
}

const INITIAL_STATE: i32 = 0;

/// If this bit is set in `state`, then the lock is held exclusively (i.e., as a writer) by the
/// thread that set this bit.
const WRITER_BIT: i32 = 0b0001;

/// If this bit is set in `state`, then a writer wished to acquire exclusive access to this lock
/// but observed a reader or a writer holding the lock. The writer will fetch the current
/// generation number for `writer_queue`, re-check `state`, and then zx_futex_wait on the
/// `writer_queue`.
const WRITER_BLOCKED_BIT: i32 = 0b0010;

/// If this bit is set in `state`, then a reader wished to acquire shared access to this lock
/// but could not because either (a) the lock was held exclusively by a writer or (b) a writer
/// was already blocked waiting for the lock. This second condition is necessary to avoid
/// starving writers: once a writer is blocked, readers that could otherwise have acquired
/// shared access to the lock become blocked waiting for at least one writer to acquire the lock.
const READER_BLOCKED_BIT: i32 = 0b0100;

/// The amount `state` is incremented when a reader acquires the lock. The `state` tracks the
/// number of outstanding readers so that once all the readers have released their shared access,
/// the lock can be made available for exclusive access again.
///
/// We count the readers in the high bits of the state so that we can use arithmetic overflow to
/// detect when too many readers have acquired the lock for us to keep track of.
const READER_UNIT: i32 = 0b1000;

/// A mask to select only the bits that count the number of readers holding shared access to the
/// lock.
const READER_MASK: i32 = !0b0111;

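// A worked example of the encoding above (illustrative): a `state` of 0b11010 (decimal 26) is
// 3 * READER_UNIT + WRITER_BLOCKED_BIT, meaning three threads hold shared access while at least
// one writer is blocked waiting for exclusive access; `state & READER_MASK` recovers the reader
// portion (3 * READER_UNIT).
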
/// # STATE MACHINE
///
/// The RwLock goes through the following states:
///
/// ## Initial
///
/// In the "Initial" state, the `state` is zero. No thread has access to the lock and no threads
/// are waiting.
///
/// * If a reader tries to acquire the lock => Shared access (unblocked)
/// * If a writer tries to acquire the lock => Exclusive access (unblocked)
/// * If a previously blocked writer acquires the lock => Exclusive access (writers blocked)
///
/// ## Shared access (unblocked)
///
/// In this state, `state & READER_MASK` is non-zero and other bits are unset. A non-zero
/// number of threads have shared access to the lock and no threads are waiting.
///
/// Additional readers can acquire shared access to the lock without entering the kernel.
///
/// * If a reader tries to acquire the lock => Shared access (unblocked)
/// * If a writer tries to acquire the lock => Shared access (writers blocked)
/// * If the last reader releases the lock => Initial
///
/// ## Shared access (writers blocked)
///
/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT is set, and other bits are
/// unset. A non-zero number of threads have shared access to the lock and a non-zero number of
/// writers are waiting for exclusive access.
///
/// The lock is contended and requires kernel coordination to wake the blocked threads.
///
/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Shared access (writers blocked)
/// * If the last reader releases the lock => Exclusive access (writers blocked)
///
/// ## Shared access (readers and writers blocked)
///
/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT and READER_BLOCKED_BIT are
/// set, and other bits are unset. A non-zero number of threads have shared access to the lock,
/// a non-zero number of writers are waiting for exclusive access, and a non-zero number of readers
/// are waiting for shared access.
///
/// The lock is contended and requires kernel coordination to wake the blocked threads.
///
/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Shared access (readers and writers blocked)
/// * If the last reader releases the lock => Exclusive access (readers and writers blocked)
///
/// ## Exclusive access (unblocked)
///
/// In this state, WRITER_BIT is set and other bits are unset. Exactly one thread has exclusive
/// access to the lock and no threads are waiting.
///
/// The writer can release the lock without entering the kernel.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
/// * If a writer tries to downgrade the lock => Shared access (unblocked)
/// * If the writer releases the lock => Initial
///
/// ## Exclusive access (writers blocked)
///
/// In this state, WRITER_BIT and WRITER_BLOCKED_BIT are set and other bits are unset. Exactly one
/// thread has exclusive access to the lock and zero or more writers are waiting for exclusive
/// access.
///
/// When the writer releases the lock, the state transitions to the "Initial" state and then the
/// lock wakes up one of the writers, if any exist. If this previously waiting writer succeeds in
/// acquiring the lock, the state machine returns to the "Exclusive access (writers blocked)" state
/// because we do not know how many writers are blocked waiting for exclusive access.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
/// * If a writer tries to downgrade the lock => Shared access (writers blocked)
/// * If the writer releases the lock => Initial
///
/// ## Exclusive access (readers blocked)
///
/// In this state, WRITER_BIT and READER_BLOCKED_BIT are set and other bits are unset. Exactly one
/// thread has exclusive access to the lock and a non-zero number of readers are waiting for shared
/// access.
///
/// When the writer releases the lock, the state transitions to the initial state and then the lock
/// wakes up any blocked readers.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to downgrade the lock => Unique reader (readers blocked)
/// * If the writer releases the lock => Initial
///
/// ## Exclusive access (readers and writers blocked)
///
/// In this state, WRITER_BIT, WRITER_BLOCKED_BIT, and READER_BLOCKED_BIT are set and other bits
/// are unset. Exactly one thread has exclusive access to the lock, zero or more writers are
/// waiting for exclusive access, and a non-zero number of readers are waiting for shared access.
///
/// The lock is contended and requires kernel coordination to wake the blocked threads.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to downgrade the lock => Unique reader (readers and writers blocked)
/// * If the writer releases the lock => Unlocked (readers blocked)
///
/// ## Unlocked (readers blocked)
///
/// In this state, READER_BLOCKED_BIT is set and other bits are unset. No thread has access to the
/// lock and a non-zero number of readers are waiting for shared access.
///
/// This state is transitory and the state machine will leave this state without outside
/// intervention by returning to the "Initial" state and waking any blocked readers.
///
/// * If a reader tries to acquire the lock => Unlocked (readers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (readers blocked)
/// * Otherwise => Initial
///
/// ## Unique reader (readers blocked)
///
/// In this state, there is exactly one reader, who is running on the current thread, the
/// READER_BLOCKED_BIT is set, and other bits are unset. A non-zero number of readers are
/// waiting for shared access.
///
/// This state is transitory and the state machine will leave this state without outside
/// intervention by moving to the "Shared access (unblocked)" state and waking any blocked
/// readers.
///
/// * If a reader tries to acquire the lock => Unique reader (readers blocked)
/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
/// * Otherwise => Shared access (unblocked)
///
/// ## Unique reader (readers and writers blocked)
///
/// In this state, there is exactly one reader, who is running on the current thread, and the
/// READER_BLOCKED_BIT and the WRITER_BLOCKED_BIT are set. Zero or more writers are waiting for
/// exclusive access, and a non-zero number of readers are waiting for shared access.
///
/// This state is transitory and the state machine will leave this state without outside
/// intervention by moving to the "Shared access (writers blocked)" state and waking any blocked
/// readers.
///
/// * If a reader tries to acquire the lock => Unique reader (readers and writers blocked)
/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
/// * Otherwise => Shared access (writers blocked)

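// An illustrative walk through the state machine above (one possible interleaving with a single
// reader and a single writer):
//
//   state == 0                                     Initial
//   reader acquires shared access                  state == READER_UNIT
//   writer finds the lock held and blocks          state == READER_UNIT | WRITER_BLOCKED_BIT
//   reader unlocks and wakes the writer            state == WRITER_BLOCKED_BIT, then 0
//   woken writer acquires exclusive access         state == WRITER_BIT | WRITER_BLOCKED_BIT
//   writer unlocks; unlock_slow clears the bit     state == 0 (back to Initial)
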
fn is_locked_exclusive(state: i32) -> bool {
    state & WRITER_BIT != 0
}

fn has_blocked_writer(state: i32) -> bool {
    state & WRITER_BLOCKED_BIT != 0
}

fn has_blocked_reader(state: i32) -> bool {
    state & READER_BLOCKED_BIT != 0
}

fn can_lock_shared(state: i32) -> bool {
    !is_locked_exclusive(state) && !has_blocked_writer(state) && !has_blocked_reader(state)
}

fn is_unlocked(state: i32) -> bool {
    state & (WRITER_BIT | READER_MASK) == 0
}

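// For example (illustrative): with state == 2 * READER_UNIT | WRITER_BLOCKED_BIT, the predicates
// above evaluate to is_locked_exclusive == false, has_blocked_writer == true,
// has_blocked_reader == false, can_lock_shared == false, and is_unlocked == false.
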
impl RawSyncRwLock {
    #[inline]
    fn try_lock_shared_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        if can_lock_shared(state) {
            if let Some(new_state) = state.checked_add(READER_UNIT) {
                return self
                    .state
                    .compare_exchange(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                    .is_ok();
            }
        }
        false
    }

    #[cold]
    fn lock_shared_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if can_lock_shared(state) {
                let new_state =
                    state.checked_add(READER_UNIT).expect("overflowed reader count in rwlock");
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return, // Acquired shared lock.
                    Err(observed_state) => {
                        state = observed_state;
                        continue;
                    }
                }
            }

            let desired_sleep_state = state | READER_BLOCKED_BIT;

            if !has_blocked_reader(state) {
                if let Err(observed_state) = self.state.compare_exchange(
                    state,
                    desired_sleep_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = observed_state;
                    continue;
                }
            }

            // Ignore spurious wakeups, the loop will retry.
            self.state
                .wait(
                    desired_sleep_state,
                    None, // We don't integrate with priority inheritance yet.
                    zx::MonotonicInstant::INFINITE,
                )
                .ok();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        let mut other_writers_bit = 0;

        loop {
            if is_unlocked(state) {
                match self.state.compare_exchange_weak(
                    state,
                    state | WRITER_BIT | other_writers_bit,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return, // Acquired exclusive lock.
                    Err(observed_state) => {
                        state = observed_state;
                        continue;
                    }
                }
            }

            if !has_blocked_writer(state) {
                if let Err(observed_state) = self.state.compare_exchange(
                    state,
                    state | WRITER_BLOCKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = observed_state;
                    continue;
                }
            }

            other_writers_bit = WRITER_BLOCKED_BIT;

            let generation_number = self.writer_queue.load(Ordering::Acquire);

            // Before we go to sleep on the writer_queue at the fetched generation number, we need
            // to make sure that some other thread is going to wake that generation of sleeping
            // writers. If we didn't fetch the state again, it's possible that another thread could
            // have cleared the WRITER_BLOCKED_BIT in the state and incremented the generation
            // number between the last time we observed state and the time we observed the
            // generation number.
            //
            // By observing the WRITER_BLOCKED_BIT *after* fetching the generation number, we
            // ensure that either (a) this generation has already been awoken or (b) whoever clears
            // the WRITER_BLOCKED_BIT bit will wake this generation in the future.
            state = self.state.load(Ordering::Relaxed);

            // If the lock is available or the WRITER_BLOCKED_BIT is missing, try again. No one has
            // promised to wake the observed generation number.
            if is_unlocked(state) || !has_blocked_writer(state) {
                continue;
            }

            // Ignore spurious wakeups here, the loop will retry.
            self.writer_queue
                .wait(
                    generation_number,
                    None, // We don't integrate with priority inheritance yet.
                    zx::MonotonicInstant::INFINITE,
                )
                .ok();

            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    fn unlock_slow(&self, mut state: i32) {
        debug_assert!(is_unlocked(state));

        // There are only writers waiting.
        if state == WRITER_BLOCKED_BIT {
            match self.state.compare_exchange(
                state,
                INITIAL_STATE,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    self.wake_writer();
                    // We either made progress by waking a waiter or no one is waiting for this
                    // lock anymore.
                    return;
                }
                Err(observed_state) => {
                    state = observed_state;
                }
            }
        }

        // There are both readers and writers waiting.
        if state == READER_BLOCKED_BIT | WRITER_BLOCKED_BIT {
            // Attempt to clear the WRITER_BLOCKED_BIT.
            if self
                .state
                .compare_exchange(state, READER_BLOCKED_BIT, Ordering::Relaxed, Ordering::Relaxed)
                .is_err()
            {
                // The state changed, which means another thread made progress. We're done.
                return;
            }
            self.wake_writer();
            // We cannot be sure that we actually woke up a writer, which means we also need to
            // wake up the readers to avoid the situation where a stack of readers are waiting for
            // a non-existent writer to be done.
            state = READER_BLOCKED_BIT;
        }

        // There are only readers waiting.
        if state == READER_BLOCKED_BIT {
            if self
                .state
                .compare_exchange(state, INITIAL_STATE, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                // Wake up all the readers.
                self.wake_readers();
            }
        }
    }

    #[cold]
    fn downgrade_slow(&self, mut state: i32) {
        debug_assert!(has_blocked_reader(state));
        loop {
            if !has_blocked_reader(state) {
                // Someone else must have woken up the readers.
                return;
            }

            match self.state.compare_exchange(
                state,
                state - READER_BLOCKED_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // We cleared the READER_BLOCKED_BIT, so we need to wake the readers.
                    self.wake_readers();
                    return;
                }
                Err(observed_state) => {
                    state = observed_state;
                    continue;
                }
            }
        }
    }

    fn wake_writer(&self) {
        self.writer_queue.fetch_add(1, Ordering::Release);
        // TODO: Track which thread owns this futex for priority inheritance.
        self.writer_queue.wake(1);
    }

    fn wake_readers(&self) {
        self.state.wake_all();
    }
}

unsafe impl lock_api::RawRwLock for RawSyncRwLock {
    const INIT: RawSyncRwLock =
        RawSyncRwLock { state: zx::Futex::new(0), writer_queue: zx::Futex::new(0) };

    // These operations do not need to happen on the same thread. However, we use `GuardNoSend` to
    // catch mistakes where folks accidentally hold a lock across an async await, which is usually
    // unintentional and can lead to a deadlock. If sufficient need arises, this may be changed
    // back to `lock_api::GuardSend`.
    type GuardMarker = lock_api::GuardNoSend;
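
    // A hypothetical illustration of the mistake that `GuardNoSend` catches: because the guards
    // are `!Send`, a future that holds one across an `.await` point is itself `!Send`, so
    // spawning it on a multi-threaded (`Send`-requiring) executor fails to compile. The names in
    // this sketch are made up:
    //
    //     async fn update(lock: &RwLock<u32>) {
    //         let guard = lock.write();
    //         some_async_operation().await; // `guard` is held across this await point.
    //         drop(guard);
    //     }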

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast() {
            self.lock_shared_slow();
        }
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        self.try_lock_shared_fast()
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        let state = self.state.fetch_sub(READER_UNIT, Ordering::Release) - READER_UNIT;

        // If we just released a reader, then we cannot have blocked readers unless we also have
        // blocked writers because, otherwise, the reader would just have acquired the lock.
        debug_assert!(!has_blocked_reader(state) || has_blocked_writer(state));

        // If we were the last reader and there are writers blocked, we need to wake up the blocked
        // writer.
        if is_unlocked(state) && has_blocked_writer(state) {
            self.unlock_slow(state);
        }
    }

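    // A worked example of unlock_shared above (illustrative): if the state before the fetch_sub
    // was READER_UNIT | WRITER_BLOCKED_BIT (the last reader, with a writer blocked), the new
    // state is WRITER_BLOCKED_BIT, which is unlocked with a blocked writer, so unlock_slow wakes
    // one writer. If the state was 2 * READER_UNIT, the new state is READER_UNIT, another reader
    // still holds the lock, and no kernel call is needed.
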
    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            self.lock_exclusive_slow();
        }
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        self.state
            .compare_exchange(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        let state = self.state.fetch_sub(WRITER_BIT, Ordering::Release) - WRITER_BIT;

        // If we just released a writer, then there must not be any readers or writers.
        debug_assert!(is_unlocked(state));

        if has_blocked_reader(state) || has_blocked_writer(state) {
            self.unlock_slow(state);
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawSyncRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self.state.fetch_add(READER_UNIT - WRITER_BIT, Ordering::Release);

        if has_blocked_reader(state) {
            self.downgrade_slow(state);
        }
    }
}

pub type RwLock<T> = lock_api::RwLock<RawSyncRwLock, T>;
pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawSyncRwLock, T>;
pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawSyncRwLock, T>;
pub type MappedRwLockReadGuard<'a, T> = lock_api::MappedRwLockReadGuard<'a, RawSyncRwLock, T>;
pub type MappedRwLockWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, RawSyncRwLock, T>;

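// A minimal usage sketch for the aliases above (illustrative; it mirrors the tests below):
//
//     let value = RwLock::new(Vec::<u32>::new());
//     {
//         let mut writer = value.write(); // Exclusive access.
//         writer.push(42);
//         // Atomically trade exclusive access for shared access.
//         let reader = RwLockWriteGuard::downgrade(writer);
//         assert_eq!(reader.len(), 1);
//     }
//     assert_eq!(value.read().len(), 1); // Shared access.
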
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    #[test]
    fn test_write_and_read() {
        let value = RwLock::<u32>::new(5);
        let mut guard = value.write();
        assert_eq!(*guard, 5);
        *guard = 6;
        assert_eq!(*guard, 6);
        std::mem::drop(guard);

        let guard = value.read();
        assert_eq!(*guard, 6);
    }

    #[test]
    fn test_try_during_read() {
        let value = RwLock::<u32>::new(5);
        let _read_guard = value.read();
        assert!(value.try_write().is_none());
        assert!(value.try_read().is_some());
    }

    #[test]
    fn test_try_during_write() {
        let value = RwLock::<u32>::new(5);
        let _write_guard = value.write();
        assert!(value.try_write().is_none());
        assert!(value.try_read().is_none());
    }

    #[test]
    fn test_downgrade() {
        let value = RwLock::<u32>::new(5);
        let mut guard = value.write();
        assert_eq!(*guard, 5);
        *guard = 6;
        assert_eq!(*guard, 6);
        assert!(value.try_write().is_none());
        assert!(value.try_read().is_none());
        let guard1 = RwLockWriteGuard::downgrade(guard);
        assert_eq!(*guard1, 6);
        assert!(value.try_write().is_none());
        let guard2 = value.read();
        assert_eq!(*guard2, 6);
    }

    struct State {
        value: RwLock<u32>,
        gate: zx::Futex,
        writer_count: AtomicUsize,
        reader_count: AtomicUsize,
    }

    impl Default for State {
        fn default() -> Self {
            Self {
                value: Default::default(),
                gate: zx::Futex::new(0),
                writer_count: Default::default(),
                reader_count: Default::default(),
            }
        }
    }

    impl State {
        fn wait_for_gate(&self) {
            while self.gate.load(Ordering::Acquire) == 0 {
                // Ignore failures, we'll retry anyways.
                self.gate.wait(0, None, zx::MonotonicInstant::INFINITE).ok();
            }
        }

        fn open_gate(&self) {
            self.gate.fetch_add(1, Ordering::Release);
            self.gate.wake_all();
        }

        fn spawn_writer(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
            std::thread::spawn(move || {
                state.wait_for_gate();
                for _ in 0..count {
                    let mut guard = state.value.write();
                    *guard = *guard + 1;
                    let writer_count = state.writer_count.fetch_add(1, Ordering::Acquire) + 1;
                    let reader_count = state.reader_count.load(Ordering::Acquire);
                    state.writer_count.fetch_sub(1, Ordering::Release);
                    std::mem::drop(guard);
                    assert_eq!(writer_count, 1, "More than one writer held the RwLock at once.");
                    assert_eq!(
                        reader_count, 0,
                        "A reader and writer held the RwLock at the same time."
                    );
                }
            })
        }

        fn spawn_reader(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
            std::thread::spawn(move || {
                state.wait_for_gate();
                for _ in 0..count {
                    let guard = state.value.read();
                    let observed_value = *guard;
                    let reader_count = state.reader_count.fetch_add(1, Ordering::Acquire) + 1;
                    let writer_count = state.writer_count.load(Ordering::Acquire);
                    state.reader_count.fetch_sub(1, Ordering::Release);
                    std::mem::drop(guard);
                    assert!(observed_value < u32::MAX, "The value inside the RwLock underflowed.");
                    assert_eq!(
                        writer_count, 0,
                        "A reader and writer held the RwLock at the same time."
                    );
                    assert!(reader_count > 0, "A reader held the RwLock without being counted.");
                }
            })
        }
    }

    #[test]
    fn test_thundering_writes() {
        let state = Arc::new(State::default());
        let mut threads = vec![];
        for _ in 0..10 {
            threads.push(State::spawn_writer(Arc::clone(&state), 100));
        }

        // Try to align the thundering herd to stress the RwLock as much as possible.
        std::thread::sleep(std::time::Duration::from_millis(100));
        state.open_gate();

        while let Some(thread) = threads.pop() {
            thread.join().expect("failed to join thread");
        }
        let guard = state.value.read();
        assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
    }

    #[test]
    fn test_thundering_reads_and_writes() {
        let state = Arc::new(State::default());
        let mut threads = vec![];
        for _ in 0..10 {
            let state = Arc::clone(&state);
            threads.push(State::spawn_writer(Arc::clone(&state), 100));
            threads.push(State::spawn_reader(Arc::clone(&state), 100));
        }

        // Try to align the thundering herd to stress the RwLock as much as possible.
        std::thread::sleep(std::time::Duration::from_millis(100));
        state.open_gate();

        while let Some(thread) = threads.pop() {
            thread.join().expect("failed to join thread");
        }
        let guard = state.value.read();
        assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
    }
}