// fxfs/object_store/transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::debug_assert_not_too_long;
7use crate::filesystem::{FxFilesystem, TxnGuard};
8use crate::log::*;
9use crate::lsm_tree::types::Item;
10use crate::object_handle::INVALID_OBJECT_ID;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{reserved_space_from_journal_usage, ObjectManager};
13use crate::object_store::object_record::{
14    ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43, ObjectItemV46, ObjectKey,
15    ObjectKeyData, ObjectValue, ProjectProperty,
16};
17use crate::serialized_types::{migrate_nodefault, migrate_to_version, Migrate, Versioned};
18use anyhow::Error;
19use either::{Either, Left, Right};
20use fprint::TypeFingerprint;
21use fuchsia_sync::Mutex;
22use futures::future::poll_fn;
23use futures::pin_mut;
24use fxfs_crypto::{WrappedKey, WrappedKeyV32, WrappedKeyV40};
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::hash_map::Entry;
31use std::collections::BTreeSet;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Options for creating a new `Transaction`.  This allows for special handling of certain
/// transactions such as deletes and the extension of Journal extents. For most other use cases it
/// is appropriate to use `default()` here.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
/// The amount of space that we reserve for metadata when we are creating a new transaction.  A
/// transaction should not take more than this.  This is expressed in terms of space occupied in
/// the journal; transactions must not take up more space in the journal than the number below.  The
/// amount chosen here must be large enough for the maximum possible transaction that can be
/// created, so transactions always need to be bounded which might involve splitting an operation up
/// into smaller transactions.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;

/// The metadata-space reservation corresponding to `TRANSACTION_MAX_JOURNAL_USAGE`.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wraps the `WriteGuard` holding the locks for a transaction.  `#[must_use]` because discarding
/// the value discards the guard (and, presumably, the locks it holds -- see `lock_manager`).
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV46;

// Latest version of the mutation record.  The older versions below migrate forward to this one
// (via the `Migrate` derive chain) so older journals can still be replayed.
#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV46 {
    // A mutation applied to an object store's tree.
    ObjectStore(ObjectStoreMutationV46),
    // Opaque (encrypted) mutation data for an object store.
    EncryptedObjectStore(Box<[u8]>),
    // A mutation applied to the allocator (allocate/deallocate/limits; see AllocatorMutationV32).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Updates the amount borrowed (presumably from the metadata reservation; see
    // `MetadataReservation::Borrowed` -- confirm against the journal replay code).
    UpdateBorrowed(u64),
    // Records a change of the key used for mutations (see `Mutation::update_mutations_key`).
    UpdateMutationsKey(UpdateMutationsKeyV40),
    // Records creation of an internal directory with the given object ID.
    CreateInternalDir(u64),
}
101
// Superseded by MutationV46; retained so older journals can be deserialized, then migrated forward
// via the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
115
// Superseded by MutationV43 (and transitively MutationV46); retained for deserializing older
// journals.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
129
// Oldest mutation version still supported; migrates forward to MutationV41.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
147
148impl Mutation {
149    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
150        Mutation::ObjectStore(ObjectStoreMutation {
151            item: Item::new(key, value),
152            op: Operation::Insert,
153        })
154    }
155
156    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
157        Mutation::ObjectStore(ObjectStoreMutation {
158            item: Item::new(key, value),
159            op: Operation::ReplaceOrInsert,
160        })
161    }
162
163    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
164        Mutation::ObjectStore(ObjectStoreMutation {
165            item: Item::new(key, value),
166            op: Operation::Merge,
167        })
168    }
169
170    pub fn update_mutations_key(key: WrappedKey) -> Self {
171        Mutation::UpdateMutationsKey(key.into())
172    }
173}
174
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV46;

// Latest version: an item (key/value pair) plus the operation with which to apply it.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_nodefault]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

// NOTE(review): unlike V41/V40 below, this has a `Migrate` derive but no explicit
// `#[migrate_to_version(...)]` attribute -- confirm the derive's default migration target is
// ObjectStoreMutationV46.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

