fxfs/object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
mod data_object_handle;
pub mod directory;
mod extent_record;
mod flush;
pub mod graveyard;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    SetExtendedAttributeMode, StoreObjectHandle, EXTENDED_ATTRIBUTE_RANGE_END,
    EXTENDED_ATTRIBUTE_RANGE_START,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions, TxnGuard, MAX_FILE_SIZE,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{ObjectHandle, ObjectProperties, ReadObjectHandle, INVALID_OBJECT_ID};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    lock_keys, AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options,
    Transaction,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, VersionedLatest};
use anyhow::{anyhow, bail, ensure, Context, Error};
use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as BASE64_URL_SAFE_NO_PAD;
use base64::engine::Engine as _;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_inspect::ArrayProperty;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    Crypt, KeyPurpose, StreamCipher, UnwrappedKey, WrappedKey, WrappedKeyV32, WrappedKeyV40,
    WrappedKeys,
};
use mundane::hash::{Digest, Hasher, Sha256};
use once_cell::sync::OnceCell;
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    ExtentKey, ExtentMode, ExtentValue, BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, ObjectAttributes,
    ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
    /// be called when the store is not in use.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}
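
// A minimal sketch of a `StoreOwner` implementation (the `MyVolume` type, its `stop_tasks`
// method, and the `store.lock()` call are assumptions for illustration):
//
// ```ignore
// struct MyVolume { /* ... */ }
//
// #[async_trait]
// impl StoreOwner for MyVolume {
//     async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
//         self.stop_tasks().await; // Quiesce anything that might touch the store.
//         store.lock().await // Assumed locking entry point.
//     }
// }
// ```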

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV40;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV40 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: u64,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<WrappedKeyV40>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
    /// reveal (such as the number of files in the system and the ordering of their creation in
    /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits will
    /// increment after 2^32 object IDs have been used and this allows us to roll the key.
    object_id_key: Option<WrappedKeyV40>,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Default, Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV40)]
pub struct StoreInfoV36 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<WrappedKeyV32>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<WrappedKeyV32>,
    internal_directory_object_id: u64,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV36)]
pub struct StoreInfoV32 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<WrappedKeyV32>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<WrappedKeyV32>,
}

impl StoreInfo {
    /// Create a new/default [`StoreInfo`] but with a newly generated GUID.
    fn new_with_guid() -> Self {
        let guid = Uuid::new_v4();
        Self { guid: *guid.as_bytes(), ..Default::default() }
    }

    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
}

/// Parameters for encrypting a newly created object.
struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    permanent: bool,
    key_id: u64,
    key: WrappedKey,
    unwrapped_key: UnwrappedKey,
}

pub struct NewChildStoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if this is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,
}

impl Default for NewChildStoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None, object_id: INVALID_OBJECT_ID }
    }
}

pub type EncryptedMutations = EncryptedMutationsV40;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpoint, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, WrappedKeyV40)>,
}
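
// A hedged illustration of the layout (`c1`, `c2` and the payloads are hypothetical):
//
// ```ignore
// m.push(&c1, payload_a); // transactions == [(c1, 1)]
// m.push(&c1, payload_b); // Same checkpoint, so transactions == [(c1, 2)].
// m.push(&c2, payload_c); // transactions == [(c1, 2), (c2, 1)]
// // `data` now holds payload_a, payload_b and payload_c concatenated in order.
// ```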

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as the last mutation we pushed, increment the count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}

pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,
}
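
// Typical transitions, inferred from the states above (a sketch, not an exhaustive state
// machine): Unknown resolves to Locked or Unencrypted once StoreInfo is read; unlocking goes
// Locked -> Unlocking -> Unlocked; locking goes Unlocked -> Locking -> Locked.  Invalid is
// terminal.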

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self {
            owner.upgrade()
        } else {
            None
        }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
        })
    }
}

#[derive(Default)]
struct LastObjectId {
    // The *unencrypted* value of the last object ID.
    id: u64,

    // Encrypted stores will use a cipher to obfuscate the object ID.
    cipher: Option<Ff1>,
}

impl LastObjectId {
    // Returns true if a cipher is needed to generate new object IDs.
    fn should_create_cipher(&self) -> bool {
        self.id as u32 == u32::MAX
    }

    fn get_next_object_id(&mut self) -> u64 {
        self.id += 1;
        if let Some(cipher) = &self.cipher {
            let hi = self.id & OBJECT_ID_HI_MASK;
            assert_ne!(hi, INVALID_OBJECT_ID);
            assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
            hi | cipher.encrypt(self.id as u32) as u64
        } else {
            self.id
        }
    }
}
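
// A hedged illustration: with a cipher present, only the low 32 bits are permuted, so the hi
// word of a generated ID always matches the internal counter (`unwrapped_key` is assumed to
// be in scope):
//
// ```ignore
// let mut last = LastObjectId { id: 1 << 32, cipher: Some(Ff1::new(&unwrapped_key)) };
// let oid = last.get_next_object_id();
// assert_eq!(oid & OBJECT_ID_HI_MASK, 1 << 32);
// ```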

/// An object store supports a file-like interface for objects.  Objects are keyed by a 64-bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceCell<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
    persistent_layer_file_sizes: Vec<u64>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceCell::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::default(),
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceCell::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::default()),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Create a child store.  It is a multi-step process (sketched below):
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
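    ///
    /// A hedged sketch of the sequence (the cache argument and the `object_manager`
    /// registration call are assumptions about the call site, not part of this API):
    ///
    /// ```ignore
    /// let store = root_store
    ///     .new_child_store(&mut transaction, NewChildStoreOptions::default(), Box::new(NullCache {}))
    ///     .await?;
    /// object_manager.add_store(store.clone());  // Hypothetical registration step.
    /// store.create(&mut transaction).await?;
    /// ```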
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        let handle = if options.object_id != INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                options.object_id,
                HandleOptions::default(),
                None,
            )
            .await?;
            self.update_last_object_id(options.object_id);
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let store = if let Some(crypt) = options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    object_id_key: Some(object_id_wrapped),
                    ..StoreInfo::new_with_guid()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.owner, crypt },
                LastObjectId {
                    // We need to avoid accidentally getting INVALID_OBJECT_ID, so we set
                    // the top 32 bits to a non-zero value.
                    id: 1 << 32,
                    cipher: Some(Ff1::new(&object_id_unwrapped)),
                },
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo::new_with_guid()),
                object_cache,
                None,
                LockState::Unencrypted,
                LastObjectId::default(),
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self);
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace",);
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
    /// the end of flush, while the write lock is still held.
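    ///
    /// For example, a callback that just counts flushes (a minimal sketch):
    ///
    /// ```ignore
    /// let flushes = Arc::new(AtomicU64::new(0));
    /// let counter = flushes.clone();
    /// store.set_flush_callback(move |_store| {
    ///     counter.fetch_add(1, Ordering::Relaxed);
    /// });
    /// ```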
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        } else {
            warn!("Can't access store_info; store is locked.");
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        let sizes = root.create_uint_array(
            "persistent_layer_file_sizes",
            counters.persistent_layer_file_sizes.len(),
        );
        for i in 0..counters.persistent_layer_file_sizes.len() {
            sizes.set(i, counters.persistent_layer_file_sizes[i]);
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));

        root.record(sizes);

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id,
                    self.store_object_id,
                )],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    /// `crypt` can be provided if the crypt service should be different to the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out-of-sync; there is no code that will
    /// prevent this.  One example where this can cause an issue is if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
    /// dropped when a handle is dropped, which will impact any other handles for the same object.
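    ///
    /// A minimal sketch of opening an object with the store's default crypt service:
    ///
    /// ```ignore
    /// let handle =
    ///     ObjectStore::open_object(&owner, object_id, HandleOptions::default(), None).await?;
    /// let size = handle.get_size();  // `get_size` comes from the ObjectHandle trait.
    /// ```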
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                fsverity_descriptor = Some(fsverity_metadata);
                // We only track overwrite extents in memory for writes; reads handle them
                // implicitly.  Verified files (whose data won't change anymore) therefore
                // don't need to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(anyhow!(FxfsError::Inconsistent)
                                .context("open_object: Expected extent")),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            match data_object_handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await? {
                None => {
                    return Err(anyhow!(FxfsError::NotFound));
                }
                Some(data) => {
                    data_object_handle.set_fsverity_state_some(descriptor, data);
                }
            }
        }
        Ok(data_object_handle)
    }

    async fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        debug_assert!(object_id != INVALID_OBJECT_ID);
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::file(1, 0, now.clone(), now.clone(), now.clone(), now, 0, None),
            ),
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(EncryptionKeys::AES256XTS(WrappedKeys::from(vec![(
                        key_id, key,
                    )]))),
                ),
            );
            store.key_manager.insert(object_id, &vec![(key_id, Some(unwrapped_key))], permanent);
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            Vec::new(),
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key;
    /// otherwise the default data key is used.
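    ///
    /// For example (a sketch, assuming an already-open transaction):
    ///
    /// ```ignore
    /// let handle =
    ///     ObjectStore::create_object(&owner, &mut transaction, HandleOptions::default(), None)
    ///         .await?;
    /// transaction.commit().await?;
    /// ```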
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<u128>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id, wrapping_key_id).await?
            } else {
                crypt.create_key(object_id, KeyPurpose::Data).await?
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            encryption_options,
        )
        .await
    }

    /// Creates an object using explicitly provided keys.
    ///
    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
    /// For example, the layer files for a child store are created in the root store, but they must
    /// be encrypted using the child store's keys.  This method exists for that purpose.
    pub(crate) async fn create_object_with_key<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        key: WrappedKey,
        unwrapped_key: UnwrappedKey,
    ) -> Result<DataObjectHandle<S>, Error> {
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            Some(ObjectEncryptionOptions {
                permanent: true,
                key_id: VOLUME_DATA_KEY_ID,
                key,
                unwrapped_key,
            }),
        )
        .await
    }

    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
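    ///
    /// For example, dropping the last reference (a sketch):
    ///
    /// ```ignore
    /// if store.adjust_refs(&mut transaction, object_id, -1).await? {
    ///     // The count reached zero: the object is now in the graveyard and can later be
    ///     // purged with `tombstone_object`.
    /// }
    /// ```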
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        oid: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        let mut mutation = self.txn_get_object_mutation(transaction, oid).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, oid);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation, as we want to keep the reference count at 1
            // for objects in the graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }

    // Purges an object that is in the graveyard.
    pub async fn tombstone_object(
        &self,
        object_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        self.key_manager.remove(object_id).await;
        self.trim_or_tombstone(object_id, true, txn_options).await
    }

    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
    /// the graveyard when done.
    pub async fn trim(&self, object_id: u64) -> Result<(), Error> {
        // For the root and root parent store, we would need to use the metadata reservation which
        // we don't currently support, so assert that we're not those stores.
        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());

        self.trim_or_tombstone(
            object_id,
            false,
            Options { borrow_metadata_space: true, ..Default::default() },
        )
        .await
    }

    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut next_attribute = Some(0);
        while let Some(attribute_id) = next_attribute.take() {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                TrimResult::Done(None) => {
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                }
                TrimResult::Done(id) => next_attribute = id,
            }

            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    // Purges an object's attribute that is in the graveyard.
    pub async fn tombstone_attribute(
        &self,
        object_id: u64,
        attribute_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut trim_result = TrimResult::Incomplete;
        while matches!(trim_result, TrimResult::Incomplete) {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;
            trim_result = self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    TrimMode::Tombstone(TombstoneMode::Attribute),
                )
                .await?;
            if let TrimResult::Done(..) = trim_result {
                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
            }
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
    /// for TrimMode and TrimResult.  Callers should hold locks on both the attribute and the
    /// object, since this performs a read-modify-write on the sizes.
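    ///
    /// Callers typically loop, committing one transaction per pass, until the result is no
    /// longer `TrimResult::Incomplete` (a sketch; `trim_or_tombstone` above is a real call
    /// site):
    ///
    /// ```ignore
    /// loop {
    ///     let mut transaction = /* new transaction holding the object/attribute locks */;
    ///     let result = store
    ///         .trim_some(&mut transaction, object_id, attribute_id, TrimMode::UseSize)
    ///         .await?;
    ///     transaction.commit().await?;
    ///     if !matches!(result, TrimResult::Incomplete) {
    ///         break;
    ///     }
    /// }
    /// ```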
    pub async fn trim_some(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        attribute_id: u64,
        mode: TrimMode,
    ) -> Result<TrimResult, Error> {
        let layer_set = self.tree.layer_set();
        let mut merger = layer_set.merger();

        let aligned_offset = match mode {
            TrimMode::FromOffset(offset) => {
                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
            }
            TrimMode::Tombstone(..) => 0,
            TrimMode::UseSize => {
                let iter = merger
                    .query(Query::FullRange(&ObjectKey::attribute(
                        object_id,
                        attribute_id,
                        AttributeKey::Attribute,
                    )))
                    .await?;
                if let Some(item_ref) = iter.get() {
                    if item_ref.key.object_id != object_id {
                        return Ok(TrimResult::Done(None));
                    }

                    if let ItemRef {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
                                ..
                            },
                        value: ObjectValue::Attribute { size, .. },
                        ..
                    } = item_ref
                    {
                        // If we found a different attribute_id, return so we can get the
                        // right lock.
                        if *size_attribute_id != attribute_id {
                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
                        }
                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
                    } else {
                        // At time of writing, we should always see a size record or None here, but
                        // asserting would be brittle, so just skip to the next attribute instead.
1319                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1320                    }
1321                } else {
1322                    // End of the tree.
1323                    return Ok(TrimResult::Done(None));
1324                }
1325            }
1326        };
1327
1328        // Loop over the extents and deallocate them.
1329        let mut iter = merger
1330            .query(Query::FullRange(&ObjectKey::from_extent(
1331                object_id,
1332                attribute_id,
1333                ExtentKey::search_key_from_offset(aligned_offset),
1334            )))
1335            .await?;
1336        let mut end = 0;
1337        let allocator = self.allocator();
1338        let mut result = TrimResult::Done(None);
1339        let mut deallocated = 0;
1340        let block_size = self.block_size;
1341
1342        while let Some(item_ref) = iter.get() {
1343            if item_ref.key.object_id != object_id {
1344                break;
1345            }
1346            if let ObjectKey {
1347                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1348                ..
1349            } = item_ref.key
1350            {
1351                if *extent_attribute_id != attribute_id {
1352                    result = TrimResult::Done(Some(*extent_attribute_id));
1353                    break;
1354                }
1355                if let (
1356                    AttributeKey::Extent(ExtentKey { range }),
1357                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1358                ) = (attribute_key, item_ref.value)
1359                {
1360                    let start = std::cmp::max(range.start, aligned_offset);
1361                    ensure!(start < range.end, FxfsError::Inconsistent);
1362                    let device_offset = device_offset
1363                        .checked_add(start - range.start)
1364                        .ok_or(FxfsError::Inconsistent)?;
1365                    end = range.end;
1366                    let len = end - start;
1367                    let device_range = device_offset..device_offset + len;
1368                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1369                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1370                    deallocated += len;
1371                    // Stop if the transaction is getting too big.
1372                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1373                        result = TrimResult::Incomplete;
1374                        break;
1375                    }
1376                }
1377            }
1378            iter.advance().await?;
1379        }
1380
1381        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1382            && matches!(result, TrimResult::Done(None));
1383        let finished_tombstone_attribute =
1384            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1385                && !matches!(result, TrimResult::Incomplete);
1386        let mut object_mutation = None;
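        // Tombstoning the whole object releases one node; the project usage accounting below is
        // adjusted by `nodes` and by the number of bytes deallocated.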
1387        let nodes = if finished_tombstone_object { -1 } else { 0 };
1388        if nodes != 0 || deallocated != 0 {
1389            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1390            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1391                mutation.item.value
1392            {
1393                if project_id != 0 {
1394                    transaction.add(
1395                        self.store_object_id,
1396                        Mutation::merge_object(
1397                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1398                            ObjectValue::BytesAndNodes {
1399                                bytes: -i64::try_from(deallocated).unwrap(),
1400                                nodes,
1401                            },
1402                        ),
1403                    );
1404                }
1405                object_mutation = Some(mutation);
1406            } else {
1407                panic!("Inconsistent object type.");
1408            }
1409        }
1410
1411        // Deletion marker records *must* be merged so as to consume all other records for the
1412        // object.
1413        if finished_tombstone_object {
1414            transaction.add(
1415                self.store_object_id,
1416                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1417            );
1418        } else {
1419            if finished_tombstone_attribute {
1420                transaction.add(
1421                    self.store_object_id,
1422                    Mutation::merge_object(
1423                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1424                        ObjectValue::None,
1425                    ),
1426                );
1427            }
1428            if deallocated > 0 {
1429                let mut mutation = match object_mutation {
1430                    Some(mutation) => mutation,
1431                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1432                };
1433                transaction.add(
1434                    self.store_object_id,
1435                    Mutation::merge_object(
1436                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1437                        ObjectValue::deleted_extent(),
1438                    ),
1439                );
1440                // Update allocated size.
1441                if let ObjectValue::Object {
1442                    attributes: ObjectAttributes { allocated_size, .. },
1443                    ..
1444                } = &mut mutation.item.value
1445                {
1446                    // The only way for this to fail is if the volume is inconsistent.
1447                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1448                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1449                    })?;
1450                } else {
1451                    panic!("Unexpected object value");
1452                }
1453                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1454            }
1455        }
1456        Ok(result)
1457    }
1458
1459    /// Returns all objects that exist in the parent store that pertain to this object store.
1460    /// Note that this doesn't include the object_id of the store itself, which is generally
1461    /// referenced externally.
1462    pub fn parent_objects(&self) -> Vec<u64> {
1463        assert!(self.store_info_handle.get().is_some());
1464        self.store_info.lock().as_ref().unwrap().parent_objects()
1465    }
1466
1467    /// Returns root objects for this store.
1468    pub fn root_objects(&self) -> Vec<u64> {
1469        let mut objects = Vec::new();
1470        let store_info = self.store_info.lock();
1471        let info = store_info.as_ref().unwrap();
1472        if info.root_directory_object_id != INVALID_OBJECT_ID {
1473            objects.push(info.root_directory_object_id);
1474        }
1475        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1476            objects.push(info.graveyard_directory_object_id);
1477        }
1478        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1479            objects.push(info.internal_directory_object_id);
1480        }
1481        objects
1482    }
1483
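    /// Returns a clone of this store's `StoreInfo`, or `None` if it isn't currently loaded (e.g.
    /// whilst the store is locked).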
1484    pub fn store_info(&self) -> Option<StoreInfo> {
1485        self.store_info.lock().as_ref().cloned()
1486    }
1487
1488    /// Returns None if called during journal replay.
1489    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1490        self.store_info_handle.get().map(|h| h.object_id())
1491    }
1492
1493    /// Called to open a store, before replay of this store's mutations.
1494    async fn open(
1495        parent_store: &Arc<ObjectStore>,
1496        store_object_id: u64,
1497        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1498    ) -> Result<Arc<ObjectStore>, Error> {
1499        let handle =
1500            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1501                .await?;
1502
1503        let info = load_store_info(parent_store, store_object_id).await?;
1504        let is_encrypted = info.mutations_key.is_some();
1505
1506        let mut total_layer_size = 0;
1507        let last_object_id;
1508
1509        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1510
1511        // If the store is encrypted, we can't open the object tree layers yet (we don't have the
1512        // keys), but we still need to compute the total layer size so space can be reserved.
1513        if is_encrypted {
1514            for &oid in &info.layers {
1515                total_layer_size += parent_store.get_file_size(oid).await?;
1516            }
1517            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1518                total_layer_size += layer_size_from_encrypted_mutations_size(
1519                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1520                );
1521            }
1522            last_object_id = LastObjectId::default();
1523        } else {
1524            last_object_id = LastObjectId { id: info.last_object_id, cipher: None };
1525        }
1526
1527        let fs = parent_store.filesystem();
1528
1529        let store = ObjectStore::new(
1530            Some(parent_store.clone()),
1531            store_object_id,
1532            fs.clone(),
1533            if is_encrypted { None } else { Some(info) },
1534            object_cache,
1535            None,
1536            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1537            last_object_id,
1538        );
1539
1540        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1541
1542        if !is_encrypted {
1543            let object_tree_layer_object_ids =
1544                store.store_info.lock().as_ref().unwrap().layers.clone();
1545            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1546            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1547            store
1548                .tree
1549                .append_layers(object_layers)
1550                .await
1551                .context("Failed to read object store layers")?;
1552        }
1553
1554        fs.object_manager().update_reservation(
1555            store_object_id,
1556            tree::reservation_amount_from_layer_size(total_layer_size),
1557        );
1558
1559        Ok(store)
1560    }
1561
1562    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1563        load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
1564    }
1565
1566    async fn open_layers(
1567        &self,
1568        object_ids: impl std::iter::IntoIterator<Item = u64>,
1569        crypt: Option<Arc<dyn Crypt>>,
1570    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1571        let parent_store = self.parent_store.as_ref().unwrap();
1572        let mut handles = Vec::new();
1573        let mut sizes = Vec::new();
1574        for object_id in object_ids {
1575            let handle = ObjectStore::open_object(
1576                &parent_store,
1577                object_id,
1578                HandleOptions::default(),
1579                crypt.clone(),
1580            )
1581            .await
1582            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1583            sizes.push(handle.get_size());
1584            handles.push(handle);
1585        }
1586        self.counters.lock().persistent_layer_file_sizes = sizes;
1587        Ok(handles)
1588    }
1589
1590    /// Unlocks a store so that it is ready to be used.
1591    /// This is not thread-safe.
1592    pub async fn unlock(
1593        self: &Arc<Self>,
1594        owner: Weak<dyn StoreOwner>,
1595        crypt: Arc<dyn Crypt>,
1596    ) -> Result<(), Error> {
1597        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1598    }
1599
1600    /// Unlocks a store so that it is ready to be read from.
1601    /// The store will generally behave like it is still locked: when flushed, the store will
1602    /// write out its mutations into the encrypted mutations file, rather than directly updating
1603    /// the layer files of the object store.
1604    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger
1605    /// a flush, although the store might still be flushed during other operations.
1606    /// This is not thread-safe.
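    ///
    /// A minimal sketch (assuming the store is currently locked and `crypt` holds the right keys):
    /// ```ignore
    /// store.unlock_read_only(crypt).await?;
    /// // ... read from the store ...
    /// store.lock_read_only();
    /// ```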
1607    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1608        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1609    }
1610
1611    async fn unlock_inner(
1612        self: &Arc<Self>,
1613        owner: Weak<dyn StoreOwner>,
1614        crypt: Arc<dyn Crypt>,
1615        read_only: bool,
1616    ) -> Result<(), Error> {
1617        match &*self.lock_state.lock() {
1618            LockState::Locked => {}
1619            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1620            LockState::Invalid => bail!(FxfsError::Internal),
1621            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1622                bail!(FxfsError::AlreadyBound)
1623            }
1624            LockState::Unknown => panic!("Store was unlocked before replay"),
1625            LockState::Locking => panic!("Store is being locked"),
1626            LockState::Unlocking => panic!("Store is being unlocked"),
1627        }
1628        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1629        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1630        let fs = self.filesystem();
1631        let guard = fs.lock_manager().write_lock(keys).await;
1632
1633        let store_info = self.load_store_info().await?;
1634
1635        self.tree
1636            .append_layers(
1637                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1638            )
1639            .await
1640            .context("Failed to read object tree layer file contents")?;
1641
1642        let unwrapped_key = crypt
1643            .unwrap_key(store_info.mutations_key.as_ref().unwrap(), self.store_object_id)
1644            .await
1645            .context("Failed to unwrap mutations keys")?;
1646        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1647        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
1648        // wraps, so we use u32::MAX (the offset is u64).
1649        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1650        let mut mutations_cipher =
1651            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1652
1653        let wrapped_key = store_info.object_id_key.as_ref().ok_or(FxfsError::Inconsistent)?;
1654        let object_id_cipher =
1655            Ff1::new(&crypt.unwrap_key(wrapped_key, self.store_object_id).await?);
1656        {
1657            let mut last_object_id = self.last_object_id.lock();
1658            last_object_id.cipher = Some(object_id_cipher);
1659        }
1660        self.update_last_object_id(store_info.last_object_id);
1661
1662        // Apply the encrypted mutations.
1663        let mut mutations = {
1664            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1665                EncryptedMutations::default()
1666            } else {
1667                let parent_store = self.parent_store.as_ref().unwrap();
1668                let handle = ObjectStore::open_object(
1669                    &parent_store,
1670                    store_info.encrypted_mutations_object_id,
1671                    HandleOptions::default(),
1672                    None,
1673                )
1674                .await?;
1675                let mut cursor = std::io::Cursor::new(
1676                    handle
1677                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1678                        .await
1679                        .context(FxfsError::Inconsistent)?,
1680                );
1681                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1682                    .context("Failed to deserialize EncryptedMutations")?
1683                    .0;
1684                let len = cursor.get_ref().len() as u64;
1685                while cursor.position() < len {
1686                    mutations.extend(
1687                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1688                            .context("Failed to deserialize EncryptedMutations")?
1689                            .0,
1690                    );
1691                }
1692                mutations
1693            }
1694        };
1695
1696        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
1697        let journaled = EncryptedMutations::from_replayed_mutations(
1698            self.store_object_id,
1699            fs.journal()
1700                .read_transactions_for_object(self.store_object_id)
1701                .await
1702                .context("Failed to read encrypted mutations from journal")?,
1703        );
1704        mutations.extend(&journaled);
1705
1706        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
1707        *self.store_info.lock() = Some(store_info);
1708
1709        // If we fail, clean up.
1710        let clean_up = scopeguard::guard((), |_| {
1711            *self.lock_state.lock() = LockState::Locked;
1712            *self.store_info.lock() = None;
1713            // Make sure we don't leave unencrypted data lying around in memory.
1714            self.tree.reset();
1715        });
1716
1717        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
1718
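        // The serialized mutation data may span several cipher keys: `mutations_key_roll` holds
        // the (offset, wrapped key) pairs at which the key was rolled, so decrypt each segment
        // with its corresponding key.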
1719        let mut slice = &mut data[..];
1720        let mut last_offset = 0;
1721        for (offset, key) in mutations_key_roll {
1722            let split_offset = offset
1723                .checked_sub(last_offset)
1724                .ok_or(FxfsError::Inconsistent)
1725                .context("Invalid mutation key roll offset")?;
1726            last_offset = offset;
1727            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
1728            let (old, new) = slice.split_at_mut(split_offset);
1729            mutations_cipher.decrypt(old);
1730            let unwrapped_key = crypt
1731                .unwrap_key(&key, self.store_object_id)
1732                .await
1733                .context("Failed to unwrap mutations keys")?;
1734            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
1735            slice = new;
1736        }
1737        mutations_cipher.decrypt(slice);
1738
1739        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
1740        // previous key and nonce.
1741        self.roll_mutations_key(crypt.as_ref()).await?;
1742
1743        let mut cursor = std::io::Cursor::new(data);
1744        for (checkpoint, count) in transactions {
1745            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
1746            for _ in 0..count {
1747                let mutation =
1748                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
1749                        .context("failed to deserialize encrypted mutation")?;
1750                self.apply_mutation(mutation, &context, AssocObj::None)
1751                    .context("failed to apply encrypted mutation")?;
1752            }
1753        }
1754
1755        *self.lock_state.lock() = if read_only {
1756            LockState::UnlockedReadOnly(crypt)
1757        } else {
1758            LockState::Unlocked { owner, crypt }
1759        };
1760
1761        // To avoid unbounded memory growth, we should flush the encrypted mutations now.
1762        // Otherwise, more writes could be queued and the store locked again before we manage to
1763        // flush anything, and that cycle could repeat.
1764        std::mem::drop(guard);
1765
1766        if !read_only && !self.filesystem().options().read_only {
1767            self.flush_with_reason(flush::Reason::Unlock).await?;
1768
1769            // Reap purged files within this store.
1770            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
1771        }
1772
1773        // Return and cancel the clean up.
1774        Ok(ScopeGuard::into_inner(clean_up))
1775    }
1776
1777    pub fn is_locked(&self) -> bool {
1778        matches!(
1779            *self.lock_state.lock(),
1780            LockState::Locked | LockState::Locking | LockState::Unknown
1781        )
1782    }
1783
1784    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
1785    /// true.
1786    pub fn is_unlocked(&self) -> bool {
1787        matches!(
1788            *self.lock_state.lock(),
1789            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
1790        )
1791    }
1792
1793    pub fn is_unknown(&self) -> bool {
1794        matches!(*self.lock_state.lock(), LockState::Unknown)
1795    }
1796
1797    pub fn is_encrypted(&self) -> bool {
1798        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
1799    }
1800
1801    // Locks a store.
1802    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
1803    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
1804    // Whilst this can return an error, the store will be placed into an unusable but safe state
1805    // (i.e. no lingering unencrypted data) if an error is encountered.
1806    pub async fn lock(&self) -> Result<(), Error> {
1807        // We must lock flushing since it is not safe for that to be happening whilst we are locking
1808        // the store.
1809        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1810        let fs = self.filesystem();
1811        let _guard = fs.lock_manager().write_lock(keys).await;
1812
1813        {
1814            let mut lock_state = self.lock_state.lock();
1815            if let LockState::Unlocked { .. } = &*lock_state {
1816                *lock_state = LockState::Locking;
1817            } else {
1818                panic!("Unexpected lock state: {:?}", &*lock_state);
1819            }
1820        }
1821
1822        // Sync the journal now to ensure that any buffered mutations for this store make it out to
1823        // disk.  This is necessary to be able to unlock the store again.
1824        // We need to establish a barrier at this point (so that the journaled writes are observable
1825        // by any future attempts to unlock the store), hence the flush_device.
1826        let sync_result =
1827            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
1828
1829        *self.lock_state.lock() = if let Err(error) = &sync_result {
1830            error!(error:?; "Failed to sync journal; store will no longer be usable");
1831            LockState::Invalid
1832        } else {
1833            LockState::Locked
1834        };
1835        self.key_manager.clear();
1836        *self.store_info.lock() = None;
1837        self.tree.reset();
1838
1839        sync_result
1840    }
1841
1842    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
1843    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
1844    // and will be replayed next time the store is unlocked.
1845    pub fn lock_read_only(&self) {
1846        *self.lock_state.lock() = LockState::Locked;
1847        *self.store_info.lock() = None;
1848        self.tree.reset();
1849    }
1850
1851    // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
1852    fn maybe_get_next_object_id(&self) -> u64 {
1853        let mut last_object_id = self.last_object_id.lock();
1854        if last_object_id.should_create_cipher() {
1855            INVALID_OBJECT_ID
1856        } else {
1857            last_object_id.get_next_object_id()
1858        }
1859    }
1860
1861    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
1862    ///
1863    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
1864    /// This transaction does not take the filesystem lock, hence `txn_guard`.
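    ///
    /// A usage sketch (assuming `txn_guard` comes from an already-open transaction):
    /// ```ignore
    /// let object_id = store.get_next_object_id(&txn_guard).await?;
    /// ```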
1865    pub async fn get_next_object_id(&self, txn_guard: &TxnGuard<'_>) -> Result<u64, Error> {
1866        let object_id = self.maybe_get_next_object_id();
1867        if object_id != INVALID_OBJECT_ID {
1868            return Ok(object_id);
1869        }
1870
1871        // Create a transaction (which has a lock) and then check again.
1872        let mut transaction = self
1873            .filesystem()
1874            .new_transaction(
1875                lock_keys![LockKey::object(
1876                    self.parent_store.as_ref().unwrap().store_object_id,
1877                    self.store_object_id,
1878                )],
1879                Options {
1880                    // We must skip journal checks because this transaction might be needed to
1881                    // compact.
1882                    skip_journal_checks: true,
1883                    borrow_metadata_space: true,
1884                    txn_guard: Some(txn_guard),
1885                    ..Default::default()
1886                },
1887            )
1888            .await?;
1889
1890        {
1891            let mut last_object_id = self.last_object_id.lock();
1892            if !last_object_id.should_create_cipher() {
1893                // We lost a race.
1894                return Ok(last_object_id.get_next_object_id());
1895            }
1896            // It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
1897            // happens, it's most likely due to corruption.
1898            ensure!(
1899                last_object_id.id & OBJECT_ID_HI_MASK != OBJECT_ID_HI_MASK,
1900                FxfsError::Inconsistent
1901            );
1902        }
1903
1904        // Create a key.
1905        let (object_id_wrapped, object_id_unwrapped) =
1906            self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
1907
1908        // Update StoreInfo.
1909        let buf = {
1910            let serialized_info = {
1911                let mut store_info = self.store_info.lock();
1912                let store_info = store_info.as_mut().unwrap();
1913                store_info.object_id_key = Some(object_id_wrapped);
1914                let mut serialized_info = Vec::new();
1915                store_info.serialize_with_version(&mut serialized_info)?;
1916                serialized_info
1917            };
1918            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
1919            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
1920            buf
1921        };
1922
1923        self.store_info_handle
1924            .get()
1925            .unwrap()
1926            .txn_write(&mut transaction, 0u64, buf.as_ref())
1927            .await?;
1928        transaction.commit().await?;
1929
1930        let mut last_object_id = self.last_object_id.lock();
1931        last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped));
1932        last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK;
1933
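        // Compose the new object ID: the high 32 bits stay in the clear and the low 32 bits are
        // FF1-encrypted.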
1934        Ok((last_object_id.id & OBJECT_ID_HI_MASK)
1935            | last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64)
1936    }
1937
1938    fn allocator(&self) -> Arc<Allocator> {
1939        self.filesystem().allocator()
1940    }
1941
1942    // If |transaction| has an impending mutation for the underlying object, returns that.
1943    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
1944    // mutation is returned here rather than the item because the mutation includes the operation,
1945    // which is significant: inserting an object implies it's the first of its kind, unlike
1946    // replacing an existing object.
1947    async fn txn_get_object_mutation(
1948        &self,
1949        transaction: &Transaction<'_>,
1950        object_id: u64,
1951    ) -> Result<ObjectStoreMutation, Error> {
1952        if let Some(mutation) =
1953            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
1954        {
1955            Ok(mutation.clone())
1956        } else {
1957            Ok(ObjectStoreMutation {
1958                item: self
1959                    .tree
1960                    .find(&ObjectKey::object(object_id))
1961                    .await?
1962                    .ok_or(FxfsError::Inconsistent)
1963                    .context("Object id missing")?,
1964                op: Operation::ReplaceOrInsert,
1965            })
1966        }
1967    }
1968
1969    fn update_last_object_id(&self, mut object_id: u64) {
1970        let mut last_object_id = self.last_object_id.lock();
1971        // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
1972        if let Some(cipher) = &last_object_id.cipher {
1973            // If the object ID cipher has been rolled, then it's possible we might see object IDs
1974            // that were generated using a different cipher so the decrypt here will return the
1975            // wrong value, but that won't matter because the hi part of the object ID should still
1976            // discriminate.
1977            object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
1978        }
1979        if object_id > last_object_id.id {
1980            last_object_id.id = object_id;
1981        }
1982    }
1983
1984    /// Adds the specified object to the graveyard.
1985    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
1986        let graveyard_id = self.graveyard_directory_object_id();
1987        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
1988        transaction.add(
1989            self.store_object_id,
1990            Mutation::replace_or_insert_object(
1991                ObjectKey::graveyard_entry(graveyard_id, object_id),
1992                ObjectValue::Some,
1993            ),
1994        );
1995    }
1996
1997    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
1998    /// this because graveyard entries are used for purging deleted files *and* for trimming
1999    /// extents.  For example, consider the following sequence:
2000    ///
2001    ///     1. Add Trim graveyard entry.
2002    ///     2. Replace with Some graveyard entry (see above).
2003    ///     3. Remove graveyard entry.
2004    ///
2005    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2006    /// actually be:
2007    ///
2008    ///     3. Replace with Trim graveyard entry.
2009    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2010        transaction.add(
2011            self.store_object_id,
2012            Mutation::replace_or_insert_object(
2013                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2014                ObjectValue::None,
2015            ),
2016        );
2017    }
2018
2019    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2020    /// attribute graveyard entries serve only one purpose (purging deleted attributes), so the
2021    /// caller does not need to be concerned about replacing the graveyard attribute entry
2022    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2023    pub fn remove_attribute_from_graveyard(
2024        &self,
2025        transaction: &mut Transaction<'_>,
2026        object_id: u64,
2027        attribute_id: u64,
2028    ) {
2029        transaction.add(
2030            self.store_object_id,
2031            Mutation::replace_or_insert_object(
2032                ObjectKey::graveyard_attribute_entry(
2033                    self.graveyard_directory_object_id(),
2034                    object_id,
2035                    attribute_id,
2036                ),
2037                ObjectValue::None,
2038            ),
2039        );
2040    }
2041
2042    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2043    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2044        let (wrapped_key, unwrapped_key) =
2045            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2046
2047        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2048        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2049        // end up writing the wrong wrapped key.
2050        let mut cipher = self.mutations_cipher.lock();
2051        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2052        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2053        // mutations_cipher_offset is updated by flush.
2054        Ok(())
2055    }
2056
2057    // When the symlink is unlocked, this function decrypts `link` and returns exactly the bytes
2058    // that were passed in as the target on `create_symlink`.
2059    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2060    // get a standard length and then base64 encodes the hash and returns that to the caller.
2061    pub async fn read_encrypted_symlink(
2062        &self,
2063        object_id: u64,
2064        link: Vec<u8>,
2065    ) -> Result<Vec<u8>, Error> {
2066        let mut link = link;
2067        let key = self
2068            .key_manager()
2069            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2070                self.get_keys(object_id).await
2071            })
2072            .await?;
2073        if let Some(key) = key {
2074            key.decrypt_filename(object_id, &mut link)?;
2075            Ok(link)
2076        } else {
2077            let digest = Sha256::hash(&link).bytes();
2078            let encrypted_link = BASE64_URL_SAFE_NO_PAD.encode(&digest);
2079            Ok(encrypted_link.into())
2080        }
2081    }
2082
2083    /// Returns the link of a symlink object.
2084    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2085        match self.tree.find(&ObjectKey::object(object_id)).await? {
2086            None => bail!(FxfsError::NotFound),
2087            Some(Item {
2088                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2089                ..
2090            }) => self.read_encrypted_symlink(object_id, link).await,
2091            Some(Item {
2092                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2093                ..
2094            }) => Ok(link),
2095            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2096                .context(format!("Unexpected item in lookup: {item:?}"))),
2097        }
2098    }
2099
2100    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2101    /// will be considered an inconsistency if they don't.
2102    pub async fn get_keys(&self, object_id: u64) -> Result<WrappedKeys, Error> {
2103        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2104            Item { value: ObjectValue::Keys(EncryptionKeys::AES256XTS(keys)), .. } => Ok(keys),
2105            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2106        }
2107    }
2108
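    /// Updates the object's attributes (timestamps and POSIX attributes) within `transaction`.
    /// This is a no-op if `change_time` is `None` and `node_attributes` is empty or absent.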
2109    pub async fn update_attributes<'a>(
2110        &self,
2111        transaction: &mut Transaction<'a>,
2112        object_id: u64,
2113        node_attributes: Option<&fio::MutableNodeAttributes>,
2114        change_time: Option<Timestamp>,
2115    ) -> Result<(), Error> {
2116        if change_time.is_none() {
2117            if let Some(attributes) = node_attributes {
2118                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2119                if *attributes == empty_attributes {
2120                    return Ok(());
2121                }
2122            } else {
2123                return Ok(());
2124            }
2125        }
2126        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2127        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2128            if let Some(time) = change_time {
2129                attributes.change_time = time;
2130            }
2131            if let Some(node_attributes) = node_attributes {
2132                if let Some(time) = node_attributes.creation_time {
2133                    attributes.creation_time = Timestamp::from_nanos(time);
2134                }
2135                if let Some(time) = node_attributes.modification_time {
2136                    attributes.modification_time = Timestamp::from_nanos(time);
2137                }
2138                if let Some(time) = node_attributes.access_time {
2139                    attributes.access_time = Timestamp::from_nanos(time);
2140                }
2141                if node_attributes.mode.is_some()
2142                    || node_attributes.uid.is_some()
2143                    || node_attributes.gid.is_some()
2144                    || node_attributes.rdev.is_some()
2145                {
2146                    if let Some(a) = &mut attributes.posix_attributes {
2147                        if let Some(mode) = node_attributes.mode {
2148                            a.mode = mode;
2149                        }
2150                        if let Some(uid) = node_attributes.uid {
2151                            a.uid = uid;
2152                        }
2153                        if let Some(gid) = node_attributes.gid {
2154                            a.gid = gid;
2155                        }
2156                        if let Some(rdev) = node_attributes.rdev {
2157                            a.rdev = rdev;
2158                        }
2159                    } else {
2160                        attributes.posix_attributes = Some(PosixAttributes {
2161                            mode: node_attributes.mode.unwrap_or_default(),
2162                            uid: node_attributes.uid.unwrap_or_default(),
2163                            gid: node_attributes.gid.unwrap_or_default(),
2164                            rdev: node_attributes.rdev.unwrap_or_default(),
2165                        });
2166                    }
2167                }
2168            }
2169        } else {
2170            bail!(anyhow!(FxfsError::Inconsistent)
2171                .context("ObjectStore.update_attributes: Expected object value"));
2172        };
2173        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2174        Ok(())
2175    }
2176
2177    // Updates and commits the changes to access time in ObjectProperties. The update matches
2178    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2179    // than or equal to the last modification or status change, or if it has been more than a day
2180    // since the last access.
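    // For example: atime <= mtime, atime <= ctime, or atime older than (now - 24h) all trigger an
    // update; otherwise this is a no-op.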
2181    pub async fn update_access_time(
2182        &self,
2183        object_id: u64,
2184        props: &mut ObjectProperties,
2185    ) -> Result<(), Error> {
2186        let access_time = props.access_time.as_nanos();
2187        let modification_time = props.modification_time.as_nanos();
2188        let change_time = props.change_time.as_nanos();
2189        let now = Timestamp::now();
2190        if access_time <= modification_time
2191            || access_time <= change_time
2192            || access_time
2193                < now.as_nanos()
2194                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2195        {
2196            let mut transaction = self
2197                .filesystem()
2198                .clone()
2199                .new_transaction(
2200                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2201                    Options { borrow_metadata_space: true, ..Default::default() },
2202                )
2203                .await?;
2204            self.update_attributes(
2205                &mut transaction,
2206                object_id,
2207                Some(&fio::MutableNodeAttributes {
2208                    access_time: Some(now.as_nanos()),
2209                    ..Default::default()
2210                }),
2211                None,
2212            )
2213            .await?;
2214            transaction.commit().await?;
2215            props.access_time = now;
2216        }
2217        Ok(())
2218    }
2219}
2220
2221#[async_trait]
2222impl JournalingObject for ObjectStore {
2223    fn apply_mutation(
2224        &self,
2225        mutation: Mutation,
2226        context: &ApplyContext<'_, '_>,
2227        _assoc_obj: AssocObj<'_>,
2228    ) -> Result<(), Error> {
2229        match &*self.lock_state.lock() {
2230            LockState::Locked | LockState::Locking => {
2231                ensure!(
2232                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2233                        || matches!(
2234                            mutation,
2235                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2236                                if context.mode.is_replay()
2237                        ),
2238                    anyhow!(FxfsError::Inconsistent)
2239                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2240                );
2241            }
2242            LockState::Invalid
2243            | LockState::Unlocking
2244            | LockState::Unencrypted
2245            | LockState::Unlocked { .. }
2246            | LockState::UnlockedReadOnly(..) => {}
2247            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2248        }
2249        match mutation {
2250            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2251                item.sequence = context.checkpoint.file_offset;
2252                match op {
2253                    Operation::Insert => {
2254                        // If we are inserting an object record for the first time, it signifies the
2255                        // birth of the object so we need to adjust the object count.
2256                        if matches!(item.value, ObjectValue::Object { .. }) {
2257                            {
2258                                let info = &mut self.store_info.lock();
2259                                let object_count = &mut info.as_mut().unwrap().object_count;
2260                                *object_count = object_count.saturating_add(1);
2261                            }
2262                            if context.mode.is_replay() {
2263                                self.update_last_object_id(item.key.object_id);
2264                            }
2265                        }
2266                        self.tree.insert(item)?;
2267                    }
2268                    Operation::ReplaceOrInsert => {
2269                        self.tree.replace_or_insert(item);
2270                    }
2271                    Operation::Merge => {
2272                        if item.is_tombstone() {
2273                            let info = &mut self.store_info.lock();
2274                            let object_count = &mut info.as_mut().unwrap().object_count;
2275                            *object_count = object_count.saturating_sub(1);
2276                        }
2277                        let lower_bound = item.key.key_for_merge_into();
2278                        self.tree.merge_into(item, &lower_bound);
2279                    }
2280                }
2281            }
2282            Mutation::BeginFlush => {
2283                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2284                self.tree.seal();
2285            }
2286            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2287            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2288                // We will process these during Self::unlock.
2289                ensure!(
2290                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2291                    FxfsError::Inconsistent
2292                );
2293            }
2294            Mutation::CreateInternalDir(object_id) => {
2295                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2296                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2297            }
2298            _ => bail!("unexpected mutation: {:?}", mutation),
2299        }
2300        self.counters.lock().mutations_applied += 1;
2301        Ok(())
2302    }
2303
2304    fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
2305        self.counters.lock().mutations_dropped += 1;
2306    }
2307
2308    /// Push all in-memory structures to the device. This is not necessary for sync since the
2309    /// journal will take care of it.  This is supposed to be called when there is either memory or
2310    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2311    /// be trimmed).
2312    ///
2313    /// Also returns the earliest version of a struct in the filesystem (when known).
2314    async fn flush(&self) -> Result<Version, Error> {
2315        self.flush_with_reason(flush::Reason::Journal).await
2316    }
2317
2318    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2319        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2320        // all mutations that could affect an encrypted object store's contents or the `StoreInfo` of
2321        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
2322        // won't be replayed after reading `StoreInfo`.
2323        match mutation {
2324            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2325            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2326            // When we replay mutations for an encrypted store, the only things we keep in memory are
2327            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2328            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2329            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2330            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2331            // encrypted mutations and handle them all in sequence during `unlock()`.
2332            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2333                let mut cipher = self.mutations_cipher.lock();
2334                if let Some(cipher) = cipher.as_mut() {
2335                    // If this is the first time we've used this key, we must write the key out.
2336                    if cipher.offset() == 0 {
2337                        writer.write(Mutation::update_mutations_key(
2338                            self.store_info
2339                                .lock()
2340                                .as_ref()
2341                                .unwrap()
2342                                .mutations_key
2343                                .as_ref()
2344                                .unwrap()
2345                                .clone(),
2346                        ));
2347                    }
2348                    let mut buffer = Vec::new();
2349                    mutation.serialize_into(&mut buffer).unwrap();
2350                    cipher.encrypt(&mut buffer);
2351                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2352                    return;
2353                }
2354            }
2355            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2356            // encrypted object stores, but are either the encrypted mutation data itself or
2357            // metadata governing how the data will be encrypted. They should only be produced here.
2358            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2359                debug_assert!(false, "Only this method should generate encrypted mutations");
2360            }
2361            // `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed during
2362            // the initial journal replay, so they should not be encrypted. `Allocator`, `DeleteVolume`,
2363            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
2364            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
2365            Mutation::Allocator(_)
2366            | Mutation::BeginFlush
2367            | Mutation::EndFlush
2368            | Mutation::DeleteVolume
2369            | Mutation::UpdateBorrowed(_) => {}
2370        }
2371        writer.write(mutation.clone());
2372    }
2373}
2374
2375impl HandleOwner for ObjectStore {}
2376
2377impl AsRef<ObjectStore> for ObjectStore {
2378    fn as_ref(&self) -> &ObjectStore {
2379        self
2380    }
2381}
2382
2383fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2384    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
2385    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
2386    // to be written to layer files.  It needs to be >= what reservation_amount_from_layer_size will
2387    // return once the data has been written to layer files and <= what
2388    // reserved_space_from_journal_usage would use.  We can't just use
2389    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2390    // data (it includes the checkpoints) that isn't written in the same way to the journal.
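    // For example, a 4 MiB encrypted mutations file reserves 12 MiB of metadata space.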
2391    size * 3
2392}
2393
2394impl AssociatedObject for ObjectStore {}
2395
2396/// Argument to the trim_some method.
2397#[derive(Debug)]
2398pub enum TrimMode {
2399    /// Trim extents beyond the current size.
2400    UseSize,
2401
2402    /// Trim extents beyond the supplied offset.
2403    FromOffset(u64),
2404
2405    /// Remove the object (or attribute) from the store once it is fully trimmed.
2406    Tombstone(TombstoneMode),
2407}
2408
2409/// Sets the mode for tombstoning (either at the object or attribute level).
2410#[derive(Debug)]
2411pub enum TombstoneMode {
2412    Object,
2413    Attribute,
2414}
2415
2416/// Result of the trim_some method.
2417#[derive(Debug)]
2418pub enum TrimResult {
2419    /// We reached the limit of the transaction and more extents might follow.
2420    Incomplete,
2421
2422    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2423    /// there is one.
2424    Done(Option<u64>),
2425}
2426
2427/// Loads store info.
2428pub async fn load_store_info(
2429    parent: &Arc<ObjectStore>,
2430    store_object_id: u64,
2431) -> Result<StoreInfo, Error> {
2432    let handle =
2433        ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
2434
2435    Ok(if handle.get_size() > 0 {
2436        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2437        let mut cursor = std::io::Cursor::new(serialized_info);
2438        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2439            .context("Failed to deserialize StoreInfo")?;
2440        store_info
2441    } else {
2442        // The store_info will be absent for a newly created and empty object store.
2443        StoreInfo::default()
2444    })
2445}
2446
2447#[cfg(test)]
2448mod tests {
2449    use super::{
2450        StoreInfo, DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID,
2451        MAX_STORE_INFO_SERIALIZED_SIZE, NO_OWNER, OBJECT_ID_HI_MASK,
2452    };
2453    use crate::errors::FxfsError;
2454    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2455    use crate::fsck::fsck;
2456    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2457    use crate::lsm_tree::Query;
2458    use crate::object_handle::{
2459        ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
2460    };
2461    use crate::object_store::directory::Directory;
2462    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2463    use crate::object_store::transaction::{lock_keys, Options};
2464    use crate::object_store::volume::root_volume;
2465    use crate::object_store::{
2466        FsverityMetadata, HandleOptions, LockKey, Mutation, ObjectStore, RootDigest, StoreOwner,
2467    };
2468    use crate::serialized_types::VersionedLatest;
2469    use assert_matches::assert_matches;
2470    use async_trait::async_trait;
2471    use fuchsia_async as fasync;
2472    use fuchsia_sync::Mutex;
2473    use futures::join;
2474    use fxfs_crypto::{Crypt, WrappedKey, WrappedKeyBytes, WRAPPED_KEY_SIZE};
2475    use fxfs_insecure_crypto::InsecureCrypt;
2476    use std::sync::Arc;
2477    use std::time::Duration;
2478    use storage_device::fake_device::FakeDevice;
2479    use storage_device::DeviceHolder;
2480
2481    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2482
2483    async fn test_filesystem() -> OpenFxFilesystem {
2484        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2485        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2486    }
2487
2488    #[fuchsia::test]
2489    async fn test_item_sequences() {
2490        let fs = test_filesystem().await;
2491        let object1;
2492        let object2;
2493        let object3;
2494        let mut transaction = fs
2495            .clone()
2496            .new_transaction(lock_keys![], Options::default())
2497            .await
2498            .expect("new_transaction failed");
2499        let store = fs.root_store();
2500        object1 = Arc::new(
2501            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2502                .await
2503                .expect("create_object failed"),
2504        );
2505        transaction.commit().await.expect("commit failed");
2506        let mut transaction = fs
2507            .clone()
2508            .new_transaction(lock_keys![], Options::default())
2509            .await
2510            .expect("new_transaction failed");
2511        object2 = Arc::new(
2512            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2513                .await
2514                .expect("create_object failed"),
2515        );
2516        transaction.commit().await.expect("commit failed");
2517
2518        fs.sync(SyncOptions::default()).await.expect("sync failed");
2519
2520        let mut transaction = fs
2521            .clone()
2522            .new_transaction(lock_keys![], Options::default())
2523            .await
2524            .expect("new_transaction failed");
2525        object3 = Arc::new(
2526            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2527                .await
2528                .expect("create_object failed"),
2529        );
2530        transaction.commit().await.expect("commit failed");
2531
2532        let layer_set = store.tree.layer_set();
2533        let mut merger = layer_set.merger();
2534        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
2535        let mut sequences = [0u64; 3];
2536        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
2537            if *object_id == object1.object_id() {
2538                sequences[0] = sequence;
2539            } else if *object_id == object2.object_id() {
2540                sequences[1] = sequence;
2541            } else if *object_id == object3.object_id() {
2542                sequences[2] = sequence;
2543            }
2544            iter.advance().await.expect("advance failed");
2545        }
2546
2547        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
2548        // The last item came after a sync, so should be strictly greater.
2549        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
2550        fs.close().await.expect("Close failed");
2551    }
2552
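    // A file whose default data attribute carries fsverity metadata should be reported as a
    // verified file when it is opened.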
2553    #[fuchsia::test]
2554    async fn test_verified_file_with_verified_attribute() {
2555        let fs: OpenFxFilesystem = test_filesystem().await;
2556        let mut transaction = fs
2557            .clone()
2558            .new_transaction(lock_keys![], Options::default())
2559            .await
2560            .expect("new_transaction failed");
2561        let store = fs.root_store();
2562        let object = Arc::new(
2563            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2564                .await
2565                .expect("create_object failed"),
2566        );
2567
2568        transaction.add(
2569            store.store_object_id(),
2570            Mutation::replace_or_insert_object(
2571                ObjectKey::attribute(
2572                    object.object_id(),
2573                    DEFAULT_DATA_ATTRIBUTE_ID,
2574                    AttributeKey::Attribute,
2575                ),
2576                ObjectValue::verified_attribute(
2577                    0,
2578                    FsverityMetadata { root_digest: RootDigest::Sha256([0; 32]), salt: vec![] },
2579                ),
2580            ),
2581        );
2582
2583        transaction.add(
2584            store.store_object_id(),
2585            Mutation::replace_or_insert_object(
2586                ObjectKey::attribute(
2587                    object.object_id(),
2588                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2589                    AttributeKey::Attribute,
2590                ),
2591                ObjectValue::attribute(0, false),
2592            ),
2593        );
2594
2595        transaction.commit().await.unwrap();
2596
2597        let handle =
2598            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2599                .await
2600                .expect("open_object failed");
2601
2602        assert!(handle.is_verified_file());
2603
2604        fs.close().await.expect("Close failed");
2605    }
2606
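    // Conversely, a plain file with no fsverity metadata should not be reported as verified.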
2607    #[fuchsia::test]
2608    async fn test_verified_file_without_verified_attribute() {
2609        let fs: OpenFxFilesystem = test_filesystem().await;
2610        let mut transaction = fs
2611            .clone()
2612            .new_transaction(lock_keys![], Options::default())
2613            .await
2614            .expect("new_transaction failed");
2615        let store = fs.root_store();
2616        let object = Arc::new(
2617            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2618                .await
2619                .expect("create_object failed"),
2620        );
2621
2622        transaction.commit().await.unwrap();
2623
2624        let handle =
2625            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2626                .await
2627                .expect("open_object failed");
2628
2629        assert!(!handle.is_verified_file());
2630
2631        fs.close().await.expect("Close failed");
2632    }
2633
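    // An encrypted volume should be unlockable again after the filesystem is closed and reopened.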
2634    #[fuchsia::test]
2635    async fn test_create_and_open_store() {
2636        let fs = test_filesystem().await;
2637        let store_id = {
2638            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2639            root_volume
2640                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2641                .await
2642                .expect("new_volume failed")
2643                .store_object_id()
2644        };
2645
2646        fs.close().await.expect("close failed");
2647        let device = fs.take_device().await;
2648        device.reopen(false);
2649        let fs = FxFilesystem::open(device).await.expect("open failed");
2650
2651        {
2652            let store = fs.object_manager().store(store_id).expect("store not found");
2653            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2654        }
2655        fs.close().await.expect("Close failed");
2656    }
2657
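    // The internal directory ID should persist across a remount, and the ID should resolve to a
    // directory object in the store's tree.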
2658    #[fuchsia::test]
2659    async fn test_create_and_open_internal_dir() {
2660        let fs = test_filesystem().await;
2661        let dir_id;
2662        let store_id;
2663        {
2664            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2665            let store = root_volume
2666                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2667                .await
2668                .expect("new_volume failed");
2669            dir_id =
2670                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2671            store_id = store.store_object_id();
2672        }
2673
2674        fs.close().await.expect("close failed");
2675        let device = fs.take_device().await;
2676        device.reopen(false);
2677        let fs = FxFilesystem::open(device).await.expect("open failed");
2678
2679        {
2680            let store = fs.object_manager().store(store_id).expect("store not found");
2681            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2682            assert_eq!(
2683                dir_id,
2684                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2685            );
2686            let obj = store
2687                .tree()
2688                .find(&ObjectKey::object(dir_id))
2689                .await
2690                .expect("Searching tree for dir")
2691                .unwrap();
2692            assert_matches!(
2693                obj.value,
2694                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2695            );
2696        }
2697        fs.close().await.expect("Close failed");
2698    }
2699
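    // As above, but for an unencrypted volume, which requires no unlock step after remounting.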
2700    #[fuchsia::test]
2701    async fn test_create_and_open_internal_dir_unencrypted() {
2702        let fs = test_filesystem().await;
2703        let dir_id;
2704        let store_id;
2705        {
2706            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2707            let store =
2708                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
2709            dir_id =
2710                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2711            store_id = store.store_object_id();
2712        }
2713
2714        fs.close().await.expect("close failed");
2715        let device = fs.take_device().await;
2716        device.reopen(false);
2717        let fs = FxFilesystem::open(device).await.expect("open failed");
2718
2719        {
2720            let store = fs.object_manager().store(store_id).expect("store not found");
2721            assert_eq!(
2722                dir_id,
2723                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2724            );
2725            let obj = store
2726                .tree()
2727                .find(&ObjectKey::object(dir_id))
2728                .await
2729                .expect("Searching tree for dir")
2730                .unwrap();
2731            assert_matches!(
2732                obj.value,
2733                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2734            );
2735        }
2736        fs.close().await.expect("Close failed");
2737    }
2738
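    // Flushing replaces old layer files, but they can only be purged once the last layer set
    // referencing them is dropped; afterwards, opening an old layer object should fail NotFound.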
2739    #[fuchsia::test(threads = 10)]
2740    async fn test_old_layers_are_purged() {
2741        let fs = test_filesystem().await;
2742
2743        let store = fs.root_store();
2744        let mut transaction = fs
2745            .clone()
2746            .new_transaction(lock_keys![], Options::default())
2747            .await
2748            .expect("new_transaction failed");
2749        let object = Arc::new(
2750            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2751                .await
2752                .expect("create_object failed"),
2753        );
2754        transaction.commit().await.expect("commit failed");
2755
2756        store.flush().await.expect("flush failed");
2757
2758        let mut buf = object.allocate_buffer(5).await;
2759        buf.as_mut_slice().copy_from_slice(b"hello");
2760        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2761
2762        // Holding a layer set from the tree should cause the flush to stall until it is dropped.
2763        let layer_set = store.tree().layer_set();
2764
2765        let done = Mutex::new(false);
2766        let mut object_id = 0;
2767
2768        join!(
2769            async {
2770                store.flush().await.expect("flush failed");
2771                assert!(*done.lock());
2772            },
2773            async {
2774                // We can't observe that the flush has stalled (that's a halting problem), so
                // all we can do is sleep.
2775                fasync::Timer::new(Duration::from_secs(1)).await;
2776                *done.lock() = true;
2777                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
2778                std::mem::drop(layer_set);
2779            }
2780        );
2781
2782        if let Err(e) = ObjectStore::open_object(
2783            &store.parent_store.as_ref().unwrap(),
2784            object_id,
2785            HandleOptions::default(),
2786            store.crypt(),
2787        )
2788        .await
2789        {
2790            assert!(FxfsError::NotFound.matches(&e));
2791        } else {
2792            panic!("open_object succeeded");
2793        }
2794    }
2795
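    // Tombstoning an object should free its extents; fsck verifies the allocations afterwards.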
2796    #[fuchsia::test]
2797    async fn test_tombstone_deletes_data() {
2798        let fs = test_filesystem().await;
2799        let root_store = fs.root_store();
2800        let child_id = {
2801            let mut transaction = fs
2802                .clone()
2803                .new_transaction(lock_keys![], Options::default())
2804                .await
2805                .expect("new_transaction failed");
2806            let child = ObjectStore::create_object(
2807                &root_store,
2808                &mut transaction,
2809                HandleOptions::default(),
2810                None,
2811            )
2812            .await
2813            .expect("create_object failed");
2814            transaction.commit().await.expect("commit failed");
2815
2816            // Allocate an extent in the file.
2817            let mut buffer = child.allocate_buffer(8192).await;
2818            buffer.as_mut_slice().fill(0xaa);
2819            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2820
2821            child.object_id()
2822        };
2823
2824        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2825
2826        // Let fsck check allocations.
2827        fsck(fs.clone()).await.expect("fsck failed");
2828    }
2829
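    // Tombstoning an object should also purge its unwrapped keys from the store's key manager.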
2830    #[fuchsia::test]
2831    async fn test_tombstone_purges_keys() {
2832        let fs = test_filesystem().await;
2833        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2834        let store = root_volume
2835            .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2836            .await
2837            .expect("new_volume failed");
2838        let mut transaction = fs
2839            .clone()
2840            .new_transaction(lock_keys![], Options::default())
2841            .await
2842            .expect("new_transaction failed");
2843        let child =
2844            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2845                .await
2846                .expect("create_object failed");
2847        transaction.commit().await.expect("commit failed");
2848        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
2849        store
2850            .tombstone_object(child.object_id(), Options::default())
2851            .await
2852            .expect("tombstone_object failed");
2853        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
2854        fs.close().await.expect("close failed");
2855    }
2856
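    // Records for a tombstoned object linger in the tree until a major compaction, after which
    // they should be discarded entirely.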
2857    #[fuchsia::test]
2858    async fn test_major_compaction_discards_unnecessary_records() {
2859        let fs = test_filesystem().await;
2860        let root_store = fs.root_store();
2861        let child_id = {
2862            let mut transaction = fs
2863                .clone()
2864                .new_transaction(lock_keys![], Options::default())
2865                .await
2866                .expect("new_transaction failed");
2867            let child = ObjectStore::create_object(
2868                &root_store,
2869                &mut transaction,
2870                HandleOptions::default(),
2871                None,
2872            )
2873            .await
2874            .expect("create_object failed");
2875            transaction.commit().await.expect("commit failed");
2876
2877            // Allocate an extent in the file.
2878            let mut buffer = child.allocate_buffer(8192).await;
2879            buffer.as_mut_slice().fill(0xaa);
2880            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2881
2882            child.object_id()
2883        };
2884
2885        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2886        {
2887            let layers = root_store.tree.layer_set();
2888            let mut merger = layers.merger();
2889            let iter = merger
2890                .query(Query::FullRange(&ObjectKey::object(child_id)))
2891                .await
2892                .expect("seek failed");
2893            // At least one record for the object should still be in the tree before compaction.
2894            match iter.get() {
2895                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
2896                    if *object_id == child_id => {}
2897                _ => panic!("Objects should still be in the tree."),
2898            }
2899        }
2900        root_store.flush().await.expect("flush failed");
2901
2902        // There should be no records for the object.
2903        let layers = root_store.tree.layer_set();
2904        let mut merger = layers.merger();
2905        let iter = merger
2906            .query(Query::FullRange(&ObjectKey::object(child_id)))
2907            .await
2908            .expect("seek failed");
2909        match iter.get() {
2910            None => {}
2911            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
2912                assert_ne!(*object_id, child_id)
2913            }
2914        }
2915    }
2916
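    // Writes extents that overlap across a flushed layer and the mutable layer, then relies on
    // fsck to verify that the merged view stays consistent.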
2917    #[fuchsia::test]
2918    async fn test_overlapping_extents_in_different_layers() {
2919        let fs = test_filesystem().await;
2920        let store = fs.root_store();
2921
2922        let mut transaction = fs
2923            .clone()
2924            .new_transaction(
2925                lock_keys![LockKey::object(
2926                    store.store_object_id(),
2927                    store.root_directory_object_id()
2928                )],
2929                Options::default(),
2930            )
2931            .await
2932            .expect("new_transaction failed");
2933        let root_directory =
2934            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2935        let object = root_directory
2936            .create_child_file(&mut transaction, "test")
2937            .await
2938            .expect("create_child_file failed");
2939        transaction.commit().await.expect("commit failed");
2940
2941        let buf = object.allocate_buffer(16384).await;
2942        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2943
2944        store.flush().await.expect("flush failed");
2945
2946        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
2947
2948        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
2949        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
2950        // overwrite both of those extents.
2951        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2952
2953        fsck(fs.clone()).await.expect("fsck failed");
2954    }
2955
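    // While a volume is locked, flushing the filesystem writes its pending mutations out in
    // encrypted form; unlocking must replay them and remove the encrypted-mutations object.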
2956    #[fuchsia::test(threads = 10)]
2957    async fn test_encrypted_mutations() {
2958        async fn one_iteration(
2959            fs: OpenFxFilesystem,
2960            crypt: Arc<dyn Crypt>,
2961            iteration: u64,
2962        ) -> OpenFxFilesystem {
2963            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
2964                fs.close().await.expect("Close failed");
2965                let device = fs.take_device().await;
2966                device.reopen(false);
2967                FxFilesystem::open(device).await.expect("FS open failed")
2968            }
2969
2970            let fs = reopen(fs).await;
2971
2972            let (store_object_id, object_id) = {
2973                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2974                let store = root_volume
2975                    .volume("test", NO_OWNER, Some(crypt.clone()))
2976                    .await
2977                    .expect("volume failed");
2978
2979                let mut transaction = fs
2980                    .clone()
2981                    .new_transaction(
2982                        lock_keys![LockKey::object(
2983                            store.store_object_id(),
2984                            store.root_directory_object_id(),
2985                        )],
2986                        Options::default(),
2987                    )
2988                    .await
2989                    .expect("new_transaction failed");
2990                let root_directory = Directory::open(&store, store.root_directory_object_id())
2991                    .await
2992                    .expect("open failed");
2993                let object = root_directory
2994                    .create_child_file(&mut transaction, &format!("test {}", iteration))
2995                    .await
2996                    .expect("create_child_file failed");
2997                transaction.commit().await.expect("commit failed");
2998
2999                let mut buf = object.allocate_buffer(1000).await;
3000                for i in 0..buf.len() {
3001                    buf.as_mut_slice()[i] = i as u8;
3002                }
3003                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3004
3005                (store.store_object_id(), object.object_id())
3006            };
3007
3008            let fs = reopen(fs).await;
3009
3010            let check_object = |fs: Arc<FxFilesystem>| {
3011                let crypt = crypt.clone();
3012                async move {
3013                    let root_volume = root_volume(fs).await.expect("root_volume failed");
3014                    let volume = root_volume
3015                        .volume("test", NO_OWNER, Some(crypt))
3016                        .await
3017                        .expect("volume failed");
3018
3019                    let object = ObjectStore::open_object(
3020                        &volume,
3021                        object_id,
3022                        HandleOptions::default(),
3023                        None,
3024                    )
3025                    .await
3026                    .expect("open_object failed");
3027                    let mut buf = object.allocate_buffer(1000).await;
3028                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
3029                    for i in 0..buf.len() {
3030                        assert_eq!(buf.as_slice()[i], i as u8);
3031                    }
3032                }
3033            };
3034
3035            check_object(fs.clone()).await;
3036
3037            let fs = reopen(fs).await;
3038
3039            // At this point the "test" volume is locked.  Before checking the object, flush the
3040            // filesystem.  This should leave a file with encrypted mutations.
3041            fs.object_manager().flush().await.expect("flush failed");
3042
3043            assert_ne!(
3044                fs.object_manager()
3045                    .store(store_object_id)
3046                    .unwrap()
3047                    .load_store_info()
3048                    .await
3049                    .expect("load_store_info failed")
3050                    .encrypted_mutations_object_id,
3051                INVALID_OBJECT_ID
3052            );
3053
3054            check_object(fs.clone()).await;
3055
3056            // Checking the object should have triggered a flush and so now there should be no
3057            // encrypted mutations object.
3058            assert_eq!(
3059                fs.object_manager()
3060                    .store(store_object_id)
3061                    .unwrap()
3062                    .load_store_info()
3063                    .await
3064                    .expect("load_store_info failed")
3065                    .encrypted_mutations_object_id,
3066                INVALID_OBJECT_ID
3067            );
3068
3069            let fs = reopen(fs).await;
3070
3071            fsck(fs.clone()).await.expect("fsck failed");
3072
3073            let fs = reopen(fs).await;
3074
3075            check_object(fs.clone()).await;
3076
3077            fs
3078        }
3079
3080        let mut fs = test_filesystem().await;
3081        let crypt = Arc::new(InsecureCrypt::new());
3082
3083        {
3084            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3085            let _store = root_volume
3086                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3087                .await
3088                .expect("new_volume failed");
3089        }
3090
3091        // Run a few iterations so that we test with varying stream cipher offsets.
3092        for i in 0..5 {
3093            fs = one_iteration(fs, crypt.clone(), i).await;
3094        }
3095    }
3096
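    // Exhausting the low 32 bits of the object ID space should roll the object ID cipher: the
    // hi part of new IDs is bumped and a fresh object_id_key is persisted.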
3097    #[fuchsia::test(threads = 10)]
3098    async fn test_object_id_cipher_roll() {
3099        let fs = test_filesystem().await;
3100        let crypt = Arc::new(InsecureCrypt::new());
3101
3102        {
3103            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3104            let store = root_volume
3105                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3106                .await
3107                .expect("new_volume failed");
3108
3109            let store_info = store.store_info().unwrap();
3110
3111            // Hack the last object ID to force a roll of the object ID cipher.
3112            {
3113                let mut last_object_id = store.last_object_id.lock();
3114                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 1u64 << 32);
3115                last_object_id.id |= 0xffffffff;
3116            }
3117
3118            let mut transaction = fs
3119                .clone()
3120                .new_transaction(
3121                    lock_keys![LockKey::object(
3122                        store.store_object_id(),
3123                        store.root_directory_object_id()
3124                    )],
3125                    Options::default(),
3126                )
3127                .await
3128                .expect("new_transaction failed");
3129            let root_directory = Directory::open(&store, store.root_directory_object_id())
3130                .await
3131                .expect("open failed");
3132            let object = root_directory
3133                .create_child_file(&mut transaction, "test")
3134                .await
3135                .expect("create_child_file failed");
3136            transaction.commit().await.expect("commit failed");
3137
3138            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 2u64 << 32);
3139
3140            // Check that the key has been changed.
3141            assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);
3142
3143            assert_eq!(store.last_object_id.lock().id, 2u64 << 32);
3144        };
3145
3146        fs.close().await.expect("Close failed");
3147        let device = fs.take_device().await;
3148        device.reopen(false);
3149        let fs = FxFilesystem::open(device).await.expect("open failed");
3150        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3151        let store =
3152            root_volume.volume("test", NO_OWNER, Some(crypt.clone())).await.expect("volume failed");
3153
3154        assert_eq!(store.last_object_id.lock().id, 2u64 << 32);
3155    }
3156
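    // Locking and then unlocking a store should leave previously created entries accessible.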
3157    #[fuchsia::test(threads = 10)]
3158    async fn test_lock_store() {
3159        let fs = test_filesystem().await;
3160        let crypt = Arc::new(InsecureCrypt::new());
3161
3162        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3163        let store = root_volume
3164            .new_volume("test", NO_OWNER, Some(crypt.clone()))
3165            .await
3166            .expect("new_volume failed");
3167        let mut transaction = fs
3168            .clone()
3169            .new_transaction(
3170                lock_keys![LockKey::object(
3171                    store.store_object_id(),
3172                    store.root_directory_object_id()
3173                )],
3174                Options::default(),
3175            )
3176            .await
3177            .expect("new_transaction failed");
3178        let root_directory =
3179            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3180        root_directory
3181            .create_child_file(&mut transaction, "test")
3182            .await
3183            .expect("create_child_file failed");
3184        transaction.commit().await.expect("commit failed");
3185        store.lock().await.expect("lock failed");
3186
3187        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3188        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3189    }
3190
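    // A store should support repeated read-only unlock/lock cycles, with lookups succeeding
    // each time it is unlocked.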
3191    #[fuchsia::test(threads = 10)]
3192    async fn test_unlock_read_only() {
3193        let fs = test_filesystem().await;
3194        let crypt = Arc::new(InsecureCrypt::new());
3195
3196        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3197        let store = root_volume
3198            .new_volume("test", NO_OWNER, Some(crypt.clone()))
3199            .await
3200            .expect("new_volume failed");
3201        let mut transaction = fs
3202            .clone()
3203            .new_transaction(
3204                lock_keys![LockKey::object(
3205                    store.store_object_id(),
3206                    store.root_directory_object_id()
3207                )],
3208                Options::default(),
3209            )
3210            .await
3211            .expect("new_transaction failed");
3212        let root_directory =
3213            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3214        root_directory
3215            .create_child_file(&mut transaction, "test")
3216            .await
3217            .expect("create_child_file failed");
3218        transaction.commit().await.expect("commit failed");
3219        store.lock().await.expect("lock failed");
3220
3221        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
3222        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3223        store.lock_read_only();
3224        store.unlock_read_only(crypt).await.expect("unlock failed");
3225        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3226    }
3227
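    // The mutations key is rolled on every unlock (so the cipher offset resets to zero), and the
    // volume must remain mountable after many such rolls.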
3228    #[fuchsia::test(threads = 10)]
3229    async fn test_key_rolled_when_unlocked() {
3230        let fs = test_filesystem().await;
3231        let crypt = Arc::new(InsecureCrypt::new());
3232
3233        let object_id;
3234        {
3235            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3236            let store = root_volume
3237                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3238                .await
3239                .expect("new_volume failed");
3240            let mut transaction = fs
3241                .clone()
3242                .new_transaction(
3243                    lock_keys![LockKey::object(
3244                        store.store_object_id(),
3245                        store.root_directory_object_id()
3246                    )],
3247                    Options::default(),
3248                )
3249                .await
3250                .expect("new_transaction failed");
3251            let root_directory = Directory::open(&store, store.root_directory_object_id())
3252                .await
3253                .expect("open failed");
3254            object_id = root_directory
3255                .create_child_file(&mut transaction, "test")
3256                .await
3257                .expect("create_child_file failed")
3258                .object_id();
3259            transaction.commit().await.expect("commit failed");
3260        }
3261
3262        fs.close().await.expect("Close failed");
3263        let mut device = fs.take_device().await;
3264
3265        // Repeatedly remount so that we can be sure that remounting succeeds even after many
3266        // mutations keys have accumulated.
3267        for _ in 0..100 {
3268            device.reopen(false);
3269            let fs = FxFilesystem::open(device).await.expect("open failed");
3270            {
3271                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3272                let store = root_volume
3273                    .volume("test", NO_OWNER, Some(crypt.clone()))
3274                    .await
3275                    .expect("open_volume failed");
3276
3277                // The key should get rolled every time we unlock.
3278                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
3279
3280                // Make sure there's an encrypted mutation.
3281                let handle =
3282                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
3283                        .await
3284                        .expect("open_object failed");
3285                let buffer = handle.allocate_buffer(100).await;
3286                handle
3287                    .write_or_append(Some(0), buffer.as_ref())
3288                    .await
3289                    .expect("write_or_append failed");
3290            }
3291            fs.close().await.expect("Close failed");
3292            device = fs.take_device().await;
3293        }
3294    }
3295
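    // A worst-case StoreInfo must serialize to no more than MAX_STORE_INFO_SERIALIZED_SIZE bytes.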
3296    #[test]
3297    fn test_store_info_max_serialized_size() {
3298        let info = StoreInfo {
3299            guid: [0xff; 16],
3300            last_object_id: 0x1234567812345678,
3301            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
3302            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
3303            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
3304            // any size should fit.
3305            layers: vec![0x1234567812345678; 120],
3306            root_directory_object_id: 0x1234567812345678,
3307            graveyard_directory_object_id: 0x1234567812345678,
3308            object_count: 0x1234567812345678,
3309            mutations_key: Some(WrappedKey {
3310                wrapping_key_id: 0x1234567812345678,
3311                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
3312            }),
3313            mutations_cipher_offset: 0x1234567812345678,
3314            encrypted_mutations_object_id: 0x1234567812345678,
3315            object_id_key: Some(WrappedKey {
3316                wrapping_key_id: 0x1234567812345678,
3317                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
3318            }),
3319            internal_directory_object_id: INVALID_OBJECT_ID,
3320        };
3321        let mut serialized_info = Vec::new();
3322        info.serialize_with_version(&mut serialized_info).unwrap();
3323        assert!(
3324            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
3325            "{}",
3326            serialized_info.len()
3327        );
3328    }
3329
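    // After a crypt instance fails mid-operation, the store should still be lockable and then
    // unlockable again (read-write or read-only) with a fresh crypt instance.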
3330    async fn reopen_after_crypt_failure_inner(read_only: bool) {
3331        let fs = test_filesystem().await;
3332        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3333
3334        let store = {
3335            let crypt = Arc::new(InsecureCrypt::new());
3336            let store = root_volume
3337                .new_volume("vol", NO_OWNER, Some(crypt.clone()))
3338                .await
3339                .expect("new_volume failed");
3340            let root_directory = Directory::open(&store, store.root_directory_object_id())
3341                .await
3342                .expect("open failed");
3343            let mut transaction = fs
3344                .clone()
3345                .new_transaction(
3346                    lock_keys![LockKey::object(
3347                        store.store_object_id(),
3348                        root_directory.object_id()
3349                    )],
3350                    Options::default(),
3351                )
3352                .await
3353                .expect("new_transaction failed");
3354            root_directory
3355                .create_child_file(&mut transaction, "test")
3356                .await
3357                .expect("create_child_file failed");
3358            transaction.commit().await.expect("commit failed");
3359
3360            crypt.shutdown();
3361            let mut transaction = fs
3362                .clone()
3363                .new_transaction(
3364                    lock_keys![LockKey::object(
3365                        store.store_object_id(),
3366                        root_directory.object_id()
3367                    )],
3368                    Options::default(),
3369                )
3370                .await
3371                .expect("new_transaction failed");
3372            root_directory
3373                .create_child_file(&mut transaction, "test2")
3374                .await
3375                .map(|_| ())
3376                .expect_err("create_child_file should fail");
3377            store.lock().await.expect("lock failed");
3378            store
3379        };
3380
3381        let crypt = Arc::new(InsecureCrypt::new());
3382        if read_only {
3383            store.unlock_read_only(crypt).await.expect("unlock failed");
3384        } else {
3385            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3386        }
3387        let root_directory =
3388            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3389        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3390    }
3391
3392    #[fuchsia::test(threads = 10)]
3393    async fn test_reopen_after_crypt_failure() {
3394        reopen_after_crypt_failure_inner(false).await;
3395    }
3396
3397    #[fuchsia::test(threads = 10)]
3398    async fn test_reopen_read_only_after_crypt_failure() {
3399        reopen_after_crypt_failure_inner(true).await;
3400    }
3401
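    // Debug builds assert that a transaction stays within its reserved journal space; a
    // transaction containing 500 symlink creations should trip that assertion.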
3402    #[fuchsia::test(threads = 10)]
3403    #[should_panic(expected = "Insufficient reservation space")]
3404    #[cfg(debug_assertions)]
3405    async fn large_transaction_causes_panic_in_debug_builds() {
3406        let fs = test_filesystem().await;
3407        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3408        let store = root_volume.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
3409        let root_directory =
3410            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3411        let mut transaction = fs
3412            .clone()
3413            .new_transaction(
3414                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
3415                Options::default(),
3416            )
3417            .await
3418            .expect("transaction");
3419        for i in 0..500 {
3420            root_directory
3421                .create_symlink(&mut transaction, b"link", &format!("{}", i))
3422                .await
3423                .expect("symlink");
3424        }
3425        assert_eq!(transaction.commit().await.expect("commit"), 0);
3426    }
3427
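    // If a store's crypt fails during compaction, the store should be force-locked via its
    // StoreOwner rather than wedging the journal; its unflushed mutations remain replayable.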
3428    #[fuchsia::test]
3429    async fn test_crypt_failure_does_not_fuse_journal() {
3430        let fs = test_filesystem().await;
3431
3432        struct Owner;
3433        #[async_trait]
3434        impl StoreOwner for Owner {
3435            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3436                store.lock().await
3437            }
3438        }
3439        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3440
3441        {
3442            // Create two stores and a record for each store, so the journal will need to flush them
3443            // both later.
3444            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3445            let store1 = root_volume
3446                .new_volume("vol1", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3447                .await
3448                .expect("new_volume failed");
3449            let crypt = Arc::new(InsecureCrypt::new());
3450            let store2 = root_volume
3451                .new_volume("vol2", Arc::downgrade(&owner), Some(crypt.clone()))
3452                .await
3453                .expect("new_volume failed");
3454            for store in [&store1, &store2] {
3455                let root_directory = Directory::open(store, store.root_directory_object_id())
3456                    .await
3457                    .expect("open failed");
3458                let mut transaction = fs
3459                    .clone()
3460                    .new_transaction(
3461                        lock_keys![LockKey::object(
3462                            store.store_object_id(),
3463                            root_directory.object_id()
3464                        )],
3465                        Options::default(),
3466                    )
3467                    .await
3468                    .expect("new_transaction failed");
3469                root_directory
3470                    .create_child_file(&mut transaction, "test")
3471                    .await
3472                    .expect("create_child_file failed");
3473                transaction.commit().await.expect("commit failed");
3474            }
3475            // Shut down the crypt instance for store2, and then compact.  Compaction should not
3476            // fail, and the store should become locked.
3477            crypt.shutdown();
3478            fs.journal().compact().await.expect("compact failed");
3479            // The store should now be locked.
3480            assert!(store2.is_locked());
3481        }
3482
3483        // Even though the store wasn't flushed, the mutations to store2 are still valid because
3484        // they are held in the journal.
3485        fs.close().await.expect("close failed");
3486        let device = fs.take_device().await;
3487        device.reopen(false);
3488        let fs = FxFilesystem::open(device).await.expect("open failed");
3489        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3490
3491        for volume_name in ["vol1", "vol2"] {
3492            let store = root_volume
3493                .volume(volume_name, NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3494                .await
3495                .expect("open volume failed");
3496            let root_directory = Directory::open(&store, store.root_directory_object_id())
3497                .await
3498                .expect("open failed");
3499            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3500        }
3501
3502        fs.close().await.expect("close failed");
3503    }
3504
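    // Races a crypt shutdown against a volume unlock (and the flush that follows unlock); the
    // filesystem must remain consistent and unlockable afterwards with a fresh crypt.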
3505    #[fuchsia::test]
3506    async fn test_crypt_failure_during_unlock_race() {
3507        let fs = test_filesystem().await;
3508
3509        struct Owner;
3510        #[async_trait]
3511        impl StoreOwner for Owner {
3512            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3513                store.lock().await
3514            }
3515        }
3516        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3517
3518        let store_object_id = {
3519            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3520            let store = root_volume
3521                .new_volume("vol", Arc::downgrade(&owner), Some(Arc::new(InsecureCrypt::new())))
3522                .await
3523                .expect("new_volume failed");
3524            let root_directory = Directory::open(&store, store.root_directory_object_id())
3525                .await
3526                .expect("open failed");
3527            let mut transaction = fs
3528                .clone()
3529                .new_transaction(
3530                    lock_keys![LockKey::object(
3531                        store.store_object_id(),
3532                        root_directory.object_id()
3533                    )],
3534                    Options::default(),
3535                )
3536                .await
3537                .expect("new_transaction failed");
3538            root_directory
3539                .create_child_file(&mut transaction, "test")
3540                .await
3541                .expect("create_child_file failed");
3542            transaction.commit().await.expect("commit failed");
3543            store.store_object_id()
3544        };
3545
3546        fs.close().await.expect("close failed");
3547        let device = fs.take_device().await;
3548        device.reopen(false);
3549
3550        let fs = FxFilesystem::open(device).await.expect("open failed");
3551        {
3552            let fs_clone = fs.clone();
3553            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3554
3555            let crypt = Arc::new(InsecureCrypt::new());
3556            let crypt_clone = crypt.clone();
3557            join!(
3558                async move {
3559                    // Unlock might fail, so ignore errors.
3560                    let _ =
3561                        root_volume.volume("vol", Arc::downgrade(&owner), Some(crypt_clone)).await;
3562                },
3563                async move {
3564                    // Wait until the unlock has finished, but shut down the crypt before the
3565                    // post-unlock flush completes, to maximize the chances of hitting the race.
3566                    let keys = lock_keys![LockKey::flush(store_object_id)];
3567                    let _ = fs_clone.lock_manager().write_lock(keys).await;
3568                    crypt.shutdown();
3569                }
3570            );
3571        }
3572
3573        fs.close().await.expect("close failed");
3574        let device = fs.take_device().await;
3575        device.reopen(false);
3576
3577        let fs = FxFilesystem::open(device).await.expect("open failed");
3578        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3579        let store = root_volume
3580            .volume("vol", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3581            .await
3582            .expect("open volume failed");
3583        let root_directory =
3584            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3585        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3586
3587        fs.close().await.expect("close failed");
3588    }
3589}