fxfs/object_store/
tree.rs1use crate::lsm_tree::types::{LayerIterator, MergeableKey, Value};
6use crate::lsm_tree::{LSMTree, LockedLayer, Query};
7use crate::object_handle::WriteBytes;
8use crate::object_store::journal;
9use crate::serialized_types::LATEST_VERSION;
10use anyhow::{Context, Error};
11use std::future::{ready, Future};
12
13pub trait MajorCompactable<K: 'static, V: 'static> {
14 fn major_iter(
18 iter: impl LayerIterator<K, V>,
19 ) -> impl Future<Output = Result<impl LayerIterator<K, V>, Error>> + Send {
20 ready(Ok(iter))
21 }
22}
23
24type Layers<K, V> = Vec<LockedLayer<K, V>>;
25
26pub async fn flush<'a, K: MergeableKey, V: Value>(
30 tree: &'a LSMTree<K, V>,
31 writer: impl WriteBytes + Send,
32) -> Result<(Layers<K, V>, Layers<K, V>), Error>
33where
34 LSMTree<K, V>: MajorCompactable<K, V>,
35{
36 let earliest_version = tree.get_earliest_version();
37 let mut layer_set = tree.immutable_layer_set();
38 let mut total_size = 0;
39 let mut layer_count = 0;
40 let mut split_index = if earliest_version == LATEST_VERSION {
41 layer_set
42 .layers
43 .iter()
44 .position(|layer| {
45 match layer.handle() {
46 None => {}
47 Some(handle) => {
48 let size = handle.get_size();
49 if total_size > 0 && total_size * 4 < size * 3 {
52 return true;
53 }
54 total_size += size;
55 layer_count += 1;
56 }
57 }
58 false
59 })
60 .unwrap_or(layer_set.layers.len())
61 } else {
62 layer_set.layers.len()
65 };
66
67 if layer_count == 1
74 && total_size > journal::DEFAULT_RECLAIM_SIZE
75 && layer_set.layers[split_index - 1].handle().is_some()
76 {
77 split_index -= 1;
78 }
79
80 let layers_to_keep = layer_set.layers.split_off(split_index);
81
82 {
83 let block_size = writer.block_size();
84 let total_len = layer_set.sum_len();
85 let mut merger = layer_set.merger();
86 let iter = merger.query(Query::FullScan).await?;
87 if layers_to_keep.is_empty() {
88 let major_iter = LSMTree::<K, V>::major_iter(iter).await?;
89 tree.compact_with_iterator(major_iter, total_len, writer, block_size).await
90 } else {
91 tree.compact_with_iterator(iter, total_len, writer, block_size).await
92 }
93 .context("ObjectStore::flush")?;
94 }
95
96 Ok((layers_to_keep, layer_set.layers))
97}
98
/// Returns 1.5x `layer_size` (rounded down), i.e. `layer_size` plus a 50% allowance.
///
/// The result is computed as `layer_size + layer_size / 2`, which equals
/// `floor(layer_size * 3 / 2)` for every input but does not overflow in the intermediate
/// multiplication.  If the sum itself would exceed `u64::MAX`, the result saturates at
/// `u64::MAX` instead of panicking or wrapping around to a too-small value.
pub fn reservation_amount_from_layer_size(layer_size: u64) -> u64 {
    layer_size.saturating_add(layer_size / 2)
}