fuchsia_sync/rwlock.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::atomic::Ordering;
6
7pub struct RawSyncRwLock {
8 /// Holds the primary state of the RwLock.
9 ///
10 /// See the constants declared below for the semantics of this value.
11 ///
12 /// Readers will zx_futex_wait on this address.
13 ///
14 /// Ordering rules:
15 ///
16 /// * Any store operation which may release the lock must use Ordering::Release on state to
17 /// establish a happens-before relationship with the next lock acquisition.
18 /// * Any load operation which may acquire the lock must use Ordering::Acquire on state to
19 /// establish a happens-before relationship with the previous lock release.
20 state: zx::Futex,
21
22 /// The queue of writers waiting to obtain this lock.
23 ///
24 /// The value of this field is just a generation counter for this queue.
25 ///
26 /// Writers will zx_futex_wait on this address with the current generation number.
27 ///
28 /// Ordering rules:
29 ///
30 /// * Stores to writer_queue must be preceded by a store to state and use Ordering::Release.
31 /// * Loads from writer_queue must use Ordering::Acquire and be followed by a load of state.
32 writer_queue: zx::Futex,
33}
34
35const INITIAL_STATE: i32 = 0;
36
37/// If this bit is set in `state`, then the lock is held exclusively (i.e., as a writer) by the
38/// thread that set this bit.
39const WRITER_BIT: i32 = 0b0001;
40
41/// If this bit is set in `state`, then a writer wished to acquire exclusive access to this lock
42/// but observed a reader or a writer holding the lock. The writer will fetch the currentgeneration
43/// number for `writer_queue`, re-check `state`, and then zx_futex_wait on the `writer_queue`.
44const WRITER_BLOCKED_BIT: i32 = 0b0010;
45
46/// If this bit is set in `state`, then a reader wished to acquire shared access to this lock
47/// but could not because either (a) the lock was held exclusively by a writer or (b) a writer
48/// was already blocked waiting for the lock. This second condition is necessary to avoid
49/// starving writers: once a writer is blocked, readers that could otherwise have acquired
50/// shared access to the lock become blocked waiting for at least one writer to acquire the lock.
51const READER_BLOCKED_BIT: i32 = 0b0100;
52
53/// The amount `state` is incremented when a reader acquires the lock. The `state` tracks the
54/// number of outstanding readers so that once all the readers have released their shared access,
55/// the lock can be made available for exclusive access again.
56///
57/// We count the readers in the high bits of the state so that we can use arithmetic overflow to
58/// detect when too many readers have acquired the lock for us to keep track of.
59const READER_UNIT: i32 = 0b1000;
60
61/// A mask to select only the bits that count the number of readers holding shared access to the
62/// lock.
63const READER_MASK: i32 = !0b0111;
64
65/// # STATE MACHINE
66///
67/// The RwLock goes through the following states:
68///
69/// ## Initial
70///
71/// In the "Initial" state, the `state` is zero. No thread has access to the lock and no threads
72/// are waiting.
73///
74/// * If a reader tries to acquire the lock => Shared access (unblocked)
75/// * If a writer tries to acquire the lock => Exclusive access (unblocked)
76/// * If a previously blocked writer acquires the lock => Exclusive access (writers blocked)
77///
78/// ## Shared access (unblocked)
79///
80/// In this state, `state & READER_MASK` is non-zero and other bits are unset. A non-zero
81/// number of threads have shared access to the lock and no threads are waiting.
82///
83/// Additional readers can acquire shared access to the lock without entering the kernel.
84///
85/// * If a reader tries to acquire the lock => Shared access (unblocked)
86/// * If a writer tries to acquire the lock => Shared access (writers blocked)
87/// * If the last reader releases the lock => Initial
88///
89/// ## Shared access (writers blocked)
90///
91/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT is set, and other bits are
92/// unset. A non-zero number of threads have shared access to the lock and a non-zero number of
93/// writers are waiting for exclusive access.
94///
95/// The lock is contended and requires kernel coordination to wake the blocked threads.
96///
97/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
98/// * If a writer tries to acquire the lock => Shared access (writers blocked)
99/// * If the last reader releases the lock => Exclusive access (writers blocked)
100///
101/// ## Shared access (readers and writers blocked)
102///
103/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT and READER_BLOCKED_BIT are
104/// set, and other bits are unset. A non-zero number of threads have shared access to the lock,
105/// a non-zero number of writers are waiting for exclusive access, and a non-zero number of writers
106/// are waiting for shared access.
107///
108/// The lock is contended and requires kernel coordination to wake the blocked threads.
109///
110/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
111/// * If a writer tries to acquire the lock => Shared access (readers and writers blocked)
112/// * If the last reader releases the lock => Exclusive access (readers and writers blocked)
113///
114/// ## Exclusive access (unblocked)
115///
116/// In this state, WRITER_BIT is set and other bits are unset. Exactly one thread has exclusive
117/// access to the lock and no threads are waiting.
118///
119/// The writer can release the lock without entering the kernel.
120///
121/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
122/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
123/// * If a writer tries to downgrade the lock => Shared access (unblocked)
124/// * If the writer releases the lock => Initial
125///
126/// ## Exclusive access (writers blocked)
127///
128/// In this state, WRITER_BIT and WRITER_BLOCKED_BIT are set and other bits are unset. Exactly one
129/// thread has exclusive access to the lock and zero or more writers are waiting for exclusive
130/// access.
131///
132/// When the writer release the lock, the state transitions to the "Initial state" and then the
133/// lock wakes up one of the writers, if any exist. If this previously waiting writer succeeds in
134/// acquiring the lock, the state machine returns to the "Exclusive access (writers blocked)" state
135/// because we do not know how many writers are blocked waiting for exclusive access.
136///
137/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
138/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
139/// * If a writer tries to downgrade the lock => Shared access (writers blocked)
140/// * If the writer releases the lock => Initial
141///
142/// ## Exclusive access (readers blocked)
143///
144/// In this state, WRITER_BIT and READER_BLOCKED_BIT are set and other bits are unset. Exactly one
145/// thread has exclusive access to the lock and zero or more writers are waiting for shared
146/// access.
147///
148/// When the writer release the lock, the state transitions to the initial state and then the lock
149/// wakes up any blocked readers.
150///
151/// * If a reader tries to acquire the lock => Exclusive access (readers blocked)
152/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
153/// * If a writer tries to downgrade the lock => Unique reader (readers blocked)
154/// * If the writer releases the lock => Initial
155///
156/// ## Exclusive access (readers and writers blocked)
157///
158/// In this state, WRITER_BIT, WRITER_BLOCKED_BIT, and READER_BLOCKED_BIT are set and other bits
159/// are unset. Exactly one thread has exclusive access to the lock and zero or more writers are
160/// waiting for exclusive access, and a non-zero number of readers are waiting for shared access.
161///
162/// The lock is contended and requires kernel coordination to wake the blocked threads.
163///
164/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
165/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
166/// * If a writer tries to downgrade the lock => Unique reader (readers and writers blocked)
167/// * If the writer releases the lock => Unlocked (readers blocked)
168///
169/// ## Unlocked (readers blocked)
170///
171/// In this state, READER_BLOCKED_BIT is set and other bits are unset. No thread has access to the
172/// lock and a non-zero number of readers are waiting for shared access.
173///
174/// This state is transitory and the state machine will leave this state without outside
175/// intervention by returning to the "Initial" state and waking any blocked readers.
176///
177/// * If a reader tries to acquire the lock => Unlocked (readers blocked)
178/// * If a writer tries to acquire the lock => Exclusive access (readers blocked)
179/// * Otherwise => Initial
180///
181/// ## Unique reader (readers blocked)
182///
183/// In this state, there is exactly one reader, who is running on the current thread, the
184/// READER_BLOCKED_BIT is set and and other bits are unset. A non-zero number of readers are
185/// waiting for shared access.
186///
187/// This state is transitory and the state machine will leave this state without outside
188/// intervention by moving to the "Shared access (unblocked)" state and waking any blocked
189/// readers.
190///
191/// * If a reader tries to acquire the lock => Unique reader (readers blocked)
192/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
193/// * Otherwise => Shared access (unblocked)
194///
195/// ## Unique reader (readers and writers blocked)
196///
197/// In this state, there is exactly one reader, who is running on the current thread, and the
198/// READER_BLOCKED_BIT and the WRITER_BLOCKED_BIT are set. Zero or more writers are waiting for
199/// exclusive access, and a non-zero number of readers are waiting for shared access.
200///
201/// This state is transitory and the state machine will leave this state without outside
202/// intervention by moving to the "Shared access (writers blocked)" state and waking any blocked
203/// readers.
204///
205///
206/// * If a reader tries to acquire the lock => Unique reader (readers and writers blocked)
207/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
208/// * Otherwise => Shared access (writers blocked)
209
210fn is_locked_exclusive(state: i32) -> bool {
211 state & WRITER_BIT != 0
212}
213
214fn has_blocked_writer(state: i32) -> bool {
215 state & WRITER_BLOCKED_BIT != 0
216}
217
218fn has_blocked_reader(state: i32) -> bool {
219 state & READER_BLOCKED_BIT != 0
220}
221
222fn can_lock_shared(state: i32) -> bool {
223 !is_locked_exclusive(state) && !has_blocked_writer(state) && !has_blocked_reader(state)
224}
225
226fn is_unlocked(state: i32) -> bool {
227 state & (WRITER_BIT | READER_MASK) == 0
228}
229
230impl RawSyncRwLock {
231 #[inline]
232 fn try_lock_shared_fast(&self) -> bool {
233 let state = self.state.load(Ordering::Relaxed);
234 if can_lock_shared(state) {
235 if let Some(new_state) = state.checked_add(READER_UNIT) {
236 return self
237 .state
238 .compare_exchange(state, new_state, Ordering::Acquire, Ordering::Relaxed)
239 .is_ok();
240 }
241 }
242 false
243 }
244
245 #[cold]
246 fn lock_shared_slow(&self) {
247 let mut state = self.state.load(Ordering::Relaxed);
248 loop {
249 if can_lock_shared(state) {
250 let new_state =
251 state.checked_add(READER_UNIT).expect("overflowed reader count in rwlock");
252 match self.state.compare_exchange_weak(
253 state,
254 new_state,
255 Ordering::Acquire,
256 Ordering::Relaxed,
257 ) {
258 Ok(_) => return, // Acquired shared lock.
259 Err(observed_state) => {
260 state = observed_state;
261 continue;
262 }
263 }
264 }
265
266 let desired_sleep_state = state | READER_BLOCKED_BIT;
267
268 if !has_blocked_reader(state) {
269 if let Err(observed_state) = self.state.compare_exchange(
270 state,
271 desired_sleep_state,
272 Ordering::Relaxed,
273 Ordering::Relaxed,
274 ) {
275 state = observed_state;
276 continue;
277 }
278 }
279
280 // Ignore spurious wakeups, the loop will retry.
281 self.state
282 .wait(
283 desired_sleep_state,
284 None, // We don't integrate with priority inheritance yet.
285 zx::MonotonicInstant::INFINITE,
286 )
287 .ok();
288 state = self.state.load(Ordering::Relaxed);
289 }
290 }
291
292 #[cold]
293 fn lock_exclusive_slow(&self) {
294 let mut state = self.state.load(Ordering::Relaxed);
295 let mut other_writers_bit = 0;
296
297 loop {
298 if is_unlocked(state) {
299 match self.state.compare_exchange_weak(
300 state,
301 state | WRITER_BIT | other_writers_bit,
302 Ordering::Acquire,
303 Ordering::Relaxed,
304 ) {
305 Ok(_) => return, // Acquired exclusive lock.
306 Err(observed_state) => {
307 state = observed_state;
308 continue;
309 }
310 }
311 }
312
313 if !has_blocked_writer(state) {
314 if let Err(observed_state) = self.state.compare_exchange(
315 state,
316 state | WRITER_BLOCKED_BIT,
317 Ordering::Relaxed,
318 Ordering::Relaxed,
319 ) {
320 state = observed_state;
321 continue;
322 }
323 }
324
325 other_writers_bit = WRITER_BLOCKED_BIT;
326
327 let generation_number = self.writer_queue.load(Ordering::Acquire);
328
329 // Before we go to sleep on the writer_queue at the fetched generation number, we need
330 // to make sure that some other thread is going to wake that generation of sleeping
331 // writers. If we didn't fetch the state again, it's possible that another thread could
332 // have cleared the WRITER_BLOCKED_BIT in the state and incremented the generation
333 // number between the last time we observed state and the time we observed the
334 // generation number.
335 //
336 // By observing the WRITER_BLOCKED_BIT *after* fetching the generation number, we
337 // ensure that either (a) this generation has already been awoken or (b) whoever clears
338 // the WRITER_BLOCKED_BIT bit will wake this generation in the future.
339 state = self.state.load(Ordering::Relaxed);
340
341 // If the lock is available or the WRITER_BLOCKED_BIT is missing, try again. No one has
342 // promised to wake the observed generation number.
343 if is_unlocked(state) || !has_blocked_writer(state) {
344 continue;
345 }
346
347 // Ignore spurious wakeups here, the loop will retry.
348 self.writer_queue
349 .wait(
350 generation_number,
351 None, // We don't integrate with priority inheritance yet.
352 zx::MonotonicInstant::INFINITE,
353 )
354 .ok();
355
356 state = self.state.load(Ordering::Relaxed);
357 }
358 }
359
360 #[cold]
361 fn unlock_slow(&self, mut state: i32) {
362 debug_assert!(is_unlocked(state));
363
364 // There are only writers waiting.
365 if state == WRITER_BLOCKED_BIT {
366 match self.state.compare_exchange(
367 state,
368 INITIAL_STATE,
369 Ordering::Relaxed,
370 Ordering::Relaxed,
371 ) {
372 Ok(_) => {
373 self.wake_writer();
374 // We either made progress by waking a waiter or no one is waiting for this
375 // lock anymore.
376 return;
377 }
378 Err(observed_state) => {
379 state = observed_state;
380 }
381 }
382 }
383
384 // There are both readers and writers waiting.
385 if state == READER_BLOCKED_BIT | WRITER_BLOCKED_BIT {
386 // Attempt to clear the WRITER_BLOCKED_BIT.
387 if self
388 .state
389 .compare_exchange(state, READER_BLOCKED_BIT, Ordering::Relaxed, Ordering::Relaxed)
390 .is_err()
391 {
392 // The state changed, which means another thread made progress. We're done.
393 return;
394 }
395 self.wake_writer();
396 // We cannot be sure that we actually work up a writer, which means we also need to
397 // wake up the readers to avoid the situation where a stack of readers are waiting for
398 // a non-existent writer to be done.
399 state = READER_BLOCKED_BIT;
400 }
401
402 // There are only readers waiting.
403 if state == READER_BLOCKED_BIT {
404 if self
405 .state
406 .compare_exchange(state, INITIAL_STATE, Ordering::Relaxed, Ordering::Relaxed)
407 .is_ok()
408 {
409 // Wake up all the readers.
410 self.wake_readers();
411 }
412 }
413 }
414
415 #[cold]
416 fn downgrade_slow(&self, mut state: i32) {
417 debug_assert!(has_blocked_reader(state));
418 loop {
419 if !has_blocked_reader(state) {
420 // Someone else must have woken up the readers.
421 return;
422 }
423
424 match self.state.compare_exchange(
425 state,
426 state - READER_BLOCKED_BIT,
427 Ordering::Relaxed,
428 Ordering::Relaxed,
429 ) {
430 Ok(_) => {
431 // We cleared the READER_BLOCKED_BIT, so we need to wake the readers.
432 self.wake_readers();
433 return;
434 }
435 Err(observed_state) => {
436 state = observed_state;
437 continue;
438 }
439 }
440 }
441 }
442
443 fn wake_writer(&self) {
444 self.writer_queue.fetch_add(1, Ordering::Release);
445 // TODO: Track which thread owns this futex for priority inheritance.
446 self.writer_queue.wake(1);
447 }
448
449 fn wake_readers(&self) {
450 self.state.wake_all();
451 }
452}
453
454unsafe impl lock_api::RawRwLock for RawSyncRwLock {
455 const INIT: RawSyncRwLock =
456 RawSyncRwLock { state: zx::Futex::new(0), writer_queue: zx::Futex::new(0) };
457
458 // These operations do not need to happen on the same thread.
459 // However, we set this to no send to catch mistakes where folks accidentally hold a lock across
460 // an async await, which is often not intentional behavior and can lead to a deadlock. If
461 // sufficient need is required, this may be changed back to `lock_api::GuardSend`.
462 type GuardMarker = lock_api::GuardNoSend;
463
464 #[inline]
465 fn lock_shared(&self) {
466 if !self.try_lock_shared_fast() {
467 self.lock_shared_slow();
468 }
469 }
470
471 #[inline]
472 fn try_lock_shared(&self) -> bool {
473 self.try_lock_shared_fast()
474 }
475
476 #[inline]
477 unsafe fn unlock_shared(&self) {
478 let state = self.state.fetch_sub(READER_UNIT, Ordering::Release) - READER_UNIT;
479
480 // If we just released a reader, then we cannot have blocked readers unless we also have
481 // blocked writers because, otherwise, the reader would just have acquired the lock.
482 debug_assert!(!has_blocked_reader(state) || has_blocked_writer(state));
483
484 // If we were the last reader and there are writers blocked, we need to wake up the blocked
485 // writer.
486 if is_unlocked(state) && has_blocked_writer(state) {
487 self.unlock_slow(state);
488 }
489 }
490
491 #[inline]
492 fn lock_exclusive(&self) {
493 if self
494 .state
495 .compare_exchange_weak(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
496 .is_err()
497 {
498 self.lock_exclusive_slow();
499 }
500 }
501
502 #[inline]
503 fn try_lock_exclusive(&self) -> bool {
504 self.state
505 .compare_exchange(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
506 .is_ok()
507 }
508
509 #[inline]
510 unsafe fn unlock_exclusive(&self) {
511 let state = self.state.fetch_sub(WRITER_BIT, Ordering::Release) - WRITER_BIT;
512
513 // If we just released a writer, then there must not be any readers or writers.
514 debug_assert!(is_unlocked(state));
515
516 if has_blocked_reader(state) || has_blocked_writer(state) {
517 self.unlock_slow(state);
518 }
519 }
520}
521
522unsafe impl lock_api::RawRwLockDowngrade for RawSyncRwLock {
523 #[inline]
524 unsafe fn downgrade(&self) {
525 let state = self.state.fetch_add(READER_UNIT - WRITER_BIT, Ordering::Release);
526
527 if has_blocked_reader(state) {
528 self.downgrade_slow(state);
529 }
530 }
531}
532
533pub type RwLock<T> = lock_api::RwLock<RawSyncRwLock, T>;
534pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawSyncRwLock, T>;
535pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawSyncRwLock, T>;
536pub type MappedRwLockReadGuard<'a, T> = lock_api::MappedRwLockReadGuard<'a, RawSyncRwLock, T>;
537pub type MappedRwLockWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, RawSyncRwLock, T>;
538
539#[cfg(test)]
540mod test {
541 use super::*;
542 use std::sync::atomic::AtomicUsize;
543 use std::sync::Arc;
544
545 #[test]
546 fn test_write_and_read() {
547 let value = RwLock::<u32>::new(5);
548 let mut guard = value.write();
549 assert_eq!(*guard, 5);
550 *guard = 6;
551 assert_eq!(*guard, 6);
552 std::mem::drop(guard);
553
554 let guard = value.read();
555 assert_eq!(*guard, 6);
556 }
557
558 #[test]
559 fn test_try_during_read() {
560 let value = RwLock::<u32>::new(5);
561 let _read_guard = value.read();
562 assert!(value.try_write().is_none());
563 assert!(value.try_read().is_some());
564 }
565
566 #[test]
567 fn test_try_during_write() {
568 let value = RwLock::<u32>::new(5);
569 let _write_guard = value.write();
570 assert!(value.try_write().is_none());
571 assert!(value.try_read().is_none());
572 }
573
574 #[test]
575 fn test_downgrade() {
576 let value = RwLock::<u32>::new(5);
577 let mut guard = value.write();
578 assert_eq!(*guard, 5);
579 *guard = 6;
580 assert_eq!(*guard, 6);
581 assert!(value.try_write().is_none());
582 assert!(value.try_read().is_none());
583 let guard1 = RwLockWriteGuard::downgrade(guard);
584 assert_eq!(*guard1, 6);
585 assert!(value.try_write().is_none());
586 let guard2 = value.read();
587 assert_eq!(*guard2, 6);
588 }
589
590 struct State {
591 value: RwLock<u32>,
592 gate: zx::Futex,
593 writer_count: AtomicUsize,
594 reader_count: AtomicUsize,
595 }
596
597 impl Default for State {
598 fn default() -> Self {
599 Self {
600 value: Default::default(),
601 gate: zx::Futex::new(0),
602 writer_count: Default::default(),
603 reader_count: Default::default(),
604 }
605 }
606 }
607
608 impl State {
609 fn wait_for_gate(&self) {
610 while self.gate.load(Ordering::Acquire) == 0 {
611 // Ignore failures, we'll retry anyways.
612 self.gate.wait(0, None, zx::MonotonicInstant::INFINITE).ok();
613 }
614 }
615
616 fn open_gate(&self) {
617 self.gate.fetch_add(1, Ordering::Release);
618 self.gate.wake_all();
619 }
620
621 fn spawn_writer(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
622 std::thread::spawn(move || {
623 state.wait_for_gate();
624 for _ in 0..count {
625 let mut guard = state.value.write();
626 *guard = *guard + 1;
627 let writer_count = state.writer_count.fetch_add(1, Ordering::Acquire) + 1;
628 let reader_count = state.reader_count.load(Ordering::Acquire);
629 state.writer_count.fetch_sub(1, Ordering::Release);
630 std::mem::drop(guard);
631 assert_eq!(writer_count, 1, "More than one writer held the RwLock at once.");
632 assert_eq!(
633 reader_count, 0,
634 "A reader and writer held the RwLock at the same time."
635 );
636 }
637 })
638 }
639
640 fn spawn_reader(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
641 std::thread::spawn(move || {
642 state.wait_for_gate();
643 for _ in 0..count {
644 let guard = state.value.read();
645 let observed_value = *guard;
646 let reader_count = state.reader_count.fetch_add(1, Ordering::Acquire) + 1;
647 let writer_count = state.writer_count.load(Ordering::Acquire);
648 state.reader_count.fetch_sub(1, Ordering::Release);
649 std::mem::drop(guard);
650 assert!(observed_value < u32::MAX, "The value inside the RwLock underflowed.");
651 assert_eq!(
652 writer_count, 0,
653 "A reader and writer held the RwLock at the same time."
654 );
655 assert!(reader_count > 0, "A reader held the RwLock without being counted.");
656 }
657 })
658 }
659 }
660
661 #[test]
662 fn test_thundering_writes() {
663 let state = Arc::new(State::default());
664 let mut threads = vec![];
665 for _ in 0..10 {
666 threads.push(State::spawn_writer(Arc::clone(&state), 100));
667 }
668
669 // Try to align the thundering herd to stress the RwLock as much as possible.
670 std::thread::sleep(std::time::Duration::from_millis(100));
671 state.open_gate();
672
673 while let Some(thread) = threads.pop() {
674 thread.join().expect("failed to join thread");
675 }
676 let guard = state.value.read();
677 assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
678 }
679
680 #[test]
681 fn test_thundering_reads_and_writes() {
682 let state = Arc::new(State::default());
683 let mut threads = vec![];
684 for _ in 0..10 {
685 let state = Arc::clone(&state);
686 threads.push(State::spawn_writer(Arc::clone(&state), 100));
687 threads.push(State::spawn_reader(Arc::clone(&state), 100));
688 }
689
690 // Try to align the thundering herd to stress the RwLock as much as possible.
691 std::thread::sleep(std::time::Duration::from_millis(100));
692 state.open_gate();
693
694 while let Some(thread) = threads.pop() {
695 thread.join().expect("failed to join thread");
696 }
697 let guard = state.value.read();
698 assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
699 }
700}