1use crate::drop_event::DropEvent;
9use crate::log::*;
10use crate::lsm_tree::merge::{self, MergeFn};
11use crate::lsm_tree::types::{
12 BoxedLayerIterator, Item, ItemCount, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
13 LayerValue, OrdLowerBound, OrdUpperBound,
14};
15use crate::serialized_types::{Version, LATEST_VERSION};
16use anyhow::{bail, Error};
17use async_trait::async_trait;
18use fuchsia_sync::{Mutex, MutexGuard};
19use std::cell::UnsafeCell;
20use std::cmp::{min, Ordering};
21use std::collections::BTreeMap;
22use std::ops::{Bound, Range};
23use std::sync::atomic::{self, AtomicPtr, AtomicU32};
24use std::sync::Arc;
25
26struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
30
31impl<K, V> PointerList<K, V> {
32 fn new(count: usize) -> PointerList<K, V> {
33 let mut pointers = Vec::new();
34 for _ in 0..count {
35 pointers.push(AtomicPtr::new(std::ptr::null_mut()));
36 }
37 PointerList(pointers.into_boxed_slice())
38 }
39
40 fn len(&self) -> usize {
41 self.0.len()
42 }
43
44 fn get_mut<'a>(&self, index: usize) -> Option<&'a mut SkipListNode<K, V>> {
46 unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_mut() }
47 }
48
49 fn get<'a>(&self, index: usize) -> Option<&'a SkipListNode<K, V>> {
51 unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_ref() }
52 }
53
54 fn set(&self, index: usize, node: Option<&SkipListNode<K, V>>) {
56 self.0[index].store(
57 match node {
58 None => std::ptr::null_mut(),
59 Some(node) => {
60 unsafe {
63 (&*(node as *const SkipListNode<K, V>
64 as *const UnsafeCell<SkipListNode<K, V>>))
65 .get()
66 }
67 }
68 },
69 atomic::Ordering::SeqCst,
70 );
71 }
72
73 fn get_ptr(&self, index: usize) -> *mut SkipListNode<K, V> {
74 self.0[index].load(atomic::Ordering::SeqCst)
75 }
76}
77
/// A single heap-allocated entry of the skip list (see `SkipListLayer::alloc_node`).
struct SkipListNode<K, V> {
    // The key/value item stored in this node.
    item: Item<K, V>,
    // Forward links: slot `i` holds the next node on the level-`i` chain, or null. The number of
    // slots is chosen per node when it is allocated.
    pointers: PointerList<K, V>,
}
82
/// An in-memory layer backed by a skip list.
///
/// Readers (`SkipListLayerIter`) traverse the list through atomic pointer loads; writers
/// (`SkipListLayerIterMut`) hold `write_lock` for as long as they exist.
pub struct SkipListLayer<K, V> {
    // The head pointers, one slot per level. The number of levels is fixed in `new`.
    pointers: PointerList<K, V>,

    // Reader-epoch bookkeeping, deferred-free lists and the item count.
    inner: Mutex<Inner<K, V>>,

    // Held by a `SkipListLayerIterMut` for its whole lifetime, so only one can exist at a time.
    write_lock: Mutex<()>,

    // Number of nodes currently allocated: incremented in `alloc_node`, decremented in
    // `free_node`, and asserted to be zero when the layer is dropped.
    allocated: AtomicU32,

    // Cloned out by `Layer::lock` and taken (leaving `None`) by `Layer::close`.
    close_event: Mutex<Option<Arc<DropEvent>>>,
}
97
/// Mutable bookkeeping shared by readers and writers, protected by `SkipListLayer::inner`.
struct Inner<K, V> {
    // The current reader epoch. A reader records this value when it is created. A commit that
    // has to defer freeing nodes files them under this epoch and then increments it.
    epoch: u64,

    // The number of live readers that were created during the current epoch.
    current_count: u16,

    // Nodes waiting to be freed, keyed by the epoch that was current when they were unlinked.
    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,