// NOTE(review): this version lacks the `Versioned` derive the newer versions have -- confirm
// intentional.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
210
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Insert a new item; the caller expects no existing item for the key (see `verify_locks`).
    Insert,
    // Insert, replacing any existing item with the same key.
    ReplaceOrInsert,
    // Combine with any existing item via the tree's merge logic.
    Merge,
}
221
222impl Ord for ObjectStoreMutation {
223    fn cmp(&self, other: &Self) -> Ordering {
224        self.item.key.cmp(&other.item.key)
225    }
226}
227
228impl PartialOrd for ObjectStoreMutation {
229    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
230        Some(self.cmp(other))
231    }
232}
233
234impl PartialEq for ObjectStoreMutation {
235    fn eq(&self, other: &Self) -> bool {
236        self.item.key.eq(&other.item.key)
237    }
238}
239
240impl Eq for ObjectStoreMutation {}
241
242impl Ord for AllocatorItem {
243    fn cmp(&self, other: &Self) -> Ordering {
244        self.key.cmp(&other.key)
245    }
246}
247
248impl PartialOrd for AllocatorItem {
249    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
250        Some(self.cmp(other))
251    }
252}
253
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
261
262impl Deref for DeviceRange {
263    type Target = Range<u64>;
264
265    fn deref(&self) -> &Self::Target {
266        &self.0
267    }
268}
269
270impl DerefMut for DeviceRange {
271    fn deref_mut(&mut self) -> &mut Self::Target {
272        &mut self.0
273    }
274}
275
276impl From<Range<u64>> for DeviceRange {
277    fn from(range: Range<u64>) -> Self {
278        Self(range)
279    }
280}
281
282impl Into<Range<u64>> for DeviceRange {
283    fn into(self) -> Range<u64> {
284        self.0
285    }
286}
287
288impl Ord for DeviceRange {
289    fn cmp(&self, other: &Self) -> Ordering {
290        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
291    }
292}
293
294impl PartialOrd for DeviceRange {
295    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
296        Some(self.cmp(other))
297    }
298}
299
pub type AllocatorMutation = AllocatorMutationV32;

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Allocates `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Deallocates `device_range` previously allocated for `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a byte limit for `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
324
pub type UpdateMutationsKey = UpdateMutationsKeyV40;

/// Newtype around the wrapped key carried by a `Mutation::UpdateMutationsKey` record.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV40(pub WrappedKeyV40);

// Older on-disk form; migrates to V40 below.
#[derive(Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV32(pub WrappedKeyV32);
332
333impl From<UpdateMutationsKeyV32> for UpdateMutationsKeyV40 {
334    fn from(value: UpdateMutationsKeyV32) -> Self {
335        Self(value.0.into())
336    }
337}
338
339impl From<UpdateMutationsKey> for WrappedKey {
340    fn from(outer: UpdateMutationsKey) -> Self {
341        outer.0
342    }
343}
344
345impl From<WrappedKey> for UpdateMutationsKey {
346    fn from(inner: WrappedKey) -> Self {
347        Self(inner)
348    }
349}
350
// Fuzzing support: generate an UpdateMutationsKey with a random wrapping key id but a default
// (all-zero) key payload.
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        <u128>::arbitrary(u).map(|wrapping_key_id| {
            UpdateMutationsKey::from(WrappedKey {
                wrapping_key_id,
                // There doesn't seem to be much point to randomly generate crypto keys.
                key: fxfs_crypto::WrappedKeyBytes::default(),
            })
        })
    }
}
363
// These compare by *identity* (address), not content: `eq` is `std::ptr::eq` and `cmp` orders by
// pointer value, so two distinct instances never compare equal and are never deduplicated when
// mutations are stored in a set.
// NOTE(review): address-based ordering is only stable while the value does not move in memory;
// confirm that containers relying on this Ord (e.g. `BTreeSet`, which moves elements when nodes
// are split/merged) never depend on the ordering staying consistent across moves.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}
383
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to lock changes to a project (required for project-limit mutations; see
    /// `verify_locks`).
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}
427
428impl LockKey {
429    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
430        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
431    }
432
433    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
434        LockKey::Object { store_object_id, object_id }
435    }
436
437    pub const fn flush(object_id: u64) -> Self {
438        LockKey::Flush { object_id }
439    }
440
441    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
442        LockKey::Truncate { store_object_id, object_id }
443    }
444}
445
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline, avoiding a
/// heap allocation for the common one-key case.
#[derive(Clone, Debug)]
pub enum LockKeys {
    None,
    Inline(LockKey),
    Vec(Vec<LockKey>),
}
453
454impl LockKeys {
455    pub fn with_capacity(capacity: usize) -> Self {
456        if capacity > 1 {
457            LockKeys::Vec(Vec::with_capacity(capacity))
458        } else {
459            LockKeys::None
460        }
461    }
462
463    pub fn push(&mut self, key: LockKey) {
464        match self {
465            Self::None => *self = LockKeys::Inline(key),
466            Self::Inline(inline) => {
467                *self = LockKeys::Vec(vec![*inline, key]);
468            }
469            Self::Vec(vec) => vec.push(key),
470        }
471    }
472
473    pub fn truncate(&mut self, len: usize) {
474        match self {
475            Self::None => {}
476            Self::Inline(_) => {
477                if len == 0 {
478                    *self = Self::None;
479                }
480            }
481            Self::Vec(vec) => vec.truncate(len),
482        }
483    }
484
485    fn len(&self) -> usize {
486        match self {
487            Self::None => 0,
488            Self::Inline(_) => 1,
489            Self::Vec(vec) => vec.len(),
490        }
491    }
492
493    fn contains(&self, key: &LockKey) -> bool {
494        match self {
495            Self::None => false,
496            Self::Inline(single) => single == key,
497            Self::Vec(vec) => vec.contains(key),
498        }
499    }
500
501    fn sort_unstable(&mut self) {
502        match self {
503            Self::Vec(vec) => vec.sort_unstable(),
504            _ => {}
505        }
506    }
507
508    fn dedup(&mut self) {
509        match self {
510            Self::Vec(vec) => vec.dedup(),
511            _ => {}
512        }
513    }
514
515    fn iter(&self) -> LockKeysIter<'_> {
516        match self {
517            LockKeys::None => LockKeysIter::None,
518            LockKeys::Inline(key) => LockKeysIter::Inline(key),
519            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
520        }
521    }
522}
523
/// Borrowing iterator over the keys in a `LockKeys` (returned by `LockKeys::iter`).
enum LockKeysIter<'a> {
    None,
    Inline(&'a LockKey),
    Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
}
529
530impl<'a> Iterator for LockKeysIter<'a> {
531    type Item = &'a LockKey;
532    fn next(&mut self) -> Option<Self::Item> {
533        match self {
534            Self::None => None,
535            Self::Inline(inline) => {
536                let next = *inline;
537                *self = Self::None;
538                Some(next)
539            }
540            Self::Vec(vec) => vec.next(),
541        }
542    }
543}
544
545impl Default for LockKeys {
546    fn default() -> Self {
547        LockKeys::None
548    }
549}
550
/// Convenience constructor for `LockKeys`: `lock_keys![]`, `lock_keys![key]` or
/// `lock_keys![key1, key2, ...]`, picking the cheapest representation for the number of keys
/// (none, inline, or a vector).
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
// Re-export so the macro is also reachable as `transaction::lock_keys!`.
pub use lock_keys;
564
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Hook invoked when `_mutation` is about to be applied.  The default implementation does
    /// nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}

/// An optional reference to an `AssociatedObject`, either borrowed or owned.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a (dyn AssociatedObject)),
    Owned(Box<dyn AssociatedObject>),
}
578
579impl AssocObj<'_> {
580    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
581        match self {
582            AssocObj::None => None,
583            AssocObj::Borrowed(ref b) => Some(f(*b)),
584            AssocObj::Owned(ref o) => Some(f(o.as_ref())),
585        }
586    }
587}
588
/// A single mutation within a transaction, pairing the mutation record with the ID of the store
/// (or allocator) it applies to and an optional in-memory object to notify on apply.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
602
603// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
604// associated object or checksum.
605impl Ord for TxnMutation<'_> {
606    fn cmp(&self, other: &Self) -> Ordering {
607        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
608    }
609}
610
611impl PartialOrd for TxnMutation<'_> {
612    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
613        Some(self.cmp(other))
614    }
615}
616
617impl PartialEq for TxnMutation<'_> {
618    fn eq(&self, other: &Self) -> bool {
619        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
620    }
621}
622
623impl Eq for TxnMutation<'_> {}
624
625impl std::fmt::Debug for TxnMutation<'_> {
626    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
627        f.debug_struct("TxnMutation")
628            .field("object_id", &self.object_id)
629            .field("mutation", &self.mutation)
630            .finish()
631    }
632}
633
/// Tracks how the metadata space for a transaction has been reserved.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}
648
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Keeps the filesystem transaction accounting (and the Filesystem lock) alive for the lifetime
    // of this transaction.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    /// Each entry is (device range, checksums, `first_write` flag from `add_checksum`).
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
}
672
673impl<'a> Transaction<'a> {
674    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
675    /// at commit time.
676    pub async fn new(
677        txn_guard: TxnGuard<'a>,
678        options: Options<'a>,
679        txn_locks: LockKeys,
680    ) -> Result<Transaction<'a>, Error> {
681        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
682        let fs = txn_guard.fs().clone();
683        let guard = scopeguard::guard((), |_| fs.sub_transaction());
684        let (metadata_reservation, allocator_reservation, hold) =
685            txn_guard.fs().reservation_for_transaction(options).await?;
686
687        let txn_locks = {
688            let lock_manager = txn_guard.fs().lock_manager();
689            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
690            std::mem::take(&mut write_guard.0.lock_keys)
691        };
692        let mut transaction = Transaction {
693            txn_guard,
694            mutations: BTreeSet::new(),
695            txn_locks,
696            allocator_reservation: None,
697            metadata_reservation,
698            new_objects: BTreeSet::new(),
699            checksums: Vec::new(),
700        };
701
702        ScopeGuard::into_inner(guard);
703        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
704        transaction.allocator_reservation = allocator_reservation;
705        Ok(transaction)
706    }
707
    /// Returns the guard held for the lifetime of this transaction (see `TxnGuard`).
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }
711
    /// Returns the set of mutations that currently make up this transaction.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }
715
716    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
717        self.new_objects.clear();
718        mem::take(&mut self.mutations)
719    }
720
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.  Equivalent to `add_with_object` with no associated object.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
726
    /// Removes a mutation that matches `mutation`.
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            // If the removed mutation was the insertion of a new object, forget that the object
            // was created by this transaction so later mutations to it require locks again (see
            // `verify_locks`).
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
744
    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
    /// and the old mutation is returned.
    ///
    /// Panics if `object_id` is `INVALID_OBJECT_ID`.
    pub fn add_with_object(
        &mut self,
        object_id: u64,
        mutation: Mutation,
        associated_object: AssocObj<'a>,
    ) -> Option<Mutation> {
        assert!(object_id != INVALID_OBJECT_ID);
        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
        // Debug-check that the locks this mutation requires are held.
        self.verify_locks(&txn_mutation);
        // `replace` dedupes by (object_id, mutation) per TxnMutation's Ord/Eq implementations.
        self.mutations.replace(txn_mutation).map(|m| m.mutation)
    }
758
    /// Records data checksums for `range`, to be evaluated when this transaction is replayed.
    /// NOTE(review): the semantics of `first_write` are not visible here (it is stored and
    /// interpreted at replay time) -- confirm against the journal replay code.
    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
        self.checksums.push((range, checksums, first_write));
    }

    /// Returns the checksums recorded so far.
    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
        &self.checksums
    }
766
767    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
768        std::mem::replace(&mut self.checksums, Vec::new())
769    }
770
771    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
772        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
773        // through it, but given the small set that these locks usually comprise, it probably isn't
774        // worth it.
775        if let TxnMutation {
776            mutation:
777                Mutation::ObjectStore { 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op } },
778            object_id: store_object_id,
779            ..
780        } = mutation
781        {
782            match &key.data {
783                ObjectKeyData::Attribute(..) => {
784                    // TODO(https://fxbug.dev/42073914): Check lock requirements.
785                }
786                ObjectKeyData::Child { .. }
787                | ObjectKeyData::EncryptedChild { .. }
788                | ObjectKeyData::CasefoldChild { .. } => {
789                    let id = key.object_id;
790                    if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
791                        && !self.new_objects.contains(&(*store_object_id, id))
792                    {
793                        debug_assert!(
794                            false,
795                            "Not holding required lock for object {id} \
796                                in store {store_object_id}"
797                        );
798                        error!(
799                            "Not holding required lock for object {id} in store \
800                                {store_object_id}"
801                        )
802                    }
803                }
804                ObjectKeyData::GraveyardEntry { .. } => {
805                    // TODO(https://fxbug.dev/42073911): Check lock requirements.
806                }
807                ObjectKeyData::GraveyardAttributeEntry { .. } => {
808                    // TODO(https://fxbug.dev/122974): Check lock requirements.
809                }
810                ObjectKeyData::Keys => {
811                    let id = key.object_id;
812                    if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
813                        && !self.new_objects.contains(&(*store_object_id, id))
814                    {
815                        debug_assert!(
816                            false,
817                            "Not holding required lock for object {id} \
818                                in store {store_object_id}"
819                        );
820                        error!(
821                            "Not holding required lock for object {id} in store \
822                                {store_object_id}"
823                        )
824                    }
825                }
826                ObjectKeyData::Object => match op {
827                    // Insert implies the caller expects no object with which to race
828                    Operation::Insert => {
829                        self.new_objects.insert((*store_object_id, key.object_id));
830                    }
831                    Operation::Merge | Operation::ReplaceOrInsert => {
832                        let id = key.object_id;
833                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
834                            && !self.new_objects.contains(&(*store_object_id, id))
835                        {
836                            debug_assert!(
837                                false,
838                                "Not holding required lock for object {id} \
839                                    in store {store_object_id}"
840                            );
841                            error!(
842                                "Not holding required lock for object {id} in store \
843                                    {store_object_id}"
844                            )
845                        }
846                    }
847                },
848                ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
849                    if !self.txn_locks.contains(&LockKey::ProjectId {
850                        store_object_id: *store_object_id,
851                        project_id: *project_id,
852                    }) {
853                        debug_assert!(
854                            false,
855                            "Not holding required lock for project limit id {project_id} \
856                                in store {store_object_id}"
857                        );
858                        error!(
859                            "Not holding required lock for project limit id {project_id} in \
860                                store {store_object_id}"
861                        )
862                    }
863                }
864                ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
865                    Operation::Insert | Operation::ReplaceOrInsert => {
866                        panic!(
867                            "Project usage is all handled by merging deltas, no inserts or \
868                                replacements should be used"
869                        );
870                    }
871                    // Merges are all handled like atomic +/- and serialized by the tree locks.
872                    Operation::Merge => {}
873                },
874                ObjectKeyData::ExtendedAttribute { .. } => {
875                    let id = key.object_id;
876                    if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
877                        && !self.new_objects.contains(&(*store_object_id, id))
878                    {
879                        debug_assert!(
880                            false,
881                            "Not holding required lock for object {id} \
882                                in store {store_object_id} while mutating extended attribute"
883                        );
884                        error!(
885                            "Not holding required lock for object {id} in store \
886                                {store_object_id} while mutating extended attribute"
887                        )
888                    }
889                }
890            }
891        }
892    }
893
    /// Returns true if this transaction has no mutations queued for commit.
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
898
899    /// Searches for an existing object mutation within the transaction that has the given key and
900    /// returns it if found.
901    pub fn get_object_mutation(
902        &self,
903        store_object_id: u64,
904        key: ObjectKey,
905    ) -> Option<&ObjectStoreMutation> {
906        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
907            self.mutations.get(&TxnMutation {
908                object_id: store_object_id,
909                mutation: Mutation::insert_object(key, ObjectValue::None),
910                associated_object: AssocObj::None,
911            })
912        {
913            Some(mutation)
914        } else {
915            None
916        }
917    }
918
    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
    pub async fn commit(mut self) -> Result<u64, Error> {
        debug!(txn:? = &self; "Commit");
        // The filesystem `Arc` is cloned so the borrow of `self.txn_guard` ends before `self`
        // is mutably borrowed for `commit_transaction`.
        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
    }
924
925    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
926    /// parameter which is the journal offset of the transaction.
927    pub async fn commit_with_callback<R: Send>(
928        mut self,
929        f: impl FnOnce(u64) -> R + Send,
930    ) -> Result<R, Error> {
931        debug!(txn:? = &self; "Commit");
932        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
933        // do that (for performance reasons), hence the reason for the following.
934        let mut f = Some(f);
935        let mut result = None;
936        self.txn_guard
937            .fs()
938            .clone()
939            .commit_transaction(&mut self, &mut |offset| {
940                result = Some(f.take().unwrap()(offset));
941            })
942            .await?;
943        Ok(result.unwrap())
944    }
945
    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
    /// dropped (but transaction locks will get downgraded to read locks).
    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
        debug!(txn:? = self; "Commit");
        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
        // Committing consumes the mutations, so the transaction must be empty at this point.
        assert!(self.mutations.is_empty());
        // Let readers back in whilst continuing to exclude other writers.
        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
        Ok(())
    }
955}
956
impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
        // LockManager's drop_transaction to ensure the locks are released.
        // Note: this also runs at the end of a successful `commit`, since `commit` consumes
        // `self`; by then the mutations and locks have already been taken.
        debug!(txn:? = &self; "Drop");
        self.txn_guard.fs().clone().drop_transaction(self);
    }
}
965
966impl std::fmt::Debug for Transaction<'_> {
967    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
968        f.debug_struct("Transaction")
969            .field("mutations", &self.mutations)
970            .field("txn_locks", &self.txn_locks)
971            .field("reservation", &self.allocator_reservation)
972            .finish()
973    }
974}
975
/// Holds either a borrowed or an owned value of type `T`; either way the value can be used
/// through `Deref`.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
980
981impl<T> Deref for BorrowedOrOwned<'_, T> {
982    type Target = T;
983
984    fn deref(&self) -> &Self::Target {
985        match self {
986            BorrowedOrOwned::Borrowed(b) => b,
987            BorrowedOrOwned::Owned(o) => &o,
988        }
989    }
990}
991
992impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
993    fn from(value: &'a T) -> Self {
994        BorrowedOrOwned::Borrowed(value)
995    }
996}
997
998impl<T> From<T> for BorrowedOrOwned<'_, T> {
999    fn from(value: T) -> Self {
1000        BorrowedOrOwned::Owned(value)
1001    }
1002}
1003
1004/// LockManager holds the locks that transactions might have taken.  A TransactionManager
1005/// implementation would typically have one of these.
1006///
1007/// Three different kinds of locks are supported.  There are read locks and write locks, which are
1008/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
1009/// upgradeable read lock).  When first acquired, these block other writes (including other
1010/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
1011/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
1012/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
1013/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
1014/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
1015/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
1016/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
1017/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
1018/// waiting and there are only readers, readers will queue up behind the writer.
1019///
1020/// To summarize:
1021///
1022/// +-------------------------+-----------------+----------------+------------------+
1023/// |                         | While read_lock | While txn_lock | While write_lock |
1024/// |                         | is held         | is held        | is held          |
1025/// +-------------------------+-----------------+----------------+------------------+
1026/// | Can acquire read_lock?  | true            | true           | false            |
1027/// +-------------------------+-----------------+----------------+------------------+
1028/// | Can acquire txn_lock?   | true            | false          | false            |
1029/// +-------------------------+-----------------+----------------+------------------+
1030/// | Can acquire write_lock? | false           | false          | false            |
1031/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // All lock state lives behind this mutex.  It also guards the intrusive waker lists hanging
    // off each `LockEntry`; the `unsafe` list manipulation below is only sound whilst it is held.
    locks: Mutex<Locks>,
}
1035
// The lock table, protected by `LockManager::locks`.
struct Locks {
    // Maps each lock key to its current state and list of waiters.  Entries are removed once
    // nothing holds or waits on the key (see `LockEntry::can_remove`).
    keys: HashMap<LockKey, LockEntry>,
}
1039
impl Locks {
    // Releases one hold on `key` that was held in `state`, waking any waiters that can now
    // proceed and removing the entry once nothing references it.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    // Only the last reader leaving can unblock a waiter.
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            // Dropping a lock that isn't in the table indicates a bookkeeping bug.
            unreachable!();
        }
    }

    // Releases a set of read locks.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    // Releases a set of write (or transaction) locks.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1093
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    head: *const LockWaker,
    tail: *const LockWaker,
}

// SAFETY: The raw waker pointers are only dereferenced whilst `LockManager::locks` is held (see
// the comment above `LockWaker`), which serializes access across threads.
unsafe impl Send for LockEntry {}
1112
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.  Nodes live on the waiting task's stack and are
// pinned (see `_pin`); `LockWaker::wait` guarantees they are unlinked before the future drops.
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock) rather than a fresh acquisition.
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1136
// Tracks the lifecycle of a `LockWaker`'s task waker.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}
1147
impl WakerState {
    // True once this waker has been granted the lock (see `LockEntry::pop_and_wake`).
    fn is_woken(&self) -> bool {
        matches!(self, WakerState::Woken)
    }
}
1153
// SAFETY: The `UnsafeCell` members are only accessed whilst `LockManager::locks` is held (see
// the comment on `LockWaker`), so cross-thread access is serialized.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1156
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped: if it is cancelled after being granted
        // the lock we must release the lock; if it is cancelled whilst still queued we must
        // unlink ourself from the waker list.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Store (or refresh) the task's waker so `LockEntry::pop_and_wake` can wake
                    // this task when the lock is granted.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Normal completion: defuse the guard so the cleanup above doesn't run.
        ScopeGuard::into_inner(waker_guard);
    }
}
1197
// The state of a `LockEntry`.  See the compatibility table in the documentation on
// `LockManager` for which acquisitions each state permits.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1210
impl LockManager {
    /// Creates an empty lock manager.
    pub fn new() -> Self {
        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
    }

    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
    /// call LockManager's drop_transaction method.
    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
        TransactionLocks(
            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
        )
    }

    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
    async fn lock<'a>(
        &'a self,
        mut lock_keys: LockKeys,
        target_state: LockState,
    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
        // The guard is created up front so that any keys acquired so far are released if this
        // future is dropped part-way through.
        let mut guard = match &target_state {
            LockState::ReadLock => Left(ReadGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
        };
        let guard_keys = match &mut guard {
            Left(g) => &mut g.lock_keys,
            Right(g) => &mut g.lock_keys,
        };
        // Sort and de-duplicate so locks are always taken in a consistent order.
        lock_keys.sort_unstable();
        lock_keys.dedup();
        for lock in lock_keys.iter() {
            // The waker node lives on this stack frame and is pinned for the duration of the
            // wait.
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                match locks.keys.entry(*lock) {
                    Entry::Vacant(vacant) => {
                        // No existing entry, so the lock is granted immediately.
                        vacant.insert(LockEntry {
                            read_count: if let LockState::ReadLock = target_state {
                                guard_keys.push(*lock);
                                1
                            } else {
                                guard_keys.push(*lock);
                                0
                            },
                            state: target_state,
                            head: std::ptr::null(),
                            tail: std::ptr::null(),
                        });
                    }
                    Entry::Occupied(mut occupied) => {
                        let entry = occupied.get_mut();
                        // SAFETY: We've acquired the lock.
                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
                            if let LockState::ReadLock = target_state {
                                entry.read_count += 1;
                                guard_keys.push(*lock);
                            } else {
                                entry.state = target_state;
                                guard_keys.push(*lock);
                            }
                        } else {
                            // Initialise a waker and push it on the tail of the list.
                            // SAFETY: `lock_waker` isn't used prior to this point.
                            unsafe {
                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                                    next: UnsafeCell::new(std::ptr::null()),
                                    prev: UnsafeCell::new(entry.tail),
                                    key: *lock,
                                    waker: UnsafeCell::new(WakerState::Pending),
                                    target_state: target_state,
                                    is_upgrade: false,
                                    _pin: PhantomPinned,
                                });
                            }
                            let waker = (*lock_waker).as_ref().unwrap();
                            if entry.tail.is_null() {
                                entry.head = waker;
                            } else {
                                // SAFETY: We've acquired the lock.
                                unsafe {
                                    *(*entry.tail).next.get() = waker;
                                }
                            }
                            entry.tail = waker;
                        }
                    }
                }
            }
            // If we were queued, wait (outside the mutex) until `LockEntry::wake` grants us the
            // lock.
            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
                guard_keys.push(*lock);
            }
        }
        guard
    }

    /// This should be called by the filesystem's drop_transaction implementation.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        let mut locks = self.locks.lock();
        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
    }

    /// Prepares to commit by waiting for readers to finish.
    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
        self.commit_prepare_keys(&transaction.txn_locks).await;
    }

    // Upgrades each key from Locked to WriteLock, waiting for any active readers to leave first.
    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                let entry = locks.keys.get_mut(lock).unwrap();
                assert_eq!(entry.state, LockState::Locked);

                if entry.read_count == 0 {
                    entry.state = LockState::WriteLock;
                } else {
                    // Initialise a waker and push it on the head of the list.
                    // SAFETY: `lock_waker` isn't used prior to this point.
                    unsafe {
                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                            next: UnsafeCell::new(entry.head),
                            prev: UnsafeCell::new(std::ptr::null()),
                            key: *lock,
                            waker: UnsafeCell::new(WakerState::Pending),
                            target_state: LockState::WriteLock,
                            is_upgrade: true,
                            _pin: PhantomPinned,
                        });
                    }
                    let waker = (*lock_waker).as_ref().unwrap();
                    if entry.head.is_null() {
                        entry.tail = (*lock_waker).as_ref().unwrap();
                    } else {
                        // SAFETY: We've acquired the lock.
                        unsafe {
                            *(*entry.head).prev.get() = waker;
                        }
                    }
                    entry.head = waker;
                }
            }

            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
            }
        }
    }

    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
    /// is being committed for the same locks.  They are only necessary where consistency is
    /// required between different mutations within a transaction.  For example, a write might
    /// change the size and extents for an object, in which case a read lock is required so that
    /// observed size and extents are seen together or not at all.
    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
    }

    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
    /// requested lock keys.
    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
    }

    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
    /// not in the WriteLock state.
    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
        self.locks.lock().downgrade_locks(lock_keys);
    }
}
1391
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Wakes as many waiters at the head of the list as the current state allows.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = &*self.head;

        if waker.is_upgrade {
            // An upgrade (Locked -> WriteLock) must wait until all readers have left.
            if self.read_count > 0 {
                return;
            }
        } else if !self.is_allowed(waker.target_state, true) {
            return;
        }

        self.pop_and_wake();

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && (*self.head).target_state == LockState::ReadLock {
            self.pop_and_wake();
        }
    }

    // Removes the head waker from the list, grants it the lock (adjusting our state) and wakes
    // its task.
    unsafe fn pop_and_wake(&mut self) {
        let waker = &*self.head;

        // Pop the waker.
        self.head = *waker.next.get();
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            *(*self.head).prev.get() = std::ptr::null();
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.
        if let WakerState::Registered(waker) =
            std::mem::replace(&mut *waker.waker.get(), WakerState::Woken)
        {
            waker.wake();
        }
    }

    // True when the entry holds no lock at all, in which case it can be removed from the map.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the doubly-linked list; used when a waiting future is dropped before
    // being woken (see `LockWaker::wait`).
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        let is_first = (*waker.prev.get()).is_null();
        if is_first {
            self.head = *waker.next.get();
        } else {
            *(**waker.prev.get()).next.get() = *waker.next.get();
        }
        if (*waker.next.get()).is_null() {
            self.tail = *waker.prev.get();
        } else {
            *(**waker.next.get()).prev.get() = *waker.prev.get();
        }
        if is_first {
            // We must call wake in case we erased a pending write lock and readers can now proceed.
            self.wake();
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock && (is_head || !(*self.head).is_upgrade)
            }
            LockState::WriteLock => false,
        }
    }

    // Downgrades this entry from WriteLock back to Locked, waking any readers that can now
    // proceed.  Panics if the entry wasn't in the WriteLock state.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        self.wake();
    }
}
1498
/// Guard holding read locks; the locks are released when the guard is dropped.
#[must_use]
pub struct ReadGuard<'a> {
    // The lock manager the keys were taken from (possibly owned via the filesystem).
    manager: LockManagerRef<'a>,
    // The keys this guard currently holds read locks on.
    lock_keys: LockKeys,
}
1504
1505impl ReadGuard<'_> {
1506    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1507        if let LockManagerRef::Owned(fs) = &self.manager {
1508            Some(fs)
1509        } else {
1510            None
1511        }
1512    }
1513
1514    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1515        ReadGuard {
1516            manager: LockManagerRef::Owned(fs),
1517            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1518        }
1519    }
1520}
1521
1522impl Drop for ReadGuard<'_> {
1523    fn drop(&mut self) {
1524        let mut locks = self.manager.locks.lock();
1525        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1526    }
1527}
1528
1529impl fmt::Debug for ReadGuard<'_> {
1530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1531        f.debug_struct("ReadGuard")
1532            .field("manager", &(&self.manager as *const _))
1533            .field("lock_keys", &self.lock_keys)
1534            .finish()
1535    }
1536}
1537
/// Guard holding write (or transaction) locks; the locks are released when the guard is dropped.
#[must_use]
pub struct WriteGuard<'a> {
    // The lock manager the keys were taken from.
    manager: LockManagerRef<'a>,
    // The keys this guard currently holds locks on.
    lock_keys: LockKeys,
}
1543
1544impl Drop for WriteGuard<'_> {
1545    fn drop(&mut self) {
1546        let mut locks = self.manager.locks.lock();
1547        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1548    }
1549}
1550
1551impl fmt::Debug for WriteGuard<'_> {
1552    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1553        f.debug_struct("WriteGuard")
1554            .field("manager", &(&self.manager as *const _))
1555            .field("lock_keys", &self.lock_keys)
1556            .finish()
1557    }
1558}
1559
// Either a borrowed `LockManager` or one reached through an owned filesystem reference; the
// latter lets a guard outlive the borrow (see `ReadGuard::into_owned`).
enum LockManagerRef<'a> {
    Borrowed(&'a LockManager),
    Owned(Arc<FxFilesystem>),
}
1564
1565impl Deref for LockManagerRef<'_> {
1566    type Target = LockManager;
1567
1568    fn deref(&self) -> &Self::Target {
1569        match self {
1570            LockManagerRef::Borrowed(m) => m,
1571            LockManagerRef::Owned(f) => f.lock_manager(),
1572        }
1573    }
1574}
1575
1576impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1577    fn from(value: &'a LockManager) -> Self {
1578        LockManagerRef::Borrowed(value)
1579    }
1580}
1581
1582#[cfg(test)]
1583mod tests {
1584    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1585    use crate::filesystem::FxFilesystem;
1586    use fuchsia_async as fasync;
1587    use fuchsia_sync::Mutex;
1588    use futures::channel::oneshot::channel;
1589    use futures::future::FutureExt;
1590    use futures::stream::FuturesUnordered;
1591    use futures::{join, pin_mut, StreamExt};
1592    use std::task::Poll;
1593    use std::time::Duration;
1594    use storage_device::fake_device::FakeDevice;
1595    use storage_device::DeviceHolder;
1596
    // Checks that adding a mutation makes a freshly-created transaction non-empty.
    #[fuchsia::test]
    async fn test_simple() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let mut t = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        t.add(1, Mutation::BeginFlush);
        assert!(!t.is_empty());
    }
1609
    // Checks that transactions with the same lock key block each other whilst transactions with
    // different keys proceed concurrently.
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        // Future 1: holds the lock on (1, 2, 3) whilst the other two futures run.
        futures.push(
            async {
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        // Future 2: takes a different key, so it must not be blocked by future 1.
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        // Future 3: takes the same key as future 1, so it must block until future 1 finishes.
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }
1672
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        // Verifies that a transaction's lock does not exclude readers until the
        // transaction reaches commit; the commit itself must then wait for the
        // outstanding read lock to be released.
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        // One-shot channels used to sequence the two futures.
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        // Set once commit completes; checked while the read lock is still held.
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                // This blocks until the other future's read lock is dropped.
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }
1711
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        // Mirror of test_read_lock_after_write_lock: with a read lock already
        // held, creating a transaction on the same key succeeds, but committing
        // it must block until the read lock is dropped.
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        // One-shot channels used to sequence the two futures.
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        // Set once commit completes; checked while the read lock is still held.
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                // Taking the transaction lock succeeds despite the read lock.
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                // The commit blocks until the read lock above is released.
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }
1750
1751    #[fuchsia::test]
1752    async fn test_drop_uncommitted_transaction() {
1753        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1754        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1755        let key = lock_keys![LockKey::object(1, 1)];
1756
1757        // Dropping while there's a reader.
1758        {
1759            let _write_lock = fs
1760                .clone()
1761                .new_transaction(key.clone(), Options::default())
1762                .await
1763                .expect("new_transaction failed");
1764            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1765        }
1766        // Dropping while there's no reader.
1767        {
1768            let _write_lock = fs
1769                .clone()
1770                .new_transaction(key.clone(), Options::default())
1771                .await
1772                .expect("new_transaction failed");
1773        }
1774        // Make sure we can take the lock again (i.e. it was actually released).
1775        fs.clone()
1776            .new_transaction(key.clone(), Options::default())
1777            .await
1778            .expect("new_transaction failed");
1779    }
1780
1781    #[fuchsia::test]
1782    async fn test_drop_waiting_write_lock() {
1783        let manager = LockManager::new();
1784        let keys = lock_keys![LockKey::object(1, 1)];
1785        {
1786            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1787            if let Poll::Ready(_) =
1788                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1789            {
1790                assert!(false);
1791            }
1792        }
1793        let _ = manager.lock(keys, LockState::WriteLock).await;
1794    }
1795
1796    #[fuchsia::test]
1797    async fn test_write_lock_blocks_everything() {
1798        let manager = LockManager::new();
1799        let keys = lock_keys![LockKey::object(1, 1)];
1800        {
1801            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1802            if let Poll::Ready(_) =
1803                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1804            {
1805                assert!(false);
1806            }
1807            if let Poll::Ready(_) =
1808                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1809            {
1810                assert!(false);
1811            }
1812        }
1813        {
1814            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1815        }
1816        {
1817            let _guard = manager.lock(keys, LockState::ReadLock).await;
1818        }
1819    }
1820
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        // Verifies that downgrade_locks releases the exclusive state taken by
        // commit_prepare_keys, waking readers that were blocked by it.
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        // Upgrade the transaction lock to the pre-commit state.
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1840
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        // Verifies that dropping a *pending* (not yet granted) write lock wakes
        // readers that were queued behind it.
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        // FuturesUnordered makes the wake-up observable via polling.
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1863
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        // Verifies that dropping an in-flight commit_prepare (lock upgrade) while
        // it is blocked leaves the lock state consistent, so a later upgrade on
        // the same keys still succeeds.
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            // The read lock prevents the upgrade from completing.
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }
1883
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        // Verifies that an upgrade (commit_prepare) which has been woken but not
        // yet granted still takes priority over newly arriving readers, and that
        // those readers stay blocked until the resulting write lock is dropped.
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }
1915
    // Distinct keys used by the LockKeys container tests below.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);
1919
1920    // The keys, storage method, and capacity must all match.
1921    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
1922        match (value, expected) {
1923            (LockKeys::None, LockKeys::None) => {}
1924            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
1925                if key1 != key2 {
1926                    panic!("{key1:?} != {key2:?}");
1927                }
1928            }
1929            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
1930                if vec1 != vec2 {
1931                    panic!("{vec1:?} != {vec2:?}");
1932                }
1933                if vec1.capacity() != vec2.capacity() {
1934                    panic!(
1935                        "LockKeys have different capacity: {} != {}",
1936                        vec1.capacity(),
1937                        vec2.capacity()
1938                    );
1939                }
1940            }
1941            (_, _) => panic!("{value:?} != {expected:?}"),
1942        }
1943    }
1944
1945    // Only the keys must match. Storage method and capacity don't matter.
1946    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
1947        let value: Vec<_> = value.iter().collect();
1948        let expected: Vec<_> = expected.iter().collect();
1949        assert_eq!(value, expected);
1950    }
1951
1952    #[test]
1953    fn test_lock_keys_macro() {
1954        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
1955        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
1956        assert_lock_keys_equal(
1957            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
1958            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
1959        );
1960    }
1961
1962    #[test]
1963    fn test_lock_keys_with_capacity() {
1964        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
1965        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
1966        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
1967    }
1968
1969    #[test]
1970    fn test_lock_keys_len() {
1971        assert_eq!(lock_keys![].len(), 0);
1972        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
1973        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
1974    }
1975
1976    #[test]
1977    fn test_lock_keys_contains() {
1978        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
1979        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
1980        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
1981        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
1982        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
1983        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
1984    }
1985
1986    #[test]
1987    fn test_lock_keys_push() {
1988        let mut keys = lock_keys![];
1989        keys.push(LOCK_KEY_1);
1990        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
1991        keys.push(LOCK_KEY_2);
1992        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
1993        keys.push(LOCK_KEY_3);
1994        assert_lock_keys_equivalent(
1995            &keys,
1996            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
1997        );
1998    }
1999
2000    #[test]
2001    fn test_lock_keys_sort_unstable() {
2002        let mut keys = lock_keys![];
2003        keys.sort_unstable();
2004        assert_lock_keys_equal(&keys, &lock_keys![]);
2005
2006        let mut keys = lock_keys![LOCK_KEY_1];
2007        keys.sort_unstable();
2008        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2009
2010        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2011        keys.sort_unstable();
2012        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2013    }
2014
2015    #[test]
2016    fn test_lock_keys_dedup() {
2017        let mut keys = lock_keys![];
2018        keys.dedup();
2019        assert_lock_keys_equal(&keys, &lock_keys![]);
2020
2021        let mut keys = lock_keys![LOCK_KEY_1];
2022        keys.dedup();
2023        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2024
2025        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2026        keys.dedup();
2027        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2028    }
2029
2030    #[test]
2031    fn test_lock_keys_truncate() {
2032        let mut keys = lock_keys![];
2033        keys.truncate(5);
2034        assert_lock_keys_equal(&keys, &lock_keys![]);
2035        keys.truncate(0);
2036        assert_lock_keys_equal(&keys, &lock_keys![]);
2037
2038        let mut keys = lock_keys![LOCK_KEY_1];
2039        keys.truncate(5);
2040        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2041        keys.truncate(0);
2042        assert_lock_keys_equal(&keys, &lock_keys![]);
2043
2044        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2045        keys.truncate(5);
2046        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2047        keys.truncate(1);
2048        // Although there's only 1 key after truncate the key is not stored inline.
2049        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2050    }
2051
2052    #[test]
2053    fn test_lock_keys_iter() {
2054        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2055
2056        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2057
2058        assert_eq!(
2059            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2060            vec![&LOCK_KEY_1, &LOCK_KEY_2]
2061        );
2062    }
2063}