fxfs/lsm_tree/skip_list_layer.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// There are a great many optimisations that could be considered to improve performance and maybe
// memory usage.

use crate::drop_event::DropEvent;
use crate::log::*;
use crate::lsm_tree::merge::{self, MergeFn};
use crate::lsm_tree::types::{
    BoxedLayerIterator, Item, ItemCount, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
    LayerValue, OrdLowerBound, OrdUpperBound,
};
use crate::serialized_types::{Version, LATEST_VERSION};
use anyhow::{bail, Error};
use async_trait::async_trait;
use fuchsia_sync::{Mutex, MutexGuard};
use std::cell::UnsafeCell;
use std::cmp::{min, Ordering};
use std::collections::BTreeMap;
use std::ops::{Bound, Range};
use std::sync::atomic::{self, AtomicPtr, AtomicU32};
use std::sync::Arc;

// Each skip list node contains a variable-sized pointer list. The head pointers also exist in the
// form of a pointer list. Index 0 in the pointer list is the chain with the most elements, i.e. it
// contains every element in the list.
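//
// For example, with three levels the chains might look like:
//
//   index 2:  HEAD ------------> C
//   index 1:  HEAD ----> A ----> C ----------> E
//   index 0:  HEAD -> A -> B -> C -> D -> E -> F
//
// so following index 0 always visits every node in key order.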
struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);

impl<K, V> PointerList<K, V> {
    fn new(count: usize) -> PointerList<K, V> {
        let mut pointers = Vec::new();
        for _ in 0..count {
            pointers.push(AtomicPtr::new(std::ptr::null_mut()));
        }
        PointerList(pointers.into_boxed_slice())
    }

    fn len(&self) -> usize {
        self.0.len()
    }

    // Returns a mutable reference to the node pointed to at the given index, if any.
    fn get_mut<'a>(&self, index: usize) -> Option<&'a mut SkipListNode<K, V>> {
        unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_mut() }
    }

    // Same as previous, but returns an immutable reference.
    fn get<'a>(&self, index: usize) -> Option<&'a SkipListNode<K, V>> {
        unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_ref() }
    }

    // Sets the pointer at the given index.
    fn set(&self, index: usize, node: Option<&SkipListNode<K, V>>) {
        self.0[index].store(
            match node {
                None => std::ptr::null_mut(),
                Some(node) => {
                    // https://github.com/rust-lang/rust/issues/66136#issuecomment-550003651
                    // suggests that the following is the best way to cast from const* to mut*.
                    unsafe {
                        (&*(node as *const SkipListNode<K, V>
                            as *const UnsafeCell<SkipListNode<K, V>>))
                            .get()
                    }
                }
            },
            atomic::Ordering::SeqCst,
        );
    }

    fn get_ptr(&self, index: usize) -> *mut SkipListNode<K, V> {
        self.0[index].load(atomic::Ordering::SeqCst)
    }
}

struct SkipListNode<K, V> {
    item: Item<K, V>,
    pointers: PointerList<K, V>,
}

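/// An in-memory, ordered layer backed by a skip list.  Writes are serialized via an internal lock;
/// readers do not wait behind in-progress writes (see `test_readers_not_blocked_by_writers`).
///
/// A rough usage sketch (not from the original sources; the tests at the bottom of this file are
/// the compiling examples, and `key`/`value` here stand for any types implementing the layer
/// traits):
///
/// ```ignore
/// let layer = SkipListLayer::new(1024);
/// layer.insert(Item::new(key, value))?;
/// let mut iter = layer.seek(Bound::Unbounded).await?;
/// while let Some(item) = iter.get() {
///     // ... examine `item` ...
///     iter.advance().await?;
/// }
/// ```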
pub struct SkipListLayer<K, V> {
    // These are the head pointers for the list.
    pointers: PointerList<K, V>,

    inner: Mutex<Inner<K, V>>,

    // Writes are serialized using this lock.
    write_lock: Mutex<()>,

    // The number of nodes that have been allocated.  This is only used for debugging purposes.
    allocated: AtomicU32,

    close_event: Mutex<Option<Arc<DropEvent>>>,
}

// The writer needs to synchronize with the readers and this is done by keeping track of read
// counts.  We could, in theory, remove the mutex and make the read counts atomic (and thus make
// reads truly lock free) but it's simpler and easier to reason about with a mutex and what matters
// most is that we avoid using a futures::lock::Mutex for readers because that can be blocked for
// relatively long periods of time.
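//
// The lifecycle is: a reader increments `current_count` for the epoch it starts in; a writer that
// erases nodes while readers exist moves `current_count` into an `EpochEraseList` keyed by the
// current epoch and bumps `epoch`; when the last reader of an epoch drops, erase lists are freed
// from the oldest epoch forward for as long as their counts are zero.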
struct Inner<K, V> {
    // After a write, if there are nodes that need to be freed, and existing readers, the epoch
    // changes and new readers will be in a new epoch.  When all the old readers finish, the nodes
    // can be freed.
    epoch: u64,

    // The number of readers on the current epoch.
    current_count: u16,

    // A list of nodes to be freed once the read counts have reached zero.
    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,

    // The number of items in the skip-list.
    item_count: usize,
}

// After a mutation that involves erasing nodes, we must keep the nodes alive until there are no
// more readers in any of the epochs prior to the mutation.  To deal with this, we track the number
// of outstanding readers in each epoch so that when the count reaches zero, we know it is safe to
// free the nodes.
struct EpochEraseList<K, V> {
    // The number of readers still associated with this epoch.  When this reaches zero, the list can
    // be freed once all previous epochs have been freed.
    count: u16,
    // We represent the list by storing its head and tail, with each node chained to the next.
    range: Range<*mut SkipListNode<K, V>>,
}

// Required because of `erase_lists` which holds pointers.
unsafe impl<K, V> Send for Inner<K, V> {}

impl<K, V> Inner<K, V> {
    fn new() -> Self {
        Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
    }

    fn free_erase_list(
        &mut self,
        owner: &SkipListLayer<K, V>,
        list: Range<*mut SkipListNode<K, V>>,
    ) {
        let mut maybe_node = unsafe { list.start.as_mut() };
        loop {
            match maybe_node {
                Some(node) if node as *const _ != list.end => {
                    maybe_node = owner.free_node(node);
                }
                _ => break,
            }
        }
    }
}

impl<K, V> SkipListLayer<K, V> {
    pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
        Arc::new(SkipListLayer {
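            // Using roughly log2(max_item_count) + 1 levels keeps the expected search cost
            // logarithmic for up to `max_item_count` items.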
            pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
            inner: Mutex::new(Inner::new()),
            write_lock: Mutex::new(()),
            allocated: AtomicU32::new(0),
            close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
        })
    }

    pub fn len(&self) -> usize {
        self.inner.lock().item_count
    }

    fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
        self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
        Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
    }

    // Frees and then returns the next node in the chain.
    // TODO(https://fxbug.dev/414761492): document or remove this `#[allow]`
    #[allow(clippy::mut_from_ref)]
    fn free_node(&self, node: &mut SkipListNode<K, V>) -> Option<&mut SkipListNode<K, V>> {
        self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
        // TODO(https://fxbug.dev/414760817): document unsafe usage
        unsafe { Box::from_raw(node).pointers.get_mut(0) }
    }
}

impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
    // Erases the given item. Does nothing if the item doesn't exist.
    pub fn erase(&self, key: &K)
    where
        K: std::cmp::Eq,
    {
        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
        if let Some(ItemRef { key: k, .. }) = iter.get() {
            if k == key {
                iter.erase();
            } else {
                warn!("Attempt to erase key not present!");
            }
        }
        iter.commit();
    }

    /// Inserts the given item.
    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
        if let Some(found_item) = iter.get() {
            if found_item.key == &item.key {
                bail!("Attempted to insert an existing key");
            }
        }
        iter.insert(item);
        Ok(())
    }

    /// Replaces or inserts the given item.
    pub fn replace_or_insert(&self, item: Item<K, V>) {
        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
        if let Some(found_item) = iter.get() {
            if found_item.key == &item.key {
                iter.erase();
            }
        }
        iter.insert(item);
    }

    /// Merges the item into the layer.
    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
        merge::merge_into(
            Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
            item,
            merge_fn,
        )
        .unwrap();
    }
}

// We have to manually manage memory.
impl<K, V> Drop for SkipListLayer<K, V> {
    fn drop(&mut self) {
        let mut next = self.pointers.get_mut(0);
        while let Some(node) = next {
            next = self.free_node(node);
        }
        assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
    }
}

#[async_trait]
impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
    async fn seek<'a>(
        &'a self,
        bound: std::ops::Bound<&K>,
    ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
        Ok(Box::new(SkipListLayerIter::new(self, bound)))
    }

    fn lock(&self) -> Option<Arc<DropEvent>> {
        self.close_event.lock().clone()
    }

    fn estimated_len(&self) -> ItemCount {
        ItemCount::Precise(self.inner.lock().item_count)
    }

    async fn close(&self) {
        let listener = self.close_event.lock().take().expect("close already called").listen();
        listener.await;
    }

    fn get_version(&self) -> Version {
        // The SkipListLayer is stored in RAM and written to disk as a SimplePersistentLayer,
        // so it is always at the latest version.
        LATEST_VERSION
    }

    fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
        node.record_bool("persistent", false);
        node.record_uint("num_items", self.inner.lock().item_count as u64);
    }
}

// -- SkipListLayerIter --

struct SkipListLayerIter<'a, K, V> {
    skip_list: &'a SkipListLayer<K, V>,

    // The epoch for this reader.
    epoch: u64,

    // The current node.
    node: Option<&'a SkipListNode<K, V>>,
}

impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
    fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
        let epoch = {
            let mut inner = skip_list.inner.lock();
            inner.current_count += 1;
            inner.epoch
        };
        let (included, key) = match bound {
            Bound::Unbounded => {
                return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
            }
            Bound::Included(key) => (true, key),
            Bound::Excluded(key) => (false, key),
        };
        let mut last_pointers = &skip_list.pointers;

        // Some care needs to be taken here because new elements can be inserted atomically, so it
        // is important that the node we return in the iterator is the same node that we performed
        // the last comparison on.
        let mut node = None;
        for index in (0..skip_list.pointers.len()).rev() {
            // Keep iterating along this level until we encounter a key that's >= our search key.
            loop {
                node = last_pointers.get(index);
                if let Some(node) = node {
                    match &node.item.key.cmp_upper_bound(key) {
                        Ordering::Equal if included => break,
                        Ordering::Greater => break,
                        _ => {}
                    }
                    last_pointers = &node.pointers;
                } else {
                    break;
                }
            }
        }
        SkipListLayerIter { skip_list, epoch, node }
    }
}

impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
    fn drop(&mut self) {
        let mut inner = self.skip_list.inner.lock();
        if self.epoch == inner.epoch {
            inner.current_count -= 1;
        } else {
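            // This reader belongs to an older epoch.  Decrement that epoch's count and, if it has
            // drained, free erase lists starting from the oldest epoch for as long as their counts
            // are zero (freeing must happen in epoch order).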
            if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
                erase_list.count -= 1;
                if erase_list.count == 0 {
                    while let Some(entry) = inner.erase_lists.first_entry() {
                        if entry.get().count == 0 {
                            let range = entry.remove_entry().1.range;
                            inner.free_erase_list(self.skip_list, range);
                        } else {
                            break;
                        }
                    }
                }
            }
        }
    }
}

#[async_trait]
impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
    async fn advance(&mut self) -> Result<(), Error> {
        match self.node {
            None => {}
            Some(node) => self.node = node.pointers.get(0),
        }
        Ok(())
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.node.map(|node| node.item.as_item_ref())
    }
}

type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;

// -- SkipListLayerIterMut --

// This works by building an insertion chain.  When that chain is committed, it is done atomically
// so that readers are not interrupted.  When the existing readers are finished, it is then safe to
// release memory for any nodes that might have been erased.  In the case that we are only erasing
// elements, there will be no insertion chain, in which case we just atomically remove the elements
// from the chain.
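//
// As an illustrative sketch: replacing B with B' in A -> B -> C builds a detached chain in which
// B' points at C; readers keep seeing A -> B -> C until commit switches A's level-0 pointer to B',
// after which B is freed once every reader that could still observe it has finished.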
pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
    skip_list: &'a SkipListLayer<K, V>,

    // Since this is a mutable iterator, we need to keep pointers to all the nodes that precede the
    // current position at every level, so that we can update them when inserting or erasing
    // elements.
    prev_pointers: PointerListRefArray<'a, K, V>,

    // When we first insert or erase an element, we take a copy of prev_pointers so that
    // we know which pointers need to be updated when we commit.
    insertion_point: Option<PointerListRefArray<'a, K, V>>,

    // These are the nodes that we should point to when we commit.
    insertion_nodes: PointerList<K, V>,

    // Only one write can proceed at a time.  We only need a place to keep the mutex guard, which is
    // why Rust thinks this is unused.
    #[allow(dead_code)]
    write_guard: MutexGuard<'a, ()>,

    // The change in item count as a result of this mutation.
    item_delta: isize,
}

impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
    pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
        let write_guard = skip_list.write_lock.lock();
        let len = skip_list.pointers.len();

        // Start by setting all the previous pointers to the head.
        //
        // To understand how the previous pointers work, imagine the list looks something like the
        // following:
        //
        // 2  |--->|
        // 1  |--->|--|------->|
        // 0  |--->|--|--|--|->|
        //  HEAD   A  B  C  D  E  F
        //
        // Now imagine that the iterator is pointing at element D. In that case, the previous
        // pointers will point at C for index 0, B for index 1 and A for index 2. With that
        // information, it will be possible to insert an element immediately prior to D and
        // correctly update as many pointers as required (remember a new element will be given a
        // random number of levels).
        let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
        match bound {
            Bound::Unbounded => {}
            Bound::Included(key) => {
                let pointers = &mut prev_pointers;
                for index in (0..len).rev() {
                    while let Some(node) = pointers[index].get(index) {
                        // Keep iterating along this level until we encounter a key that's >= our
                        // search key.
                        match &(node.item.key).cmp_upper_bound(key) {
                            Ordering::Equal | Ordering::Greater => break,
                            Ordering::Less => {}
                        }
                        pointers[index] = &node.pointers;
                    }
                    if index > 0 {
                        pointers[index - 1] = pointers[index];
                    }
                }
            }
            Bound::Excluded(_) => panic!("Excluded bounds not supported"),
        }
        SkipListLayerIterMut {
            skip_list,
            prev_pointers,
            insertion_point: None,
            insertion_nodes: PointerList::new(len),
            write_guard,
            item_delta: 0,
        }
    }
}

impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
    fn drop(&mut self) {
        self.commit();
    }
}

impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
    fn advance(&mut self) {
        if self.insertion_point.is_some() {
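            // Once an insertion chain exists, the nodes we advance over lie in the span that will
            // be unlinked at commit, so we clone the current item into the chain and mark the
            // original for erasure rather than simply stepping past it.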
            if let Some(item) = self.get() {
                // Copy the current item into the insertion chain.
                let copy = item.cloned();
                self.insert(copy);
                self.erase();
            }
        } else {
            let pointers = &mut self.prev_pointers;
            if let Some(next) = pointers[0].get_mut(0) {
                for i in 0..next.pointers.len() {
                    pointers[i] = &next.pointers;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.prev_pointers[0].get(0).map(|node| node.item.as_item_ref())
    }

    fn insert(&mut self, item: Item<K, V>) {
        use rand::Rng;
        let mut rng = rand::thread_rng();
        let max_pointers = self.skip_list.pointers.len();
        // Choose a random number of levels for the new node such that, on average, each level
        // contains roughly half the nodes of the level below it.
        let pointer_count = max_pointers
            - min(
                (rng.gen_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
                max_pointers - 1,
            );
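        // For example, with `max_pointers` == 4 the value is drawn from 0..=14: 8..=14 yields one
        // pointer, 4..=7 two, 2..=3 three and 0..=1 four, so each additional level is roughly half
        // as likely as the one below it.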
        let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
        if self.insertion_point.is_none() {
            self.insertion_point = Some(self.prev_pointers.clone());
        }
        for i in 0..pointer_count {
            let pointers = self.prev_pointers[i];
            node.pointers.set(i, pointers.get(i));
            if self.insertion_nodes.get(i).is_none() {
                // If there's no insertion node at this level, record this node as the node to
                // switch in when we commit.
                self.insertion_nodes.set(i, Some(node));
            } else {
                // There's already an insertion node at this level which means that it's part of the
                // insertion chain, so we can just update the pointers now.
                pointers.set(i, Some(&node));
            }
            // The iterator should point at the node following the new node i.e. the existing node.
            self.prev_pointers[i] = &node.pointers;
        }
        self.item_delta += 1;
    }

    fn erase(&mut self) {
        let pointers = &mut self.prev_pointers;
        if let Some(next) = pointers[0].get_mut(0) {
            if self.insertion_point.is_none() {
                self.insertion_point = Some(pointers.clone());
            }
            if self.insertion_nodes.get(0).is_none() {
                // If there's no insertion node, then just update the iterator position to point to
                // the next node, and then when we commit, it'll get erased.
                pointers[0] = &next.pointers;
            } else {
                // There's an insertion node, so the current element must be part of the insertion
                // chain and so we can update the pointers immediately.  There will be another node
                // that isn't part of the insertion chain that will still point at this node, but it
                // will disappear when we commit.
                pointers[0].set(0, next.pointers.get(0));
            }
            // Fix up all the pointers except the bottom one. Readers will still find this node,
            // just not as efficiently.
            for i in 1..next.pointers.len() {
                pointers[i].set(i, next.pointers.get(i));
            }
        }
        self.item_delta -= 1;
    }

    // Commits the changes.  Note that this doesn't wait for readers to finish; any barrier that
    // might be required should be handled by the caller.
    fn commit(&mut self) {
        // Splice the changes into the list.
        let prev_pointers = match self.insertion_point.take() {
            Some(prev_pointers) => prev_pointers,
            None => return,
        };

        // Keep track of the first node that we might need to erase later.
        let maybe_erase = prev_pointers[0].get_mut(0);

        // If there are no insertion nodes, then it means that we're only erasing nodes.
        if self.insertion_nodes.get(0).is_none() {
            // Erase all elements between the insertion point and the current element. The
            // pointers for levels > 0 should already have been done, so it's only level 0 we
            // need to worry about.
            prev_pointers[0].set(0, self.prev_pointers[0].get(0));
        } else {
            // Switch the pointers over so that the insertion chain is spliced in.  This is safe
            // so long as the bottom pointer is done first because that guarantees the new nodes
            // will be found, just maybe not as efficiently.
            for i in 0..self.insertion_nodes.len() {
                if let Some(node) = self.insertion_nodes.get_mut(i) {
                    prev_pointers[i].set(i, Some(node));
                }
            }
        }

        // Switch the epoch so that we can track when existing readers have finished.
        let mut inner = self.skip_list.inner.lock();
        inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
        if let Some(start) = maybe_erase {
            let end = self.prev_pointers[0].get_ptr(0);
            if start as *mut _ != end {
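                // If there are readers in the current epoch, or earlier epochs are still draining,
                // the erased nodes can't be freed yet: park them in an erase list for this epoch
                // and start a new epoch.  Otherwise nothing can still observe them and they can be
                // freed immediately.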
                if inner.current_count > 0 || !inner.erase_lists.is_empty() {
                    let count = std::mem::take(&mut inner.current_count);
                    let epoch = inner.epoch;
                    inner.erase_lists.insert(epoch, EpochEraseList { count, range: start..end });
                    inner.epoch = inner.epoch.wrapping_add(1);
                } else {
                    inner.free_erase_list(self.skip_list, start..end);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{SkipListLayer, SkipListLayerIterMut};
    use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
    use crate::lsm_tree::types::{
        DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerIterator,
        LayerIteratorMut, SortByU64,
    };
    use crate::serialized_types::{
        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
    };
    use assert_matches::assert_matches;
    use fprint::TypeFingerprint;
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::{join, FutureExt as _};
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;
    use std::ops::Bound;
    use std::time::{Duration, Instant};

    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}

    #[fuchsia::test]
    async fn test_iteration() {
        // Insert two items and make sure we can iterate back in the correct order.
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_exact() {
        // Seek for an exact match.
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(57), &57));

        // And check the next item is correct.
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(58), &58));
    }

    #[fuchsia::test]
    async fn test_seek_lower_bound() {
        // Seek for a non-exact match.
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
        }
        let mut expected_index = 57 * 3;
        let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));

        // And check the next item is correct.
        expected_index += 3;
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
    }

    #[fuchsia::test]
    async fn test_replace_or_insert_replaces() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let replacement_value = 3;
        skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &replacement_value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_replace_or_insert_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[2].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.replace_or_insert(items[1].clone());

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        skip_list.erase(&items[1].key);

        assert_eq!(skip_list.len(), 1);

        {
            let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&items[0].key, &items[0].value));
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
        }

        skip_list.erase(&items[0].key);

        assert_eq!(skip_list.len(), 0);

        {
            let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            assert!(iter.get().is_none());
        }
    }
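
    // An additional check (a small sketch added alongside the original tests): `insert` must
    // refuse to overwrite an existing key, while `replace_or_insert` is the call that overwrites.
    #[fuchsia::test]
    async fn test_insert_existing_key_fails() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        assert!(skip_list.insert(Item::new(TestKey(1), 2)).is_err());
        assert_eq!(skip_list.len(), 1);
    }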

    // This test ends up being flaky on CQ. It is left here as it might be useful in case
    // significant changes are made.
    #[fuchsia::test]
    #[ignore]
    async fn test_seek_is_log_n_complexity() {
        // Keep doubling up the number of items until it takes about 500ms to search and then go
        // back and measure something that should, in theory, take about half that time.
        let mut n = 100;
        let mut loops = 0;
        const TARGET_TIME: Duration = Duration::from_millis(500);
        let time = loop {
            let skip_list = SkipListLayer::new(n as usize);
            for i in 0..n {
                skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
            }
            let start = Instant::now();
            for i in 0..n {
                skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
            }
            let elapsed = Instant::now() - start;
            if elapsed > TARGET_TIME {
                break elapsed;
            }
            n *= 2;
            loops += 1;
        };

        let seek_count = n;
        n >>= loops / 2; // This should, in theory, result in 50% seek time.
        let skip_list = SkipListLayer::new(n as usize);
        for i in 0..n {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let start = Instant::now();
        for i in 0..seek_count {
            skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
        }
        let elapsed = Instant::now() - start;

        eprintln!(
            "{} items: {}ms, {} items: {}ms",
            seek_count,
            time.as_millis(),
            n,
            elapsed.as_millis()
        );

        // Experimental results show that typically we do a bit better than log(n), but here we
        // just check that the time we just measured is above 25% of the time we first measured;
        // in theory it should be around 50%.
        assert!(elapsed * 4 > time);
    }

    #[fuchsia::test]
    async fn test_large_number_of_items() {
        let item_count = 1000;
        let skip_list = SkipListLayer::new(1000);
        for i in 1..item_count {
            skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
        for i in item_count - 10..item_count {
            assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
            iter.advance().await.unwrap();
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_multiple_readers_allowed() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        // Create the first iterator and check the first item.
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        // Create a second iterator and check the first item.
        let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        // Now go back to the first iterator and check the second item.
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    fn merge(
        left: &'_ MergeLayerIterator<'_, TestKey, i32>,
        right: &'_ MergeLayerIterator<'_, TestKey, i32>,
    ) -> MergeResult<TestKey, i32> {
        MergeResult::Other {
            emit: None,
            left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
            right: Discard,
        }
    }

    #[fuchsia::test]
    async fn test_merge_into() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");

        skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(1), &3));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_two_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.insert(items[1].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    #[fuchsia::test]
    async fn test_erase_after_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_after_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.insert(items[0].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_erase_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[1].clone());
            iter.erase();
            iter.insert(items[2].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
    }

    #[fuchsia::test]
    async fn test_two_erase_erases() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_readers_not_blocked_by_writers() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
            loop {
                let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
                let ItemRef { key, .. } = iter.get().expect("missing item");
                if key == &items[0].key {
                    break;
                }
            }
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
            std::mem::drop(iter);
            iter2.advance().await.unwrap();
            assert!(iter2.get().is_none());
            std::mem::drop(iter2);
        });
    }

    #[fuchsia::test(threads = 20)]
    async fn test_many_readers_and_writers() {
        let skip_list = SkipListLayer::new(100);
        join_all(
            (0..10)
                .map(|i| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for j in 0..10 {
                            skip_list_clone
                                .insert(Item::new(TestKey(i * 100 + j), i))
                                .expect("insert error");
                        }
                    })
                })
                .chain((0..10).map(|_| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for _ in 0..300 {
                            let mut iter =
                                skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
                            let mut last_item: Option<TestKey> = None;
                            while let Some(item) = iter.get() {
                                if let Some(last) = last_item {
                                    assert!(item.key > &last);
                                }
                                last_item = Some(item.key.clone());
                                iter.advance().await.expect("advance failed");
                            }
                        }
                    })
                })),
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_insert_advance_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.advance();
            iter.erase();
        }

        assert_eq!(skip_list.len(), 2);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_excluded() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    #[fuchsia::test]
    fn test_insert_race() {
        for _ in 0..1000 {
            let skip_list = SkipListLayer::new(100);
            skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");

            let skip_list_clone = skip_list.clone();
            let thread1 = std::thread::spawn(move || {
                skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
            });
            let thread2 = std::thread::spawn(move || {
                let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                match iter.get() {
                    Some(ItemRef { key: TestKey(2), .. }) => {}
                    result => assert!(false, "{:?}", result),
                }
            });
            thread1.join().unwrap();
            thread2.join().unwrap();
        }
    }

    #[fuchsia::test]
    fn test_replace_or_insert_multi_thread() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
        skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");

        // Set up a number of threads that are repeatedly replacing the '3' key.
        let mut threads = Vec::new();
        for i in 0..200 {
            let skip_list_clone = skip_list.clone();
            threads.push(std::thread::spawn(move || {
                skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
            }));
        }

        // Have one thread repeatedly checking the list.
        let _checker_thread = std::thread::spawn(move || loop {
            let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
            iter.advance().now_or_never().unwrap().unwrap();
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
            iter.advance().now_or_never().unwrap().unwrap();
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
        });

        for thread in threads {
            thread.join().unwrap();
        }
    }
}