    // The number of items in the list, adjusted by each commit.
    item_count: usize,
}
118
/// A run of unlinked nodes that cannot be freed until the readers of one epoch have gone away.
struct EpochEraseList<K, V> {
    // The number of readers from this epoch that have not been dropped yet.
    count: u16,
    // The nodes to free: `start` is the first node, and freeing follows the level-0 chain until
    // it reaches `end` (which is not freed) or a null pointer.
    range: Range<*mut SkipListNode<K, V>>,
}
131
// `Inner` stores raw node pointers (in `EpochEraseList::range`), so it is not `Send`
// automatically and has to opt in.
// NOTE(review): soundness presumably relies on `Inner` only ever being reached through the
// `SkipListLayer::inner` mutex — confirm.
unsafe impl<K, V> Send for Inner<K, V> {}
134
135impl<K, V> Inner<K, V> {
136 fn new() -> Self {
137 Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
138 }
139
140 fn free_erase_list(
141 &mut self,
142 owner: &SkipListLayer<K, V>,
143 list: Range<*mut SkipListNode<K, V>>,
144 ) {
145 let mut maybe_node = unsafe { list.start.as_mut() };
146 loop {
147 match maybe_node {
148 Some(node) if node as *const _ != list.end => {
149 maybe_node = owner.free_node(node);
150 }
151 _ => break,
152 }
153 }
154 }
155}
156
157impl<K, V> SkipListLayer<K, V> {
158 pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
159 Arc::new(SkipListLayer {
160 pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
161 inner: Mutex::new(Inner::new()),
162 write_lock: Mutex::new(()),
163 allocated: AtomicU32::new(0),
164 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
165 })
166 }
167
168 pub fn len(&self) -> usize {
169 self.inner.lock().item_count
170 }
171
172 fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
173 self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
174 Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
175 }
176
177 #[allow(clippy::mut_from_ref)]
180 fn free_node(&self, node: &mut SkipListNode<K, V>) -> Option<&mut SkipListNode<K, V>> {
181 self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
182 unsafe { Box::from_raw(node).pointers.get_mut(0) }
184 }
185}
186
187impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
188 pub fn erase(&self, key: &K)
190 where
191 K: std::cmp::Eq,
192 {
193 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
194 if let Some(ItemRef { key: k, .. }) = iter.get() {
195 if k == key {
196 iter.erase();
197 } else {
198 warn!("Attempt to erase key not present!");
199 }
200 }
201 iter.commit();
202 }
203
204 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
206 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
207 if let Some(found_item) = iter.get() {
208 if found_item.key == &item.key {
209 bail!("Attempted to insert an existing key");
210 }
211 }
212 iter.insert(item);
213 Ok(())
214 }
215
216 pub fn replace_or_insert(&self, item: Item<K, V>) {
218 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
219 if let Some(found_item) = iter.get() {
220 if found_item.key == &item.key {
221 iter.erase();
222 }
223 }
224 iter.insert(item);
225 }
226
227 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
229 merge::merge_into(
230 Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
231 item,
232 merge_fn,
233 )
234 .unwrap();
235 }
236}
237
238impl<K, V> Drop for SkipListLayer<K, V> {
240 fn drop(&mut self) {
241 let mut next = self.pointers.get_mut(0);
242 while let Some(node) = next {
243 next = self.free_node(node);
244 }
245 assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
246 }
247}
248
249#[async_trait]
250impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
251 async fn seek<'a>(
252 &'a self,
253 bound: std::ops::Bound<&K>,
254 ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
255 Ok(Box::new(SkipListLayerIter::new(self, bound)))
256 }
257
258 fn lock(&self) -> Option<Arc<DropEvent>> {
259 self.close_event.lock().clone()
260 }
261
262 fn estimated_len(&self) -> ItemCount {
263 ItemCount::Precise(self.inner.lock().item_count)
264 }
265
266 async fn close(&self) {
267 let listener = self.close_event.lock().take().expect("close already called").listen();
268 listener.await;
269 }
270
271 fn get_version(&self) -> Version {
272 return LATEST_VERSION;
275 }
276
277 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
278 node.record_bool("persistent", false);
279 node.record_uint("num_items", self.inner.lock().item_count as u64);
280 }
281}
282
/// A read-only iterator over a `SkipListLayer`.
struct SkipListLayerIter<'a, K, V> {
    // The layer being iterated.
    skip_list: &'a SkipListLayer<K, V>,

    // The value of `Inner::epoch` when this iterator was created; `Drop` uses it to decide which
    // reader count to decrement.
    epoch: u64,

    // The node under the cursor, or `None` once the end of the list has been reached.
    node: Option<&'a SkipListNode<K, V>>,
}
294
impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
    /// Creates a reader over `skip_list` positioned according to `bound`.
    ///
    /// * `Unbounded`: the first node of the list.
    /// * `Included(key)`: the first node for which `cmp_upper_bound(key)` is `Equal` or
    ///   `Greater`.
    /// * `Excluded(key)`: the first node for which `cmp_upper_bound(key)` is `Greater`.
    ///
    /// The reader is counted against the current epoch until it is dropped.
    fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
        // Register this reader with the current epoch and remember which epoch that was. The
        // lock is released at the end of this block.
        let epoch = {
            let mut inner = skip_list.inner.lock();
            inner.current_count += 1;
            inner.epoch
        };
        let (included, key) = match bound {
            Bound::Unbounded => {
                // No search needed: start at the head of the level-0 chain.
                return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
            }
            Bound::Included(key) => (true, key),
            Bound::Excluded(key) => (false, key),
        };
        // The pointer list of the last node known to sort before the target (initially the head).
        let mut last_pointers = &skip_list.pointers;

        // `node` always holds the result of the most recent pointer load, so the node returned
        // below is exactly the node the final comparison was made against. Pointers are not
        // re-read after the comparison.
        let mut node = None;
        // Search from the highest level down to level 0.
        for index in (0..skip_list.pointers.len()).rev() {
            // Walk along this level until the end of the chain or a node that satisfies the bound.
            loop {
                node = last_pointers.get(index);
                if let Some(node) = node {
                    match &node.item.key.cmp_upper_bound(key) {
                        Ordering::Equal if included => break,
                        Ordering::Greater => break,
                        _ => {}
                    }
                    // This node sorts before the target, so continue from its pointers.
                    last_pointers = &node.pointers;
                } else {
                    break;
                }
            }
        }
        SkipListLayerIter { skip_list, epoch, node }
    }
}
334
335impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
336 fn drop(&mut self) {
337 let mut inner = self.skip_list.inner.lock();
338 if self.epoch == inner.epoch {
339 inner.current_count -= 1;
340 } else {
341 if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
342 erase_list.count -= 1;
343 if erase_list.count == 0 {
344 while let Some(entry) = inner.erase_lists.first_entry() {
345 if entry.get().count == 0 {
346 let range = entry.remove_entry().1.range;
347 inner.free_erase_list(self.skip_list, range);
348 } else {
349 break;
350 }
351 }
352 }
353 }
354 }
355 }
356}
357
358#[async_trait]
359impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
360 async fn advance(&mut self) -> Result<(), Error> {
361 match self.node {
362 None => {}
363 Some(node) => self.node = node.pointers.get(0),
364 }
365 Ok(())
366 }
367
368 fn get(&self) -> Option<ItemRef<'_, K, V>> {
369 self.node.map(|node| node.item.as_item_ref())
370 }
371}
372
// One `PointerList` reference per level.
type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;

/// A mutating iterator over a `SkipListLayer`.
///
/// Insertions and erasures are staged on the iterator and published by `commit`, which also runs
/// when the iterator is dropped. The layer's write lock is held for the iterator's lifetime.
pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
    // The layer being modified.
    skip_list: &'a SkipListLayer<K, V>,

    // For each level, the pointer list (the head's or a node's) whose slot at that level leads to
    // the cursor position. `prev_pointers[0].get(0)` is the item under the cursor.
    prev_pointers: PointerListRefArray<'a, K, V>,

    // A copy of `prev_pointers` taken at the first `insert` or `erase`; `commit` splices the
    // staged changes in at this position. `None` means nothing is pending.
    insertion_point: Option<PointerListRefArray<'a, K, V>>,

    // For each level, the first node inserted at that level (null if none). `commit` stores these
    // into the `insertion_point` pointer lists.
    insertion_nodes: PointerList<K, V>,

    // Never read; held only so that the layer's write lock stays locked.
    #[allow(dead_code)]
    write_guard: MutexGuard<'a, ()>,

    // Net change in the number of items, added to `Inner::item_count` by `commit`.
    item_delta: isize,
}
405
impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
    /// Creates a mutating iterator over `skip_list`, taking the layer's write lock.
    ///
    /// * `Unbounded`: the cursor is at the first item.
    /// * `Included(key)`: the cursor is at the first item for which `cmp_upper_bound(key)` is
    ///   `Equal` or `Greater`.
    ///
    /// # Panics
    ///
    /// Panics if `bound` is `Bound::Excluded`.
    pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
        // Taken first and stored in the iterator, so the lock is held until the iterator drops.
        let write_guard = skip_list.write_lock.lock();
        let len = skip_list.pointers.len();

        // Every level starts at the head pointers.
        let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
        match bound {
            Bound::Unbounded => {}
            Bound::Included(key) => {
                let pointers = &mut prev_pointers;
                // Search from the highest level down to level 0.
                for index in (0..len).rev() {
                    // Walk along this level until the next node's key is >= the search key.
                    while let Some(node) = pointers[index].get(index) {
                        match &(node.item.key).cmp_upper_bound(key) {
                            Ordering::Equal | Ordering::Greater => break,
                            Ordering::Less => {}
                        }
                        pointers[index] = &node.pointers;
                    }
                    // The level below resumes from where this level stopped.
                    if index > 0 {
                        pointers[index - 1] = pointers[index];
                    }
                }
            }
            Bound::Excluded(_) => panic!("Excluded bounds not supported"),
        }
        SkipListLayerIterMut {
            skip_list,
            prev_pointers,
            insertion_point: None,
            insertion_nodes: PointerList::new(len),
            write_guard,
            item_delta: 0,
        }
    }
}
458
impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
    // Publishes any staged changes. `commit` returns immediately if nothing is pending, so an
    // earlier explicit `commit` is not applied twice.
    fn drop(&mut self) {
        self.commit();
    }
}
464
465impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
466 fn advance(&mut self) {
467 if self.insertion_point.is_some() {
468 if let Some(item) = self.get() {
469 let copy = item.cloned();
471 self.insert(copy);
472 self.erase();
473 }
474 } else {
475 let pointers = &mut self.prev_pointers;
476 if let Some(next) = pointers[0].get_mut(0) {
477 for i in 0..next.pointers.len() {
478 pointers[i] = &next.pointers;
479 }
480 }
481 }
482 }
483
484 fn get(&self) -> Option<ItemRef<'_, K, V>> {
485 self.prev_pointers[0].get(0).map(|node| node.item.as_item_ref())
486 }
487
488 fn insert(&mut self, item: Item<K, V>) {
489 use rand::Rng;
490 let mut rng = rand::thread_rng();
491 let max_pointers = self.skip_list.pointers.len();
492 let pointer_count = max_pointers
495 - min(
496 (rng.gen_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
497 max_pointers - 1,
498 );
499 let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
500 if self.insertion_point.is_none() {
501 self.insertion_point = Some(self.prev_pointers.clone());
502 }
503 for i in 0..pointer_count {
504 let pointers = self.prev_pointers[i];
505 node.pointers.set(i, pointers.get(i));
506 if self.insertion_nodes.get(i).is_none() {
507 self.insertion_nodes.set(i, Some(node));
510 } else {
511 pointers.set(i, Some(&node));
514 }
515 self.prev_pointers[i] = &node.pointers;
517 }
518 self.item_delta += 1;
519 }
520
521 fn erase(&mut self) {
522 let pointers = &mut self.prev_pointers;
523 if let Some(next) = pointers[0].get_mut(0) {
524 if self.insertion_point.is_none() {
525 self.insertion_point = Some(pointers.clone());
526 }
527 if self.insertion_nodes.get(0).is_none() {
528 pointers[0] = &next.pointers;
531 } else {
532 pointers[0].set(0, next.pointers.get(0));
537 }
538 for i in 1..next.pointers.len() {
541 pointers[i].set(i, next.pointers.get(i));
542 }
543 }
544 self.item_delta -= 1;
545 }
546
547 fn commit(&mut self) {
550 let prev_pointers = match self.insertion_point.take() {
552 Some(prev_pointers) => prev_pointers,
553 None => return,
554 };
555
556 let maybe_erase = prev_pointers[0].get_mut(0);
558
559 if self.insertion_nodes.get(0).is_none() {
561 prev_pointers[0].set(0, self.prev_pointers[0].get(0));
565 } else {
566 for i in 0..self.insertion_nodes.len() {
570 if let Some(node) = self.insertion_nodes.get_mut(i) {
571 prev_pointers[i].set(i, Some(node));
572 }
573 }
574 }
575
576 let mut inner = self.skip_list.inner.lock();
578 inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
579 if let Some(start) = maybe_erase {
580 let end = self.prev_pointers[0].get_ptr(0);
581 if start as *mut _ != end {
582 if inner.current_count > 0 || !inner.erase_lists.is_empty() {
583 let count = std::mem::take(&mut inner.current_count);
584 let epoch = inner.epoch;
585 inner.erase_lists.insert(epoch, EpochEraseList { count, range: start..end });
586 inner.epoch = inner.epoch.wrapping_add(1);
587 } else {
588 inner.free_erase_list(self.skip_list, start..end);
589 }
590 }
591 }
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use super::{SkipListLayer, SkipListLayerIterMut};
598 use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
599 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
600 use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
601 use crate::lsm_tree::types::{
602 DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerIterator,
603 LayerIteratorMut, SortByU64,
604 };
605 use crate::serialized_types::{
606 versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
607 };
608 use assert_matches::assert_matches;
609 use fprint::TypeFingerprint;
610 use fuchsia_async as fasync;
611 use futures::future::join_all;
612 use futures::{join, FutureExt as _};
613 use fxfs_macros::FuzzyHash;
614 use std::hash::Hash;
615 use std::ops::Bound;
616 use std::time::{Duration, Instant};
617
    // Minimal key type for these tests: a bare `u64` with derived ordering and equality.
    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        // The wrapped integer is the sort key.
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    // Use the default lower- and upper-bound comparisons.
    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}
644
645 #[fuchsia::test]
646 async fn test_iteration() {
647 let skip_list = SkipListLayer::new(100);
649 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
650 skip_list.insert(items[1].clone()).expect("insert error");
651 skip_list.insert(items[0].clone()).expect("insert error");
652 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
653 let ItemRef { key, value, .. } = iter.get().expect("missing item");
654 assert_eq!((key, value), (&items[0].key, &items[0].value));
655 iter.advance().await.unwrap();
656 let ItemRef { key, value, .. } = iter.get().expect("missing item");
657 assert_eq!((key, value), (&items[1].key, &items[1].value));
658 iter.advance().await.unwrap();
659 assert!(iter.get().is_none());
660 }
661
662 #[fuchsia::test]
663 async fn test_seek_exact() {
664 let skip_list = SkipListLayer::new(100);
666 for i in (0..100).rev() {
667 skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
668 }
669 let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
670 let ItemRef { key, value, .. } = iter.get().expect("missing item");
671 assert_eq!((key, value), (&TestKey(57), &57));
672
673 iter.advance().await.unwrap();
675 let ItemRef { key, value, .. } = iter.get().expect("missing item");
676 assert_eq!((key, value), (&TestKey(58), &58));
677 }
678
679 #[fuchsia::test]
680 async fn test_seek_lower_bound() {
681 let skip_list = SkipListLayer::new(100);
683 for i in (0..100).rev() {
684 skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
685 }
686 let mut expected_index = 57 * 3;
687 let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
688 let ItemRef { key, value, .. } = iter.get().expect("missing item");
689 assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
690
691 expected_index += 3;
693 iter.advance().await.unwrap();
694 let ItemRef { key, value, .. } = iter.get().expect("missing item");
695 assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
696 }
697
698 #[fuchsia::test]
699 async fn test_replace_or_insert_replaces() {
700 let skip_list = SkipListLayer::new(100);
701 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
702 skip_list.insert(items[1].clone()).expect("insert error");
703 skip_list.insert(items[0].clone()).expect("insert error");
704 let replacement_value = 3;
705 skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));
706
707 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
708 let ItemRef { key, value, .. } = iter.get().expect("missing item");
709 assert_eq!((key, value), (&items[0].key, &items[0].value));
710 iter.advance().await.unwrap();
711 let ItemRef { key, value, .. } = iter.get().expect("missing item");
712 assert_eq!((key, value), (&items[1].key, &replacement_value));
713 iter.advance().await.unwrap();
714 assert!(iter.get().is_none());
715 }
716
717 #[fuchsia::test]
718 async fn test_replace_or_insert_inserts() {
719 let skip_list = SkipListLayer::new(100);
720 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
721 skip_list.insert(items[2].clone()).expect("insert error");
722 skip_list.insert(items[0].clone()).expect("insert error");
723 skip_list.replace_or_insert(items[1].clone());
724
725 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
726 let ItemRef { key, value, .. } = iter.get().expect("missing item");
727 assert_eq!((key, value), (&items[0].key, &items[0].value));
728 iter.advance().await.unwrap();
729 let ItemRef { key, value, .. } = iter.get().expect("missing item");
730 assert_eq!((key, value), (&items[1].key, &items[1].value));
731 iter.advance().await.unwrap();
732 let ItemRef { key, value, .. } = iter.get().expect("missing item");
733 assert_eq!((key, value), (&items[2].key, &items[2].value));
734 iter.advance().await.unwrap();
735 assert!(iter.get().is_none());
736 }
737
738 #[fuchsia::test]
739 async fn test_erase() {
740 let skip_list = SkipListLayer::new(100);
741 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
742 skip_list.insert(items[1].clone()).expect("insert error");
743 skip_list.insert(items[0].clone()).expect("insert error");
744
745 assert_eq!(skip_list.len(), 2);
746
747 skip_list.erase(&items[1].key);
748
749 assert_eq!(skip_list.len(), 1);
750
751 {
752 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
753 let ItemRef { key, value, .. } = iter.get().expect("missing item");
754 assert_eq!((key, value), (&items[0].key, &items[0].value));
755 iter.advance().await.unwrap();
756 assert!(iter.get().is_none());
757 }
758
759 skip_list.erase(&items[0].key);
760
761 assert_eq!(skip_list.len(), 0);
762
763 {
764 let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
765 assert!(iter.get().is_none());
766 }
767 }
768
769 #[fuchsia::test]
772 #[ignore]
773 async fn test_seek_is_log_n_complexity() {
774 let mut n = 100;
777 let mut loops = 0;
778 const TARGET_TIME: Duration = Duration::from_millis(500);
779 let time = loop {
780 let skip_list = SkipListLayer::new(n as usize);
781 for i in 0..n {
782 skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
783 }
784 let start = Instant::now();
785 for i in 0..n {
786 skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
787 }
788 let elapsed = Instant::now() - start;
789 if elapsed > TARGET_TIME {
790 break elapsed;
791 }
792 n *= 2;
793 loops += 1;
794 };
795
796 let seek_count = n;
797 n >>= loops / 2; let skip_list = SkipListLayer::new(n as usize);
799 for i in 0..n {
800 skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
801 }
802 let start = Instant::now();
803 for i in 0..seek_count {
804 skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
805 }
806 let elapsed = Instant::now() - start;
807
808 eprintln!(
809 "{} items: {}ms, {} items: {}ms",
810 seek_count,
811 time.as_millis(),
812 n,
813 elapsed.as_millis()
814 );
815
816 assert!(elapsed * 4 > time);
820 }
821
822 #[fuchsia::test]
823 async fn test_large_number_of_items() {
824 let item_count = 1000;
825 let skip_list = SkipListLayer::new(1000);
826 for i in 1..item_count {
827 skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
828 }
829 let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
830 for i in item_count - 10..item_count {
831 assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
832 iter.advance().await.unwrap();
833 }
834 assert!(iter.get().is_none());
835 }
836
837 #[fuchsia::test]
838 async fn test_multiple_readers_allowed() {
839 let skip_list = SkipListLayer::new(100);
840 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
841 skip_list.insert(items[1].clone()).expect("insert error");
842 skip_list.insert(items[0].clone()).expect("insert error");
843
844 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
846 let ItemRef { key, value, .. } = iter.get().expect("missing item");
847 assert_eq!((key, value), (&items[0].key, &items[0].value));
848
849 let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
851 let ItemRef { key, value, .. } = iter2.get().expect("missing item");
852 assert_eq!((key, value), (&items[0].key, &items[0].value));
853
854 iter.advance().await.unwrap();
856 let ItemRef { key, value, .. } = iter.get().expect("missing item");
857 assert_eq!((key, value), (&items[1].key, &items[1].value));
858 }
859
860 fn merge(
861 left: &'_ MergeLayerIterator<'_, TestKey, i32>,
862 right: &'_ MergeLayerIterator<'_, TestKey, i32>,
863 ) -> MergeResult<TestKey, i32> {
864 MergeResult::Other {
865 emit: None,
866 left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
867 right: Discard,
868 }
869 }
870
871 #[fuchsia::test]
872 async fn test_merge_into() {
873 let skip_list = SkipListLayer::new(100);
874 skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
875
876 skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);
877
878 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
879 let ItemRef { key, value, .. } = iter.get().expect("missing item");
880 assert_eq!((key, value), (&TestKey(1), &3));
881 iter.advance().await.unwrap();
882 assert!(iter.get().is_none());
883 }
884
885 #[fuchsia::test]
886 async fn test_two_inserts() {
887 let skip_list = SkipListLayer::new(100);
888 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
889 {
890 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
891 iter.insert(items[0].clone());
892 iter.insert(items[1].clone());
893 }
894
895 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
896 let ItemRef { key, value, .. } = iter.get().expect("missing item");
897 assert_eq!((key, value), (&items[0].key, &items[0].value));
898 iter.advance().await.unwrap();
899 let ItemRef { key, value, .. } = iter.get().expect("missing item");
900 assert_eq!((key, value), (&items[1].key, &items[1].value));
901 }
902
903 #[fuchsia::test]
904 async fn test_erase_after_insert() {
905 let skip_list = SkipListLayer::new(100);
906 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
907 skip_list.insert(items[1].clone()).expect("insert error");
908 {
909 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
910 iter.insert(items[0].clone());
911 iter.erase();
912 }
913
914 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
915 let ItemRef { key, value, .. } = iter.get().expect("missing item");
916 assert_eq!((key, value), (&items[0].key, &items[0].value));
917 iter.advance().await.unwrap();
918 assert!(iter.get().is_none());
919 }
920
921 #[fuchsia::test]
922 async fn test_insert_after_erase() {
923 let skip_list = SkipListLayer::new(100);
924 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
925 skip_list.insert(items[1].clone()).expect("insert error");
926 {
927 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
928 iter.erase();
929 iter.insert(items[0].clone());
930 }
931
932 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
933 let ItemRef { key, value, .. } = iter.get().expect("missing item");
934 assert_eq!((key, value), (&items[0].key, &items[0].value));
935 iter.advance().await.unwrap();
936 assert!(iter.get().is_none());
937 }
938
939 #[fuchsia::test]
940 async fn test_insert_erase_insert() {
941 let skip_list = SkipListLayer::new(100);
942 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
943 skip_list.insert(items[0].clone()).expect("insert error");
944 {
945 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
946 iter.insert(items[1].clone());
947 iter.erase();
948 iter.insert(items[2].clone());
949 }
950
951 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
952 let ItemRef { key, value, .. } = iter.get().expect("missing item");
953 assert_eq!((key, value), (&items[1].key, &items[1].value));
954 iter.advance().await.unwrap();
955 let ItemRef { key, value, .. } = iter.get().expect("missing item");
956 assert_eq!((key, value), (&items[2].key, &items[2].value));
957 }
958
959 #[fuchsia::test]
960 async fn test_two_erase_erases() {
961 let skip_list = SkipListLayer::new(100);
962 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
963 skip_list.insert(items[0].clone()).expect("insert error");
964 skip_list.insert(items[1].clone()).expect("insert error");
965 skip_list.insert(items[2].clone()).expect("insert error");
966 {
967 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
968 iter.erase();
969 iter.erase();
970 }
971
972 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
973 let ItemRef { key, value, .. } = iter.get().expect("missing item");
974 assert_eq!((key, value), (&items[2].key, &items[2].value));
975 iter.advance().await.unwrap();
976 assert!(iter.get().is_none());
977 }
978
979 #[fuchsia::test]
980 async fn test_readers_not_blocked_by_writers() {
981 let skip_list = SkipListLayer::new(100);
982 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
983 skip_list.insert(items[1].clone()).expect("insert error");
984
985 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
986 let ItemRef { key, value, .. } = iter.get().expect("missing item");
987 assert_eq!((key, value), (&items[1].key, &items[1].value));
988
989 let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
990 let ItemRef { key, value, .. } = iter.get().expect("missing item");
991 assert_eq!((key, value), (&items[1].key, &items[1].value));
992
993 join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
994 loop {
995 let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
996 let ItemRef { key, .. } = iter.get().expect("missing item");
997 if key == &items[0].key {
998 break;
999 }
1000 }
1001 iter.advance().await.unwrap();
1002 assert!(iter.get().is_none());
1003 std::mem::drop(iter);
1004 iter2.advance().await.unwrap();
1005 assert!(iter2.get().is_none());
1006 std::mem::drop(iter2);
1007 });
1008 }
1009
1010 #[fuchsia::test(threads = 20)]
1011 async fn test_many_readers_and_writers() {
1012 let skip_list = SkipListLayer::new(100);
1013 join_all(
1014 (0..10)
1015 .map(|i| {
1016 let skip_list_clone = skip_list.clone();
1017 fasync::Task::spawn(async move {
1018 for j in 0..10 {
1019 skip_list_clone
1020 .insert(Item::new(TestKey(i * 100 + j), i))
1021 .expect("insert error");
1022 }
1023 })
1024 })
1025 .chain((0..10).map(|_| {
1026 let skip_list_clone = skip_list.clone();
1027 fasync::Task::spawn(async move {
1028 for _ in 0..300 {
1029 let mut iter =
1030 skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
1031 let mut last_item: Option<TestKey> = None;
1032 while let Some(item) = iter.get() {
1033 if let Some(last) = last_item {
1034 assert!(item.key > &last);
1035 }
1036 last_item = Some(item.key.clone());
1037 iter.advance().await.expect("advance failed");
1038 }
1039 }
1040 })
1041 })),
1042 )
1043 .await;
1044 }
1045
1046 #[fuchsia::test]
1047 async fn test_insert_advance_erase() {
1048 let skip_list = SkipListLayer::new(100);
1049 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
1050 skip_list.insert(items[1].clone()).expect("insert error");
1051 skip_list.insert(items[2].clone()).expect("insert error");
1052
1053 assert_eq!(skip_list.len(), 2);
1054
1055 {
1056 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
1057 iter.insert(items[0].clone());
1058 iter.advance();
1059 iter.erase();
1060 }
1061
1062 assert_eq!(skip_list.len(), 2);
1063
1064 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1065 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1066 assert_eq!((key, value), (&items[0].key, &items[0].value));
1067 iter.advance().await.unwrap();
1068 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1069 assert_eq!((key, value), (&items[1].key, &items[1].value));
1070 iter.advance().await.unwrap();
1071 assert!(iter.get().is_none());
1072 }
1073
1074 #[fuchsia::test]
1075 async fn test_seek_excluded() {
1076 let skip_list = SkipListLayer::new(100);
1077 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
1078 skip_list.insert(items[0].clone()).expect("insert error");
1079 skip_list.insert(items[1].clone()).expect("insert error");
1080 let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
1081 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1082 assert_eq!((key, value), (&items[1].key, &items[1].value));
1083 }
1084
1085 #[fuchsia::test]
1086 fn test_insert_race() {
1087 for _ in 0..1000 {
1088 let skip_list = SkipListLayer::new(100);
1089 skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
1090
1091 let skip_list_clone = skip_list.clone();
1092 let thread1 = std::thread::spawn(move || {
1093 skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
1094 });
1095 let thread2 = std::thread::spawn(move || {
1096 let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
1097 match iter.get() {
1098 Some(ItemRef { key: TestKey(2), .. }) => {}
1099 result => assert!(false, "{:?}", result),
1100 }
1101 });
1102 thread1.join().unwrap();
1103 thread2.join().unwrap();
1104 }
1105 }
1106
1107 #[fuchsia::test]
1108 fn test_replace_or_insert_multi_thread() {
1109 let skip_list = SkipListLayer::new(100);
1110 skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
1111 skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
1112 skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
1113 skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");
1114
1115 let mut threads = Vec::new();
1117 for i in 0..200 {
1118 let skip_list_clone = skip_list.clone();
1119 threads.push(std::thread::spawn(move || {
1120 skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
1121 }));
1122 }
1123
1124 let _checker_thread = std::thread::spawn(move || loop {
1126 let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
1127 assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
1128 iter.advance().now_or_never().unwrap().unwrap();
1129 assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
1130 iter.advance().now_or_never().unwrap().unwrap();
1131 assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
1132 });
1133
1134 for thread in threads {
1135 thread.join().unwrap();
1136 }
1137 }
1138}