fxfs/
object_store.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod allocator;
6pub mod caching_object_handle;
7mod data_object_handle;
8pub mod directory;
9mod extent_record;
10mod flush;
11pub mod graveyard;
12pub mod journal;
13mod key_manager;
14pub(crate) mod merge;
15pub mod object_manager;
16pub mod object_record;
17pub mod project_id;
18mod store_object_handle;
19pub mod transaction;
20mod tree;
21mod tree_cache;
22pub mod volume;
23
24pub use data_object_handle::{
25    DataObjectHandle, DirectWriter, FsverityState, FsverityStateInner, RangeType,
26};
27pub use directory::Directory;
28pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
29pub use store_object_handle::{
30    SetExtendedAttributeMode, StoreObjectHandle, EXTENDED_ATTRIBUTE_RANGE_END,
31    EXTENDED_ATTRIBUTE_RANGE_START,
32};
33
34use crate::errors::FxfsError;
35use crate::filesystem::{
36    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions, TruncateGuard, TxnGuard,
37    MAX_FILE_SIZE,
38};
39use crate::log::*;
40use crate::lsm_tree::cache::{NullCache, ObjectCache};
41use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
42use crate::lsm_tree::{LSMTree, Query};
43use crate::object_handle::{ObjectHandle, ObjectProperties, ReadObjectHandle, INVALID_OBJECT_ID};
44use crate::object_store::allocator::Allocator;
45use crate::object_store::graveyard::Graveyard;
46use crate::object_store::journal::{JournalCheckpoint, JournaledTransaction};
47use crate::object_store::key_manager::KeyManager;
48use crate::object_store::transaction::{
49    lock_keys, AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options,
50    Transaction,
51};
52use crate::range::RangeExt;
53use crate::round::round_up;
54use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, VersionedLatest};
55use anyhow::{anyhow, bail, ensure, Context, Error};
56use async_trait::async_trait;
57use base64::engine::general_purpose::URL_SAFE_NO_PAD as BASE64_URL_SAFE_NO_PAD;
58use base64::engine::Engine as _;
59use fidl_fuchsia_io as fio;
60use fprint::TypeFingerprint;
61use fuchsia_inspect::ArrayProperty;
62use fuchsia_sync::Mutex;
63use fxfs_crypto::ff1::Ff1;
64use fxfs_crypto::{
65    Crypt, KeyPurpose, StreamCipher, UnwrappedKey, WrappedKey, WrappedKeyV32, WrappedKeyV40,
66    WrappedKeys,
67};
68use mundane::hash::{Digest, Hasher, Sha256};
69use once_cell::sync::OnceCell;
70use scopeguard::ScopeGuard;
71use serde::{Deserialize, Serialize};
72use std::fmt;
73use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
74use std::sync::{Arc, Weak};
75use storage_device::Device;
76use uuid::Uuid;
77
78pub use extent_record::{
79    ExtentKey, ExtentMode, ExtentValue, BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID,
80    FSVERITY_MERKLE_ATTRIBUTE_ID,
81};
82pub use object_record::{
83    AttributeKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, ObjectAttributes,
84    ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, ProjectProperty, RootDigest,
85};
86pub use transaction::Mutation;
87
// Encrypted stores encrypt only the low 32 bits of each object ID, which makes side-channel attacks
// more difficult; the high 32 bits are left in the clear.  This mask extracts that (hi) part.
const OBJECT_ID_HI_MASK: u64 = 0xffff_ffff_0000_0000;

// Threshold on the number of mutations in a transaction.  At time of writing, this threshold
// limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

/// ID of the volume data key.  Encrypted files and directories that are not fscrypt-encrypted use
/// this key by default.  The filesystem also always uses it to encrypt large extended attributes,
/// so an fscrypt-encrypted object with large xattrs carries both this key and [`FSCRYPT_KEY_ID`].
pub const VOLUME_DATA_KEY_ID: u64 = 0;

/// ID of the fscrypt key, which encrypts the contents of encrypted files and the names in
/// encrypted directories.
pub const FSCRYPT_KEY_ID: u64 = 1;
102
103/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
104/// owner is required.
105pub const NO_OWNER: Weak<()> = Weak::new();
106impl StoreOwner for () {}
107
108#[async_trait]
109pub trait StoreOwner: Send + Sync {
110    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
111    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
112    /// be called when the store is not in use.
113    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
114        Err(anyhow!(FxfsError::Internal))
115    }
116}
117
118/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
119/// back to an ObjectStore.
120pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}
121
/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
///
/// This alias always names the latest version.  Older versions are migrated forward through the
/// `migrate_to_version` chain (StoreInfoV32 -> StoreInfoV36 -> StoreInfoV40).
pub type StoreInfo = StoreInfoV40;

/// Version 40 of the store info layout (wrapped keys use `WrappedKeyV40`).
///
/// NOTE(review): this type is serialized with serde; if the on-disk encoding is positional, field
/// order is part of the format and fields must not be reordered.  Confirm before editing.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV40 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: u64,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<WrappedKeyV40>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
    /// reveal (such as the number of files in the system and the ordering of their creation in
    /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits will
    /// increment after 2^32 object IDs have been used and this allows us to roll the key.
    object_id_key: Option<WrappedKeyV40>,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}
173
/// Legacy store info layout (version 36), migrated to [`StoreInfoV40`] on read.
///
/// It has the same fields as V40, but its wrapped keys use the older `WrappedKeyV32` type.  See
/// [`StoreInfoV40`] for field documentation.
#[derive(Default, Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV40)]
pub struct StoreInfoV36 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<WrappedKeyV32>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<WrappedKeyV32>,
    internal_directory_object_id: u64,
}
189
/// Legacy store info layout (version 32), migrated to [`StoreInfoV36`] on read.
///
/// Compared with V36 it has no `internal_directory_object_id` field.  See [`StoreInfoV40`] for
/// field documentation.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV36)]
pub struct StoreInfoV32 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<WrappedKeyV32>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<WrappedKeyV32>,
}
204
205impl StoreInfo {
206    /// Create a new/default [`StoreInfo`] but with a newly generated GUID.
207    fn new_with_guid() -> Self {
208        let guid = Uuid::new_v4();
209        Self { guid: *guid.as_bytes(), ..Default::default() }
210    }
211
212    /// Returns the parent objects for this store.
213    pub fn parent_objects(&self) -> Vec<u64> {
214        // We should not include the ID of the store itself, since that should be referred to in the
215        // volume directory.
216        let mut objects = self.layers.to_vec();
217        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
218            objects.push(self.encrypted_mutations_object_id);
219        }
220        objects
221    }
222}
223
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit
// isn't exceeded.  It will likely involve placing limits on the maximum number of layers.
/// Limit on the serialized size of a [`StoreInfo`]: 128 KiB (131072).
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 128 * 1024;
227
228// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
229// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
230// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
231pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
232
/// Per-handle options, supplied when creating objects (e.g. `ObjectStore::create_object`).
///
/// The `Default` value has both flags `false`, i.e. journal space checks and checksums enabled.
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
}
241
/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    /// Identifier of the key.  NOTE(review): presumably one of `VOLUME_DATA_KEY_ID` /
    /// `FSCRYPT_KEY_ID`; confirm against callers.
    pub key_id: u64,
    /// The key in wrapped form.
    pub key: WrappedKey,
    /// The key in unwrapped form (presumably the unwrapped counterpart of `key`; confirm).
    pub unwrapped_key: UnwrappedKey,
}
253
/// Options for `ObjectStore::new_child_store`.  See the `Default` impl for the typical values.
pub struct NewChildStoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,
}
265
266impl Default for NewChildStoreOptions {
267    fn default() -> Self {
268        Self { owner: NO_OWNER, crypt: None, object_id: INVALID_OBJECT_ID }
269    }
270}
271
/// Alias for the latest version of the encrypted-mutations record.
pub type EncryptedMutations = EncryptedMutationsV40;

/// Mutations for an encrypted store that could not be applied because the store was locked.  They
/// are kept in encrypted form, together with enough information to decode them later.
#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // `data`.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in `data`.
    transactions: Vec<(JournalCheckpoint, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, WrappedKeyV40)>,
}
290
291impl std::fmt::Debug for EncryptedMutations {
292    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
293        f.debug_struct("EncryptedMutations")
294            .field("transactions", &self.transactions)
295            .field("len", &self.data.len())
296            .field(
297                "mutations_key_roll",
298                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
299            )
300            .finish()
301    }
302}
303
304impl Versioned for EncryptedMutations {
305    fn max_serialized_size() -> u64 {
306        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
307    }
308}
309
310impl EncryptedMutations {
311    fn from_replayed_mutations(
312        store_object_id: u64,
313        transactions: Vec<JournaledTransaction>,
314    ) -> Self {
315        let mut this = Self::default();
316        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
317            for (object_id, mutation) in non_root_mutations {
318                if store_object_id == object_id {
319                    if let Mutation::EncryptedObjectStore(data) = mutation {
320                        this.push(&checkpoint, data);
321                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
322                        this.mutations_key_roll.push((this.data.len(), key.into()));
323                    }
324                }
325            }
326        }
327        this
328    }
329
330    fn extend(&mut self, other: &EncryptedMutations) {
331        self.transactions.extend_from_slice(&other.transactions[..]);
332        self.mutations_key_roll.extend(
333            other
334                .mutations_key_roll
335                .iter()
336                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
337        );
338        self.data.extend_from_slice(&other.data[..]);
339    }
340
341    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
342        self.data.append(&mut data.into());
343        // If the checkpoint is the same as the last mutation we pushed, increment the count.
344        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
345            if last_checkpoint.file_offset == checkpoint.file_offset {
346                *count += 1;
347                return;
348            }
349        }
350        self.transactions.push((checkpoint.clone(), 1));
351    }
352}
353
/// The lock state of an [`ObjectStore`].
pub enum LockState {
    /// The store is encrypted and locked.  `ObjectStore::crypt` panics in this state.
    Locked,

    /// The store is not encrypted; `ObjectStore::crypt` returns `None`.
    Unencrypted,

    /// The store is encrypted and unlocked.  `owner` is held weakly (see `LockState::owner`) and
    /// `crypt` is the crypt object returned by `ObjectStore::crypt`.
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,
}
379
380impl LockState {
381    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
382        if let Self::Unlocked { owner, .. } = self {
383            owner.upgrade()
384        } else {
385            None
386        }
387    }
388}
389
390impl fmt::Debug for LockState {
391    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
392        formatter.write_str(match self {
393            LockState::Locked => "Locked",
394            LockState::Unencrypted => "Unencrypted",
395            LockState::Unlocked { .. } => "Unlocked",
396            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
397            LockState::Invalid => "Invalid",
398            LockState::Unknown => "Unknown",
399            LockState::Locking => "Locking",
400            LockState::Unlocking => "Unlocking",
401        })
402    }
403}
404
/// Tracks the last object ID a store handed out and, for encrypted stores, the cipher used to
/// obfuscate the low 32 bits of newly generated IDs.
#[derive(Default)]
struct LastObjectId {
    // The *unencrypted* value of the last object ID.
    id: u64,

    // Encrypted stores will use a cipher to obfuscate the object ID.  `None` means IDs are handed
    // out unencrypted (see `get_next_object_id`).
    cipher: Option<Ff1>,
}
413
414impl LastObjectId {
415    // Returns true if a cipher is needed to generate new object IDs.
416    fn should_create_cipher(&self) -> bool {
417        self.id as u32 == u32::MAX
418    }
419
420    fn get_next_object_id(&mut self) -> u64 {
421        self.id += 1;
422        if let Some(cipher) = &self.cipher {
423            let hi = self.id & OBJECT_ID_HI_MASK;
424            assert_ne!(hi, INVALID_OBJECT_ID);
425            assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
426            hi | cipher.encrypt(self.id as u32) as u64
427        } else {
428            self.id
429        }
430    }
431}
432
/// An object store supports a file-like interface for objects.  Objects are keyed by a 64-bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    // The store holding this store's metadata; `None` for the root parent store (see `is_root`).
    parent_store: Option<Arc<ObjectStore>>,
    // The ID of the object, in the parent store, that holds this store's StoreInfo.
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    // Held weakly; `filesystem()` panics if it cannot be upgraded.  Root parent stores start with
    // an empty `Weak` until `attach_filesystem` is called.
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    // May be `None` (e.g. before the StoreInfo has been read); accessors such as
    // `root_directory_object_id` panic in that case.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceCell<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    key_manager: KeyManager,

    // Enable/disable tracing (see `set_trace`).
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths, so they are atomics rather than
    // fields of the `counters` mutex.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}
481
/// Informational counters for events occurring within a store, reported to inspect by
/// `ObjectStore::record_data`.
#[derive(Clone, Default)]
struct ObjectStoreCounters {
    // NOTE(review): the sites that increment these counters are outside this section.
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    // Reported as milliseconds since the UNIX epoch under `last_flush_time_ms`.
    last_flush_time: Option<std::time::SystemTime>,
    // Reported as an inspect uint array (presumably one entry per persistent layer file).
    persistent_layer_file_sizes: Vec<u64>,
}
490
491impl ObjectStore {
492    fn new(
493        parent_store: Option<Arc<ObjectStore>>,
494        store_object_id: u64,
495        filesystem: Arc<FxFilesystem>,
496        store_info: Option<StoreInfo>,
497        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
498        mutations_cipher: Option<StreamCipher>,
499        lock_state: LockState,
500        last_object_id: LastObjectId,
501    ) -> Arc<ObjectStore> {
502        let device = filesystem.device();
503        let block_size = filesystem.block_size();
504        Arc::new(ObjectStore {
505            parent_store,
506            store_object_id,
507            device,
508            block_size,
509            filesystem: Arc::downgrade(&filesystem),
510            store_info: Mutex::new(store_info),
511            tree: LSMTree::new(merge::merge, object_cache),
512            store_info_handle: OnceCell::new(),
513            mutations_cipher: Mutex::new(mutations_cipher),
514            lock_state: Mutex::new(lock_state),
515            key_manager: KeyManager::new(),
516            trace: AtomicBool::new(false),
517            counters: Mutex::new(ObjectStoreCounters::default()),
518            device_read_ops: AtomicU64::new(0),
519            device_write_ops: AtomicU64::new(0),
520            logical_read_ops: AtomicU64::new(0),
521            logical_write_ops: AtomicU64::new(0),
522            last_object_id: Mutex::new(last_object_id),
523            flush_callback: Mutex::new(None),
524        })
525    }
526
527    fn new_empty(
528        parent_store: Option<Arc<ObjectStore>>,
529        store_object_id: u64,
530        filesystem: Arc<FxFilesystem>,
531        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
532    ) -> Arc<Self> {
533        Self::new(
534            parent_store,
535            store_object_id,
536            filesystem,
537            Some(StoreInfo::default()),
538            object_cache,
539            None,
540            LockState::Unencrypted,
541            LastObjectId::default(),
542        )
543    }
544
545    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
546    /// This should only be used from super block code.
547    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
548        ObjectStore {
549            parent_store: None,
550            store_object_id,
551            device,
552            block_size,
553            filesystem: Weak::<FxFilesystem>::new(),
554            store_info: Mutex::new(Some(StoreInfo::default())),
555            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
556            store_info_handle: OnceCell::new(),
557            mutations_cipher: Mutex::new(None),
558            lock_state: Mutex::new(LockState::Unencrypted),
559            key_manager: KeyManager::new(),
560            trace: AtomicBool::new(false),
561            counters: Mutex::new(ObjectStoreCounters::default()),
562            device_read_ops: AtomicU64::new(0),
563            device_write_ops: AtomicU64::new(0),
564            logical_read_ops: AtomicU64::new(0),
565            logical_write_ops: AtomicU64::new(0),
566            last_object_id: Mutex::new(LastObjectId::default()),
567            flush_callback: Mutex::new(None),
568        }
569    }
570
571    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
572    /// been created.
573    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
574        this.filesystem = Arc::downgrade(&filesystem);
575        this
576    }
577
    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
    ///
    /// This step does the following:
    ///
    /// - Creates, within `self` (the parent), the object that will hold the child's StoreInfo.
    ///   It uses `options.object_id` if that is not `INVALID_OBJECT_ID`, and an automatically
    ///   chosen ID otherwise.
    /// - If `options.crypt` is set, creates a mutations key and an object-ID key, and starts the
    ///   child `Unlocked`. Otherwise the child starts `Unencrypted`.
    /// - Installs the new handle as the child's `store_info_handle`.
    ///
    /// `object_cache` becomes the child's LSM tree cache.
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        let handle = if options.object_id != INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                options.object_id,
                HandleOptions::default(),
                None,
            )
            .await?;
            // NOTE(review): presumably keeps the parent's ID generator from later handing out the
            // explicitly requested ID; `update_last_object_id` is defined elsewhere — confirm.
            self.update_last_object_id(options.object_id);
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let store = if let Some(crypt) = options.crypt {
            // First key: stored as `mutations_key`; its unwrapped form seeds the mutations
            // StreamCipher below.
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            // Second key: stored as `object_id_key`; its unwrapped form builds the Ff1 cipher
            // that obfuscates new object IDs.
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    object_id_key: Some(object_id_wrapped),
                    ..StoreInfo::new_with_guid()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.owner, crypt },
                LastObjectId {
                    // We need to avoid accidentally getting INVALID_OBJECT_ID, so we set
                    // the top 32 bits to a non-zero value.
                    id: 1 << 32,
                    cipher: Some(Ff1::new(&object_id_unwrapped)),
                },
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo::new_with_guid()),
                object_cache,
                None,
                LockState::Unencrypted,
                LastObjectId::default(),
            )
        };
        // `Self::new` creates an empty `store_info_handle`, so this set cannot fail.
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }
647
648    /// Actually creates the store in a transaction.  This will also create a root directory and
649    /// graveyard directory for the store.  See `new_child_store` above.
650    async fn create<'a>(
651        self: &'a Arc<Self>,
652        transaction: &mut Transaction<'a>,
653    ) -> Result<(), Error> {
654        let buf = {
655            // Create a root directory and graveyard directory.
656            let graveyard_directory_object_id = Graveyard::create(transaction, &self);
657            let root_directory = Directory::create(transaction, &self, None).await?;
658
659            let serialized_info = {
660                let mut store_info = self.store_info.lock();
661                let store_info = store_info.as_mut().unwrap();
662
663                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
664                store_info.root_directory_object_id = root_directory.object_id();
665
666                let mut serialized_info = Vec::new();
667                store_info.serialize_with_version(&mut serialized_info)?;
668                serialized_info
669            };
670            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
671            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
672            buf
673        };
674
675        if self.filesystem().options().image_builder_mode.is_some() {
676            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
677            // asked to. New object stores will have their StoreInfo written when we compact in
678            // FxFilesystem::finalize().
679            Ok(())
680        } else {
681            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
682        }
683    }
684
685    pub fn set_trace(&self, trace: bool) {
686        let old_value = self.trace.swap(trace, Ordering::Relaxed);
687        if trace != old_value {
688            info!(store_id = self.store_object_id(), trace; "OS: trace",);
689        }
690    }
691
692    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
693    /// the end of flush, while the write lock is still held.
694    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
695        let mut flush_callback = self.flush_callback.lock();
696        *flush_callback = Some(Box::new(callback));
697    }
698
699    pub fn is_root(&self) -> bool {
700        if let Some(parent) = &self.parent_store {
701            parent.parent_store.is_none()
702        } else {
703            // The root parent store isn't the root store.
704            false
705        }
706    }
707
708    /// Populates an inspect node with store statistics.
709    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
710        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
711        let counters = self.counters.lock();
712        if let Some(store_info) = self.store_info() {
713            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
714        } else {
715            warn!("Can't access store_info; store is locked.");
716        };
717        root.record_uint("store_object_id", self.store_object_id);
718        root.record_uint("mutations_applied", counters.mutations_applied);
719        root.record_uint("mutations_dropped", counters.mutations_dropped);
720        root.record_uint("num_flushes", counters.num_flushes);
721        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
722            root.record_uint(
723                "last_flush_time_ms",
724                last_flush_time
725                    .duration_since(std::time::UNIX_EPOCH)
726                    .unwrap_or(std::time::Duration::ZERO)
727                    .as_millis()
728                    .try_into()
729                    .unwrap_or(0u64),
730            );
731        }
732        let sizes = root.create_uint_array(
733            "persistent_layer_file_sizes",
734            counters.persistent_layer_file_sizes.len(),
735        );
736        for i in 0..counters.persistent_layer_file_sizes.len() {
737            sizes.set(i, counters.persistent_layer_file_sizes[i]);
738        }
739        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
740        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
741        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
742        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
743
744        root.record(sizes);
745
746        let this = self.clone();
747        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
748    }
749
750    pub fn device(&self) -> &Arc<dyn Device> {
751        &self.device
752    }
753
754    pub fn block_size(&self) -> u64 {
755        self.block_size
756    }
757
758    pub fn filesystem(&self) -> Arc<FxFilesystem> {
759        self.filesystem.upgrade().unwrap()
760    }
761
762    pub fn store_object_id(&self) -> u64 {
763        self.store_object_id
764    }
765
766    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
767        &self.tree
768    }
769
770    pub fn root_directory_object_id(&self) -> u64 {
771        self.store_info.lock().as_ref().unwrap().root_directory_object_id
772    }
773
774    pub fn graveyard_directory_object_id(&self) -> u64 {
775        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
776    }
777
778    fn set_graveyard_directory_object_id(&self, oid: u64) {
779        assert_eq!(
780            std::mem::replace(
781                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
782                oid
783            ),
784            INVALID_OBJECT_ID
785        );
786    }
787
788    pub fn object_count(&self) -> u64 {
789        self.store_info.lock().as_ref().unwrap().object_count
790    }
791
792    pub fn key_manager(&self) -> &KeyManager {
793        &self.key_manager
794    }
795
796    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
797        self.parent_store.as_ref()
798    }
799
800    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
801    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
802        match &*self.lock_state.lock() {
803            LockState::Locked => panic!("Store is locked"),
804            LockState::Invalid
805            | LockState::Unencrypted
806            | LockState::Locking
807            | LockState::Unlocking => None,
808            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
809            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
810            LockState::Unknown => {
811                panic!("Store is of unknown lock state; has the journal been replayed yet?")
812            }
813        }
814    }
815
816    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
817        // Create the transaction first to use the object store lock.
818        let mut transaction = self
819            .filesystem()
820            .new_transaction(
821                lock_keys![LockKey::object(
822                    self.parent_store.as_ref().unwrap().store_object_id,
823                    self.store_object_id,
824                )],
825                Options::default(),
826            )
827            .await?;
828        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
829        if obj_id != INVALID_OBJECT_ID {
830            return Ok(obj_id);
831        }
832
833        // Need to create an internal directory.
834        let directory = Directory::create(&mut transaction, self, None).await?;
835
836        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
837        transaction.commit().await?;
838        Ok(directory.object_id())
839    }
840
841    /// Returns the file size for the object without opening the object.
842    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
843        let item = self
844            .tree
845            .find(&ObjectKey::attribute(
846                object_id,
847                DEFAULT_DATA_ATTRIBUTE_ID,
848                AttributeKey::Attribute,
849            ))
850            .await?
851            .ok_or(FxfsError::NotFound)?;
852        if let ObjectValue::Attribute { size, .. } = item.value {
853            Ok(size)
854        } else {
855            bail!(FxfsError::NotFile);
856        }
857    }
858
859    #[cfg(feature = "migration")]
860    pub fn last_object_id(&self) -> u64 {
861        self.last_object_id.lock().id
862    }
863
864    /// Bumps the unencrypted last object ID if `object_id` is greater than
865    /// the current maximum.
866    #[cfg(feature = "migration")]
867    pub fn maybe_bump_last_object_id(&self, object_id: u64) -> Result<(), Error> {
868        let mut last_object_id = self.last_object_id.lock();
869        if object_id > last_object_id.id {
870            ensure!(
871                object_id < (u32::MAX as u64) && last_object_id.cipher.is_none(),
872                "LastObjectId bump only valid for unencrypted inodes"
873            );
874            last_object_id.id = object_id;
875        }
876        Ok(())
877    }
878
879    /// Provides access to the allocator to mark a specific region of the device as allocated.
880    #[cfg(feature = "migration")]
881    pub async fn mark_allocated(
882        &self,
883        transaction: &mut Transaction<'_>,
884        store_object_id: u64,
885        device_range: std::ops::Range<u64>,
886    ) -> Result<(), Error> {
887        self.allocator().mark_allocated(transaction, store_object_id, device_range).await
888    }
889
    /// `crypt` can be provided if the crypt service should be different to the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out-of-sync; there is no code that will
    /// prevent this.  One example where this can cause an issue is if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
    /// dropped when a handle is dropped, which will impact any other handles for the same object.
    ///
    /// Opens the object's default data attribute.  The size is read from the attribute record.
    /// When the record says the attribute has overwrite extents, the ranges of extents in
    /// `Overwrite`/`OverwritePartial` mode are collected and handed to the new handle.  For
    /// fsverity-enabled files (`VerifiedAttribute`), the merkle attribute is read and attached
    /// to the handle together with the verity descriptor.
    ///
    /// # Errors
    ///
    /// * `FxfsError::NotFound` if the default data attribute record is missing, or if a verified
    ///   file has no merkle attribute.
    /// * `FxfsError::Inconsistent` if the attribute record has an unexpected type, the size
    ///   exceeds `MAX_FILE_SIZE`, or an unexpected extent value is encountered.
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        // The attribute record supplies the size; only plain (non-verified) attributes can ask
        // for overwrite extents to be tracked.
        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                fsverity_descriptor = Some(fsverity_metadata);
                // We only track the overwrite extents in memory for writes, reads handle them
                // implicitly, which means verified files (where the data won't change anymore)
                // don't need to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attibute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            // Walk the extents of the default data attribute from offset 0, remembering the
            // ranges of extents in `Overwrite` or `OverwritePartial` mode.  The scan stops at the
            // first record belonging to a different object or attribute.
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            // Deleted, raw and copy-on-write extents need no tracking.
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(anyhow!(FxfsError::Inconsistent)
                                .context("open_object: Expected extent")),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            &overwrite_ranges,
        );
        // For verified files, load the merkle attribute and attach it (with the descriptor) to
        // the handle; a missing merkle attribute is reported as NotFound.
        if let Some(descriptor) = fsverity_descriptor {
            match data_object_handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await? {
                None => {
                    return Err(anyhow!(FxfsError::NotFound));
                }
                Some(data) => {
                    data_object_handle.set_fsverity_state_some(descriptor, data);
                }
            }
        }
        Ok(data_object_handle)
    }
1018
1019    pub async fn create_object_with_id<S: HandleOwner>(
1020        owner: &Arc<S>,
1021        transaction: &mut Transaction<'_>,
1022        object_id: u64,
1023        options: HandleOptions,
1024        encryption_options: Option<ObjectEncryptionOptions>,
1025    ) -> Result<DataObjectHandle<S>, Error> {
1026        debug_assert!(object_id != INVALID_OBJECT_ID);
1027        let store = owner.as_ref().as_ref();
1028        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
1029        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
1030        let now = Timestamp::now();
1031        transaction.add(
1032            store.store_object_id(),
1033            Mutation::insert_object(
1034                ObjectKey::object(object_id),
1035                ObjectValue::file(1, 0, now.clone(), now.clone(), now.clone(), now, 0, None),
1036            ),
1037        );
1038        let mut permanent_keys = false;
1039        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
1040            encryption_options
1041        {
1042            permanent_keys = permanent;
1043            transaction.add(
1044                store.store_object_id(),
1045                Mutation::insert_object(
1046                    ObjectKey::keys(object_id),
1047                    ObjectValue::keys(EncryptionKeys::AES256XTS(WrappedKeys::from(vec![(
1048                        key_id, key,
1049                    )]))),
1050                ),
1051            );
1052            store.key_manager.insert(object_id, &vec![(key_id, Some(unwrapped_key))], permanent);
1053        }
1054        transaction.add(
1055            store.store_object_id(),
1056            Mutation::insert_object(
1057                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
1058                // This is a new object so nothing has pre-allocated overwrite extents yet.
1059                ObjectValue::attribute(0, false),
1060            ),
1061        );
1062        Ok(DataObjectHandle::new(
1063            owner.clone(),
1064            object_id,
1065            permanent_keys,
1066            DEFAULT_DATA_ATTRIBUTE_ID,
1067            0,
1068            FsverityState::None,
1069            options,
1070            false,
1071            &[],
1072        ))
1073    }
1074
1075    /// Creates an object in the store.
1076    ///
1077    /// If the store is encrypted, the object will be automatically encrypted as well.
1078    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
1079    /// otherwise the default data key is used.
1080    pub async fn create_object<S: HandleOwner>(
1081        owner: &Arc<S>,
1082        mut transaction: &mut Transaction<'_>,
1083        options: HandleOptions,
1084        wrapping_key_id: Option<u128>,
1085    ) -> Result<DataObjectHandle<S>, Error> {
1086        let store = owner.as_ref().as_ref();
1087        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
1088        let crypt = store.crypt();
1089        let encryption_options = if let Some(crypt) = crypt {
1090            let key_id =
1091                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
1092            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
1093                crypt.create_key_with_id(object_id, wrapping_key_id).await?
1094            } else {
1095                crypt.create_key(object_id, KeyPurpose::Data).await?
1096            };
1097            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
1098        } else {
1099            None
1100        };
1101        ObjectStore::create_object_with_id(
1102            owner,
1103            &mut transaction,
1104            object_id,
1105            options,
1106            encryption_options,
1107        )
1108        .await
1109    }
1110
1111    /// Creates an object using explicitly provided keys.
1112    ///
1113    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1114    /// For example, when layer files for a child store are created in the root store, but they must
1115    /// be encrypted using the child store's keys.  This method exists for that purpose.
1116    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1117        owner: &Arc<S>,
1118        mut transaction: &mut Transaction<'_>,
1119        object_id: u64,
1120        options: HandleOptions,
1121        key: WrappedKey,
1122        unwrapped_key: UnwrappedKey,
1123    ) -> Result<DataObjectHandle<S>, Error> {
1124        ObjectStore::create_object_with_id(
1125            owner,
1126            &mut transaction,
1127            object_id,
1128            options,
1129            Some(ObjectEncryptionOptions {
1130                permanent: true,
1131                key_id: VOLUME_DATA_KEY_ID,
1132                key,
1133                unwrapped_key,
1134            }),
1135        )
1136        .await
1137    }
1138
1139    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
1140    /// object is moved into the graveyard and true is returned.
1141    pub async fn adjust_refs(
1142        &self,
1143        transaction: &mut Transaction<'_>,
1144        object_id: u64,
1145        delta: i64,
1146    ) -> Result<bool, Error> {
1147        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1148        let refs = if let ObjectValue::Object {
1149            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
1150            ..
1151        } = &mut mutation.item.value
1152        {
1153            *refs =
1154                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
1155            refs
1156        } else {
1157            bail!(FxfsError::NotFile);
1158        };
1159        if *refs == 0 {
1160            self.add_to_graveyard(transaction, object_id);
1161
1162            // We might still need to adjust the reference count if delta was something other than
1163            // -1.
1164            if delta != -1 {
1165                *refs = 1;
1166                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1167            }
1168            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
1169            // objects in graveyard.
1170            Ok(true)
1171        } else {
1172            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1173            Ok(false)
1174        }
1175    }
1176
1177    // Purges an object that is in the graveyard.
1178    pub async fn tombstone_object(
1179        &self,
1180        object_id: u64,
1181        txn_options: Options<'_>,
1182    ) -> Result<(), Error> {
1183        self.key_manager.remove(object_id).await;
1184        let fs = self.filesystem();
1185        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1186        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1187    }
1188
1189    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
1190    /// the graveyard when done.
1191    pub async fn trim(
1192        &self,
1193        object_id: u64,
1194        truncate_guard: &TruncateGuard<'_>,
1195    ) -> Result<(), Error> {
1196        // For the root and root parent store, we would need to use the metadata reservation which
1197        // we don't currently support, so assert that we're not those stores.
1198        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1199
1200        self.trim_or_tombstone(
1201            object_id,
1202            false,
1203            Options { borrow_metadata_space: true, ..Default::default() },
1204            truncate_guard,
1205        )
1206        .await
1207    }
1208
1209    /// Trims or tombstones an object.
1210    async fn trim_or_tombstone(
1211        &self,
1212        object_id: u64,
1213        for_tombstone: bool,
1214        txn_options: Options<'_>,
1215        _truncate_guard: &TruncateGuard<'_>,
1216    ) -> Result<(), Error> {
1217        let fs = self.filesystem();
1218        let mut next_attribute = Some(0);
1219        while let Some(attribute_id) = next_attribute.take() {
1220            let mut transaction = fs
1221                .clone()
1222                .new_transaction(
1223                    lock_keys![
1224                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1225                        LockKey::object(self.store_object_id, object_id),
1226                    ],
1227                    txn_options,
1228                )
1229                .await?;
1230
1231            match self
1232                .trim_some(
1233                    &mut transaction,
1234                    object_id,
1235                    attribute_id,
1236                    if for_tombstone {
1237                        TrimMode::Tombstone(TombstoneMode::Object)
1238                    } else {
1239                        TrimMode::UseSize
1240                    },
1241                )
1242                .await?
1243            {
1244                TrimResult::Incomplete => next_attribute = Some(attribute_id),
1245                TrimResult::Done(None) => {
1246                    if for_tombstone
1247                        || matches!(
1248                            self.tree
1249                                .find(&ObjectKey::graveyard_entry(
1250                                    self.graveyard_directory_object_id(),
1251                                    object_id,
1252                                ))
1253                                .await?,
1254                            Some(Item { value: ObjectValue::Trim, .. })
1255                        )
1256                    {
1257                        self.remove_from_graveyard(&mut transaction, object_id);
1258                    }
1259                }
1260                TrimResult::Done(id) => next_attribute = id,
1261            }
1262
1263            if !transaction.mutations().is_empty() {
1264                transaction.commit().await?;
1265            }
1266        }
1267        Ok(())
1268    }
1269
1270    // Purges an object's attribute that is in the graveyard.
1271    pub async fn tombstone_attribute(
1272        &self,
1273        object_id: u64,
1274        attribute_id: u64,
1275        txn_options: Options<'_>,
1276    ) -> Result<(), Error> {
1277        let fs = self.filesystem();
1278        let mut trim_result = TrimResult::Incomplete;
1279        while matches!(trim_result, TrimResult::Incomplete) {
1280            let mut transaction = fs
1281                .clone()
1282                .new_transaction(
1283                    lock_keys![
1284                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1285                        LockKey::object(self.store_object_id, object_id),
1286                    ],
1287                    txn_options,
1288                )
1289                .await?;
1290            trim_result = self
1291                .trim_some(
1292                    &mut transaction,
1293                    object_id,
1294                    attribute_id,
1295                    TrimMode::Tombstone(TombstoneMode::Attribute),
1296                )
1297                .await?;
1298            if let TrimResult::Done(..) = trim_result {
1299                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1300            }
1301            if !transaction.mutations().is_empty() {
1302                transaction.commit().await?;
1303            }
1304        }
1305        Ok(())
1306    }
1307
1308    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1309    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
1310    /// performs a read-modify-write on the sizes.
1311    pub async fn trim_some(
1312        &self,
1313        transaction: &mut Transaction<'_>,
1314        object_id: u64,
1315        attribute_id: u64,
1316        mode: TrimMode,
1317    ) -> Result<TrimResult, Error> {
1318        let layer_set = self.tree.layer_set();
1319        let mut merger = layer_set.merger();
1320
1321        let aligned_offset = match mode {
1322            TrimMode::FromOffset(offset) => {
1323                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1324            }
1325            TrimMode::Tombstone(..) => 0,
1326            TrimMode::UseSize => {
1327                let iter = merger
1328                    .query(Query::FullRange(&ObjectKey::attribute(
1329                        object_id,
1330                        attribute_id,
1331                        AttributeKey::Attribute,
1332                    )))
1333                    .await?;
1334                if let Some(item_ref) = iter.get() {
1335                    if item_ref.key.object_id != object_id {
1336                        return Ok(TrimResult::Done(None));
1337                    }
1338
1339                    if let ItemRef {
1340                        key:
1341                            ObjectKey {
1342                                data:
1343                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1344                                ..
1345                            },
1346                        value: ObjectValue::Attribute { size, .. },
1347                        ..
1348                    } = item_ref
1349                    {
1350                        // If we found a different attribute_id, return so we can get the
1351                        // right lock.
1352                        if *size_attribute_id != attribute_id {
1353                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1354                        }
1355                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1356                    } else {
1357                        // At time of writing, we should always see a size record or None here, but
1358                        // asserting here would be brittle so just skip to the the next attribute
1359                        // instead.
1360                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1361                    }
1362                } else {
1363                    // End of the tree.
1364                    return Ok(TrimResult::Done(None));
1365                }
1366            }
1367        };
1368
1369        // Loop over the extents and deallocate them.
1370        let mut iter = merger
1371            .query(Query::FullRange(&ObjectKey::from_extent(
1372                object_id,
1373                attribute_id,
1374                ExtentKey::search_key_from_offset(aligned_offset),
1375            )))
1376            .await?;
1377        let mut end = 0;
1378        let allocator = self.allocator();
1379        let mut result = TrimResult::Done(None);
1380        let mut deallocated = 0;
1381        let block_size = self.block_size;
1382
1383        while let Some(item_ref) = iter.get() {
1384            if item_ref.key.object_id != object_id {
1385                break;
1386            }
1387            if let ObjectKey {
1388                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1389                ..
1390            } = item_ref.key
1391            {
1392                if *extent_attribute_id != attribute_id {
1393                    result = TrimResult::Done(Some(*extent_attribute_id));
1394                    break;
1395                }
1396                if let (
1397                    AttributeKey::Extent(ExtentKey { range }),
1398                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1399                ) = (attribute_key, item_ref.value)
1400                {
1401                    let start = std::cmp::max(range.start, aligned_offset);
1402                    ensure!(start < range.end, FxfsError::Inconsistent);
1403                    let device_offset = device_offset
1404                        .checked_add(start - range.start)
1405                        .ok_or(FxfsError::Inconsistent)?;
1406                    end = range.end;
1407                    let len = end - start;
1408                    let device_range = device_offset..device_offset + len;
1409                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1410                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1411                    deallocated += len;
1412                    // Stop if the transaction is getting too big.
1413                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1414                        result = TrimResult::Incomplete;
1415                        break;
1416                    }
1417                }
1418            }
1419            iter.advance().await?;
1420        }
1421
1422        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1423            && matches!(result, TrimResult::Done(None));
1424        let finished_tombstone_attribute =
1425            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1426                && !matches!(result, TrimResult::Incomplete);
1427        let mut object_mutation = None;
1428        let nodes = if finished_tombstone_object { -1 } else { 0 };
1429        if nodes != 0 || deallocated != 0 {
1430            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1431            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1432                mutation.item.value
1433            {
1434                if project_id != 0 {
1435                    transaction.add(
1436                        self.store_object_id,
1437                        Mutation::merge_object(
1438                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1439                            ObjectValue::BytesAndNodes {
1440                                bytes: -i64::try_from(deallocated).unwrap(),
1441                                nodes,
1442                            },
1443                        ),
1444                    );
1445                }
1446                object_mutation = Some(mutation);
1447            } else {
1448                panic!("Inconsistent object type.");
1449            }
1450        }
1451
1452        // Deletion marker records *must* be merged so as to consume all other records for the
1453        // object.
1454        if finished_tombstone_object {
1455            transaction.add(
1456                self.store_object_id,
1457                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1458            );
1459        } else {
1460            if finished_tombstone_attribute {
1461                transaction.add(
1462                    self.store_object_id,
1463                    Mutation::merge_object(
1464                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1465                        ObjectValue::None,
1466                    ),
1467                );
1468            }
1469            if deallocated > 0 {
1470                let mut mutation = match object_mutation {
1471                    Some(mutation) => mutation,
1472                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1473                };
1474                transaction.add(
1475                    self.store_object_id,
1476                    Mutation::merge_object(
1477                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1478                        ObjectValue::deleted_extent(),
1479                    ),
1480                );
1481                // Update allocated size.
1482                if let ObjectValue::Object {
1483                    attributes: ObjectAttributes { allocated_size, .. },
1484                    ..
1485                } = &mut mutation.item.value
1486                {
1487                    // The only way for these to fail are if the volume is inconsistent.
1488                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1489                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1490                    })?;
1491                } else {
1492                    panic!("Unexpected object value");
1493                }
1494                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1495            }
1496        }
1497        Ok(result)
1498    }
1499
1500    /// Returns all objects that exist in the parent store that pertain to this object store.
1501    /// Note that this doesn't include the object_id of the store itself which is generally
1502    /// referenced externally.
1503    pub fn parent_objects(&self) -> Vec<u64> {
1504        assert!(self.store_info_handle.get().is_some());
1505        self.store_info.lock().as_ref().unwrap().parent_objects()
1506    }
1507
1508    /// Returns root objects for this store.
1509    pub fn root_objects(&self) -> Vec<u64> {
1510        let mut objects = Vec::new();
1511        let store_info = self.store_info.lock();
1512        let info = store_info.as_ref().unwrap();
1513        if info.root_directory_object_id != INVALID_OBJECT_ID {
1514            objects.push(info.root_directory_object_id);
1515        }
1516        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1517            objects.push(info.graveyard_directory_object_id);
1518        }
1519        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1520            objects.push(info.internal_directory_object_id);
1521        }
1522        objects
1523    }
1524
1525    pub fn store_info(&self) -> Option<StoreInfo> {
1526        self.store_info.lock().as_ref().cloned()
1527    }
1528
1529    /// Returns None if called during journal replay.
1530    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1531        self.store_info_handle.get().map(|h| h.object_id())
1532    }
1533
1534    /// Called to open a store, before replay of this store's mutations.
1535    async fn open(
1536        parent_store: &Arc<ObjectStore>,
1537        store_object_id: u64,
1538        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1539    ) -> Result<Arc<ObjectStore>, Error> {
1540        let handle =
1541            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1542                .await?;
1543
1544        let info = load_store_info(parent_store, store_object_id).await?;
1545        let is_encrypted = info.mutations_key.is_some();
1546
1547        let mut total_layer_size = 0;
1548        let last_object_id;
1549
1550        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1551
1552        // If the store is encrypted, we can't open the object tree layers now, but we need to
1553        // compute the size of the layers.
1554        if is_encrypted {
1555            for &oid in &info.layers {
1556                total_layer_size += parent_store.get_file_size(oid).await?;
1557            }
1558            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1559                total_layer_size += layer_size_from_encrypted_mutations_size(
1560                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1561                );
1562            }
1563            last_object_id = LastObjectId::default();
1564        } else {
1565            last_object_id = LastObjectId { id: info.last_object_id, cipher: None };
1566        }
1567
1568        let fs = parent_store.filesystem();
1569
1570        let store = ObjectStore::new(
1571            Some(parent_store.clone()),
1572            store_object_id,
1573            fs.clone(),
1574            if is_encrypted { None } else { Some(info) },
1575            object_cache,
1576            None,
1577            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1578            last_object_id,
1579        );
1580
1581        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1582
1583        if !is_encrypted {
1584            let object_tree_layer_object_ids =
1585                store.store_info.lock().as_ref().unwrap().layers.clone();
1586            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1587            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1588            store
1589                .tree
1590                .append_layers(object_layers)
1591                .await
1592                .context("Failed to read object store layers")?;
1593        }
1594
1595        fs.object_manager().update_reservation(
1596            store_object_id,
1597            tree::reservation_amount_from_layer_size(total_layer_size),
1598        );
1599
1600        Ok(store)
1601    }
1602
1603    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1604        load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
1605    }
1606
1607    async fn open_layers(
1608        &self,
1609        object_ids: impl std::iter::IntoIterator<Item = u64>,
1610        crypt: Option<Arc<dyn Crypt>>,
1611    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1612        let parent_store = self.parent_store.as_ref().unwrap();
1613        let mut handles = Vec::new();
1614        let mut sizes = Vec::new();
1615        for object_id in object_ids {
1616            let handle = ObjectStore::open_object(
1617                &parent_store,
1618                object_id,
1619                HandleOptions::default(),
1620                crypt.clone(),
1621            )
1622            .await
1623            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1624            sizes.push(handle.get_size());
1625            handles.push(handle);
1626        }
1627        self.counters.lock().persistent_layer_file_sizes = sizes;
1628        Ok(handles)
1629    }
1630
1631    /// Unlocks a store so that it is ready to be used.
1632    /// This is not thread-safe.
1633    pub async fn unlock(
1634        self: &Arc<Self>,
1635        owner: Weak<dyn StoreOwner>,
1636        crypt: Arc<dyn Crypt>,
1637    ) -> Result<(), Error> {
1638        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1639    }
1640
1641    /// Unlocks a store so that it is ready to be read from.
1642    /// The store will generally behave like it is still locked: when flushed, the store will
1643    /// write out its mutations into the encrypted mutations file, rather than directly updating
1644    /// the layer files of the object store.
1645    /// Re-locking the store (which *must* be done with `Self::lock_read_only` will not trigger a
1646    /// flush, although the store might still be flushed during other operations.
1647    /// This is not thread-safe.
1648    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1649        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1650    }
1651
1652    async fn unlock_inner(
1653        self: &Arc<Self>,
1654        owner: Weak<dyn StoreOwner>,
1655        crypt: Arc<dyn Crypt>,
1656        read_only: bool,
1657    ) -> Result<(), Error> {
1658        match &*self.lock_state.lock() {
1659            LockState::Locked => {}
1660            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1661            LockState::Invalid => bail!(FxfsError::Internal),
1662            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1663                bail!(FxfsError::AlreadyBound)
1664            }
1665            LockState::Unknown => panic!("Store was unlocked before replay"),
1666            LockState::Locking => panic!("Store is being locked"),
1667            LockState::Unlocking => panic!("Store is being unlocked"),
1668        }
1669        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1670        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1671        let fs = self.filesystem();
1672        let guard = fs.lock_manager().write_lock(keys).await;
1673
1674        let store_info = self.load_store_info().await?;
1675
1676        self.tree
1677            .append_layers(
1678                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1679            )
1680            .await
1681            .context("Failed to read object tree layer file contents")?;
1682
1683        let unwrapped_key = crypt
1684            .unwrap_key(store_info.mutations_key.as_ref().unwrap(), self.store_object_id)
1685            .await
1686            .context("Failed to unwrap mutations keys")?;
1687        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1688        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
1689        // wraps, so we just use u32::MAX (the offset is u64).
1690        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1691        let mut mutations_cipher =
1692            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1693
1694        let wrapped_key = store_info.object_id_key.as_ref().ok_or(FxfsError::Inconsistent)?;
1695        let object_id_cipher =
1696            Ff1::new(&crypt.unwrap_key(wrapped_key, self.store_object_id).await?);
1697        {
1698            let mut last_object_id = self.last_object_id.lock();
1699            last_object_id.cipher = Some(object_id_cipher);
1700        }
1701        self.update_last_object_id(store_info.last_object_id);
1702
1703        // Apply the encrypted mutations.
1704        let mut mutations = {
1705            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1706                EncryptedMutations::default()
1707            } else {
1708                let parent_store = self.parent_store.as_ref().unwrap();
1709                let handle = ObjectStore::open_object(
1710                    &parent_store,
1711                    store_info.encrypted_mutations_object_id,
1712                    HandleOptions::default(),
1713                    None,
1714                )
1715                .await?;
1716                let mut cursor = std::io::Cursor::new(
1717                    handle
1718                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1719                        .await
1720                        .context(FxfsError::Inconsistent)?,
1721                );
1722                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1723                    .context("Failed to deserialize EncryptedMutations")?
1724                    .0;
1725                let len = cursor.get_ref().len() as u64;
1726                while cursor.position() < len {
1727                    mutations.extend(
1728                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1729                            .context("Failed to deserialize EncryptedMutations")?
1730                            .0,
1731                    );
1732                }
1733                mutations
1734            }
1735        };
1736
1737        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
1738        let journaled = EncryptedMutations::from_replayed_mutations(
1739            self.store_object_id,
1740            fs.journal()
1741                .read_transactions_for_object(self.store_object_id)
1742                .await
1743                .context("Failed to read encrypted mutations from journal")?,
1744        );
1745        mutations.extend(&journaled);
1746
1747        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
1748        *self.store_info.lock() = Some(store_info);
1749
1750        // If we fail, clean up.
1751        let clean_up = scopeguard::guard((), |_| {
1752            *self.lock_state.lock() = LockState::Locked;
1753            *self.store_info.lock() = None;
1754            // Make sure we don't leave unencrypted data lying around in memory.
1755            self.tree.reset();
1756        });
1757
1758        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
1759
1760        let mut slice = &mut data[..];
1761        let mut last_offset = 0;
1762        for (offset, key) in mutations_key_roll {
1763            let split_offset = offset
1764                .checked_sub(last_offset)
1765                .ok_or(FxfsError::Inconsistent)
1766                .context("Invalid mutation key roll offset")?;
1767            last_offset = offset;
1768            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
1769            let (old, new) = slice.split_at_mut(split_offset);
1770            mutations_cipher.decrypt(old);
1771            let unwrapped_key = crypt
1772                .unwrap_key(&key, self.store_object_id)
1773                .await
1774                .context("Failed to unwrap mutations keys")?;
1775            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
1776            slice = new;
1777        }
1778        mutations_cipher.decrypt(slice);
1779
1780        // Always roll the mutations key when we unlock which guarantees we won't reuse a
1781        // previous key and nonce.
1782        self.roll_mutations_key(crypt.as_ref()).await?;
1783
1784        let mut cursor = std::io::Cursor::new(data);
1785        for (checkpoint, count) in transactions {
1786            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
1787            for _ in 0..count {
1788                let mutation =
1789                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
1790                        .context("failed to deserialize encrypted mutation")?;
1791                self.apply_mutation(mutation, &context, AssocObj::None)
1792                    .context("failed to apply encrypted mutation")?;
1793            }
1794        }
1795
1796        *self.lock_state.lock() = if read_only {
1797            LockState::UnlockedReadOnly(crypt)
1798        } else {
1799            LockState::Unlocked { owner, crypt }
1800        };
1801
1802        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
1803        // it's possible for more writes to be queued and for the store to be locked before we can
1804        // flush anything and that can repeat.
1805        std::mem::drop(guard);
1806
1807        if !read_only && !self.filesystem().options().read_only {
1808            self.flush_with_reason(flush::Reason::Unlock).await?;
1809
1810            // Reap purged files within this store.
1811            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
1812        }
1813
1814        // Return and cancel the clean up.
1815        Ok(ScopeGuard::into_inner(clean_up))
1816    }
1817
1818    pub fn is_locked(&self) -> bool {
1819        matches!(
1820            *self.lock_state.lock(),
1821            LockState::Locked | LockState::Locking | LockState::Unknown
1822        )
1823    }
1824
1825    /// NB: This is not the converse of `is_locked`, as there are lock states where neither are
1826    /// true.
1827    pub fn is_unlocked(&self) -> bool {
1828        matches!(
1829            *self.lock_state.lock(),
1830            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
1831        )
1832    }
1833
1834    pub fn is_unknown(&self) -> bool {
1835        matches!(*self.lock_state.lock(), LockState::Unknown)
1836    }
1837
1838    pub fn is_encrypted(&self) -> bool {
1839        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
1840    }
1841
1842    // Locks a store.
1843    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
1844    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
1845    // Whilst this can return an error, the store will be placed into an unusable but safe state
1846    // (i.e. no lingering unencrypted data) if an error is encountered.
1847    pub async fn lock(&self) -> Result<(), Error> {
1848        // We must lock flushing since it is not safe for that to be happening whilst we are locking
1849        // the store.
1850        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1851        let fs = self.filesystem();
1852        let _guard = fs.lock_manager().write_lock(keys).await;
1853
1854        {
1855            let mut lock_state = self.lock_state.lock();
1856            if let LockState::Unlocked { .. } = &*lock_state {
1857                *lock_state = LockState::Locking;
1858            } else {
1859                panic!("Unexpected lock state: {:?}", &*lock_state);
1860            }
1861        }
1862
1863        // Sync the journal now to ensure that any buffered mutations for this store make it out to
1864        // disk.  This is necessary to be able to unlock the store again.
1865        // We need to establish a barrier at this point (so that the journaled writes are observable
1866        // by any future attempts to unlock the store), hence the flush_device.
1867        let sync_result =
1868            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
1869
1870        *self.lock_state.lock() = if let Err(error) = &sync_result {
1871            error!(error:?; "Failed to sync journal; store will no longer be usable");
1872            LockState::Invalid
1873        } else {
1874            LockState::Locked
1875        };
1876        self.key_manager.clear();
1877        *self.store_info.lock() = None;
1878        self.tree.reset();
1879
1880        sync_result
1881    }
1882
1883    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
1884    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
1885    // and will be replayed next time the store is unlocked.
1886    pub fn lock_read_only(&self) {
1887        *self.lock_state.lock() = LockState::Locked;
1888        *self.store_info.lock() = None;
1889        self.tree.reset();
1890    }
1891
1892    // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
1893    fn maybe_get_next_object_id(&self) -> u64 {
1894        let mut last_object_id = self.last_object_id.lock();
1895        if last_object_id.should_create_cipher() {
1896            INVALID_OBJECT_ID
1897        } else {
1898            last_object_id.get_next_object_id()
1899        }
1900    }
1901
1902    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
1903    ///
1904    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
1905    /// This transaction does not take the filesystem lock, hence `txn_guard`.
1906    pub async fn get_next_object_id(&self, txn_guard: &TxnGuard<'_>) -> Result<u64, Error> {
1907        let object_id = self.maybe_get_next_object_id();
1908        if object_id != INVALID_OBJECT_ID {
1909            return Ok(object_id);
1910        }
1911
1912        // Create a transaction (which has a lock) and then check again.
1913        let mut transaction = self
1914            .filesystem()
1915            .new_transaction(
1916                lock_keys![LockKey::object(
1917                    self.parent_store.as_ref().unwrap().store_object_id,
1918                    self.store_object_id,
1919                )],
1920                Options {
1921                    // We must skip journal checks because this transaction might be needed to
1922                    // compact.
1923                    skip_journal_checks: true,
1924                    borrow_metadata_space: true,
1925                    txn_guard: Some(txn_guard),
1926                    ..Default::default()
1927                },
1928            )
1929            .await?;
1930
1931        {
1932            let mut last_object_id = self.last_object_id.lock();
1933            if !last_object_id.should_create_cipher() {
1934                // We lost a race.
1935                return Ok(last_object_id.get_next_object_id());
1936            }
1937            // It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
1938            // happens, it's most likely due to corruption.
1939            ensure!(
1940                last_object_id.id & OBJECT_ID_HI_MASK != OBJECT_ID_HI_MASK,
1941                FxfsError::Inconsistent
1942            );
1943        }
1944
1945        // Create a key.
1946        let (object_id_wrapped, object_id_unwrapped) =
1947            self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
1948
1949        // Update StoreInfo.
1950        let buf = {
1951            let serialized_info = {
1952                let mut store_info = self.store_info.lock();
1953                let store_info = store_info.as_mut().unwrap();
1954                store_info.object_id_key = Some(object_id_wrapped);
1955                let mut serialized_info = Vec::new();
1956                store_info.serialize_with_version(&mut serialized_info)?;
1957                serialized_info
1958            };
1959            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
1960            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
1961            buf
1962        };
1963
1964        self.store_info_handle
1965            .get()
1966            .unwrap()
1967            .txn_write(&mut transaction, 0u64, buf.as_ref())
1968            .await?;
1969        transaction.commit().await?;
1970
1971        let mut last_object_id = self.last_object_id.lock();
1972        last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped));
1973        last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK;
1974
1975        Ok((last_object_id.id & OBJECT_ID_HI_MASK)
1976            | last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64)
1977    }
1978
1979    fn allocator(&self) -> Arc<Allocator> {
1980        self.filesystem().allocator()
1981    }
1982
1983    // If |transaction| has an impending mutation for the underlying object, returns that.
1984    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
1985    // mutation is returned here rather than the item because the mutation includes the operation
1986    // which has significance: inserting an object implies it's the first of its kind unlike
1987    // replacing an object.
1988    async fn txn_get_object_mutation(
1989        &self,
1990        transaction: &Transaction<'_>,
1991        object_id: u64,
1992    ) -> Result<ObjectStoreMutation, Error> {
1993        if let Some(mutation) =
1994            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
1995        {
1996            Ok(mutation.clone())
1997        } else {
1998            Ok(ObjectStoreMutation {
1999                item: self
2000                    .tree
2001                    .find(&ObjectKey::object(object_id))
2002                    .await?
2003                    .ok_or(FxfsError::Inconsistent)
2004                    .context("Object id missing")?,
2005                op: Operation::ReplaceOrInsert,
2006            })
2007        }
2008    }
2009
2010    /// Like txn_get_object_mutation but with expanded visibility.
2011    /// Only available in migration code.
2012    #[cfg(feature = "migration")]
2013    pub async fn get_object_mutation(
2014        &self,
2015        transaction: &Transaction<'_>,
2016        object_id: u64,
2017    ) -> Result<ObjectStoreMutation, Error> {
2018        self.txn_get_object_mutation(transaction, object_id).await
2019    }
2020
2021    fn update_last_object_id(&self, mut object_id: u64) {
2022        let mut last_object_id = self.last_object_id.lock();
2023        // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2024        if let Some(cipher) = &last_object_id.cipher {
2025            // If the object ID cipher has been rolled, then it's possible we might see object IDs
2026            // that were generated using a different cipher so the decrypt here will return the
2027            // wrong value, but that won't matter because the hi part of the object ID should still
2028            // discriminate.
2029            object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2030        }
2031        if object_id > last_object_id.id {
2032            last_object_id.id = object_id;
2033        }
2034    }
2035
2036    /// Adds the specified object to the graveyard.
2037    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2038        let graveyard_id = self.graveyard_directory_object_id();
2039        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2040        transaction.add(
2041            self.store_object_id,
2042            Mutation::replace_or_insert_object(
2043                ObjectKey::graveyard_entry(graveyard_id, object_id),
2044                ObjectValue::Some,
2045            ),
2046        );
2047    }
2048
2049    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2050    /// this because graveyard entries are used for purging deleted files *and* for trimming
2051    /// extents.  For example, consider the following sequence:
2052    ///
2053    ///     1. Add Trim graveyard entry.
2054    ///     2. Replace with Some graveyard entry (see above).
2055    ///     3. Remove graveyard entry.
2056    ///
2057    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2058    /// actually be:
2059    ///
2060    ///     3. Replace with Trim graveyard entry.
2061    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2062        transaction.add(
2063            self.store_object_id,
2064            Mutation::replace_or_insert_object(
2065                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2066                ObjectValue::None,
2067            ),
2068        );
2069    }
2070
2071    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2072    /// attribute graveyard entries only have one functionality (i.e. to purge deleted attributes)
2073    /// so the caller does not need to be concerned about replacing the graveyard attribute entry
2074    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2075    pub fn remove_attribute_from_graveyard(
2076        &self,
2077        transaction: &mut Transaction<'_>,
2078        object_id: u64,
2079        attribute_id: u64,
2080    ) {
2081        transaction.add(
2082            self.store_object_id,
2083            Mutation::replace_or_insert_object(
2084                ObjectKey::graveyard_attribute_entry(
2085                    self.graveyard_directory_object_id(),
2086                    object_id,
2087                    attribute_id,
2088                ),
2089                ObjectValue::None,
2090            ),
2091        );
2092    }
2093
2094    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2095    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2096        let (wrapped_key, unwrapped_key) =
2097            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2098
2099        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2100        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2101        // end up writing the wrong wrapped key.
2102        let mut cipher = self.mutations_cipher.lock();
2103        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2104        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2105        // mutations_cipher_offset is updated by flush.
2106        Ok(())
2107    }
2108
2109    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2110    // is identical to that which was passed in as the target on `create_symlink`.
2111    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2112    // get a standard length and then base64 encodes the hash and returns that to the caller.
2113    pub async fn read_encrypted_symlink(
2114        &self,
2115        object_id: u64,
2116        link: Vec<u8>,
2117    ) -> Result<Vec<u8>, Error> {
2118        let mut link = link;
2119        let key = self
2120            .key_manager()
2121            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2122                self.get_keys(object_id).await
2123            })
2124            .await?;
2125        if let Some(key) = key {
2126            key.decrypt_filename(object_id, &mut link)?;
2127            Ok(link)
2128        } else {
2129            let digest = Sha256::hash(&link).bytes();
2130            let encrypted_link = BASE64_URL_SAFE_NO_PAD.encode(&digest);
2131            Ok(encrypted_link.into())
2132        }
2133    }
2134
2135    /// Returns the link of a symlink object.
2136    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2137        match self.tree.find(&ObjectKey::object(object_id)).await? {
2138            None => bail!(FxfsError::NotFound),
2139            Some(Item {
2140                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2141                ..
2142            }) => self.read_encrypted_symlink(object_id, link).await,
2143            Some(Item {
2144                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2145                ..
2146            }) => Ok(link),
2147            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2148                .context(format!("Unexpected item in lookup: {item:?}"))),
2149        }
2150    }
2151
2152    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2153    /// will be considered an inconsistency if they don't.
2154    pub async fn get_keys(&self, object_id: u64) -> Result<WrappedKeys, Error> {
2155        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2156            Item { value: ObjectValue::Keys(EncryptionKeys::AES256XTS(keys)), .. } => Ok(keys),
2157            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2158        }
2159    }
2160
2161    pub async fn update_attributes<'a>(
2162        &self,
2163        transaction: &mut Transaction<'a>,
2164        object_id: u64,
2165        node_attributes: Option<&fio::MutableNodeAttributes>,
2166        change_time: Option<Timestamp>,
2167    ) -> Result<(), Error> {
2168        if change_time.is_none() {
2169            if let Some(attributes) = node_attributes {
2170                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2171                if *attributes == empty_attributes {
2172                    return Ok(());
2173                }
2174            } else {
2175                return Ok(());
2176            }
2177        }
2178        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2179        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2180            if let Some(time) = change_time {
2181                attributes.change_time = time;
2182            }
2183            if let Some(node_attributes) = node_attributes {
2184                if let Some(time) = node_attributes.creation_time {
2185                    attributes.creation_time = Timestamp::from_nanos(time);
2186                }
2187                if let Some(time) = node_attributes.modification_time {
2188                    attributes.modification_time = Timestamp::from_nanos(time);
2189                }
2190                if let Some(time) = node_attributes.access_time {
2191                    attributes.access_time = Timestamp::from_nanos(time);
2192                }
2193                if node_attributes.mode.is_some()
2194                    || node_attributes.uid.is_some()
2195                    || node_attributes.gid.is_some()
2196                    || node_attributes.rdev.is_some()
2197                {
2198                    if let Some(a) = &mut attributes.posix_attributes {
2199                        if let Some(mode) = node_attributes.mode {
2200                            a.mode = mode;
2201                        }
2202                        if let Some(uid) = node_attributes.uid {
2203                            a.uid = uid;
2204                        }
2205                        if let Some(gid) = node_attributes.gid {
2206                            a.gid = gid;
2207                        }
2208                        if let Some(rdev) = node_attributes.rdev {
2209                            a.rdev = rdev;
2210                        }
2211                    } else {
2212                        attributes.posix_attributes = Some(PosixAttributes {
2213                            mode: node_attributes.mode.unwrap_or_default(),
2214                            uid: node_attributes.uid.unwrap_or_default(),
2215                            gid: node_attributes.gid.unwrap_or_default(),
2216                            rdev: node_attributes.rdev.unwrap_or_default(),
2217                        });
2218                    }
2219                }
2220            }
2221        } else {
2222            bail!(anyhow!(FxfsError::Inconsistent)
2223                .context("ObjectStore.update_attributes: Expected object value"));
2224        };
2225        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2226        Ok(())
2227    }
2228
2229    // Updates and commits the changes to access time in ObjectProperties. The update matches
2230    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2231    // than or equal to the last modification or status change, or if it has been more than a day
2232    // since the last access.
2233    pub async fn update_access_time(
2234        &self,
2235        object_id: u64,
2236        props: &mut ObjectProperties,
2237    ) -> Result<(), Error> {
2238        let access_time = props.access_time.as_nanos();
2239        let modification_time = props.modification_time.as_nanos();
2240        let change_time = props.change_time.as_nanos();
2241        let now = Timestamp::now();
2242        if access_time <= modification_time
2243            || access_time <= change_time
2244            || access_time
2245                < now.as_nanos()
2246                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2247        {
2248            let mut transaction = self
2249                .filesystem()
2250                .clone()
2251                .new_transaction(
2252                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2253                    Options { borrow_metadata_space: true, ..Default::default() },
2254                )
2255                .await?;
2256            self.update_attributes(
2257                &mut transaction,
2258                object_id,
2259                Some(&fio::MutableNodeAttributes {
2260                    access_time: Some(now.as_nanos()),
2261                    ..Default::default()
2262                }),
2263                None,
2264            )
2265            .await?;
2266            transaction.commit().await?;
2267            props.access_time = now;
2268        }
2269        Ok(())
2270    }
2271}
2272
2273#[async_trait]
2274impl JournalingObject for ObjectStore {
2275    fn apply_mutation(
2276        &self,
2277        mutation: Mutation,
2278        context: &ApplyContext<'_, '_>,
2279        _assoc_obj: AssocObj<'_>,
2280    ) -> Result<(), Error> {
2281        match &*self.lock_state.lock() {
2282            LockState::Locked | LockState::Locking => {
2283                ensure!(
2284                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2285                        || matches!(
2286                            mutation,
2287                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2288                                if context.mode.is_replay()
2289                        ),
2290                    anyhow!(FxfsError::Inconsistent)
2291                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2292                );
2293            }
2294            LockState::Invalid
2295            | LockState::Unlocking
2296            | LockState::Unencrypted
2297            | LockState::Unlocked { .. }
2298            | LockState::UnlockedReadOnly(..) => {}
2299            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2300        }
2301        match mutation {
2302            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2303                item.sequence = context.checkpoint.file_offset;
2304                match op {
2305                    Operation::Insert => {
2306                        // If we are inserting an object record for the first time, it signifies the
2307                        // birth of the object so we need to adjust the object count.
2308                        if matches!(item.value, ObjectValue::Object { .. }) {
2309                            {
2310                                let info = &mut self.store_info.lock();
2311                                let object_count = &mut info.as_mut().unwrap().object_count;
2312                                *object_count = object_count.saturating_add(1);
2313                            }
2314                            if context.mode.is_replay() {
2315                                self.update_last_object_id(item.key.object_id);
2316                            }
2317                        }
2318                        self.tree.insert(item)?;
2319                    }
2320                    Operation::ReplaceOrInsert => {
2321                        self.tree.replace_or_insert(item);
2322                    }
2323                    Operation::Merge => {
2324                        if item.is_tombstone() {
2325                            let info = &mut self.store_info.lock();
2326                            let object_count = &mut info.as_mut().unwrap().object_count;
2327                            *object_count = object_count.saturating_sub(1);
2328                        }
2329                        let lower_bound = item.key.key_for_merge_into();
2330                        self.tree.merge_into(item, &lower_bound);
2331                    }
2332                }
2333            }
2334            Mutation::BeginFlush => {
2335                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2336                self.tree.seal();
2337            }
2338            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2339            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2340                // We will process these during Self::unlock.
2341                ensure!(
2342                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2343                    FxfsError::Inconsistent
2344                );
2345            }
2346            Mutation::CreateInternalDir(object_id) => {
2347                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2348                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2349            }
2350            _ => bail!("unexpected mutation: {:?}", mutation),
2351        }
2352        self.counters.lock().mutations_applied += 1;
2353        Ok(())
2354    }
2355
2356    fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
2357        self.counters.lock().mutations_dropped += 1;
2358    }
2359
2360    /// Push all in-memory structures to the device. This is not necessary for sync since the
2361    /// journal will take care of it.  This is supposed to be called when there is either memory or
2362    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2363    /// be trimmed).
2364    ///
2365    /// Also returns the earliest version of a struct in the filesystem (when known).
2366    async fn flush(&self) -> Result<Version, Error> {
2367        self.flush_with_reason(flush::Reason::Journal).await
2368    }
2369
2370    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2371        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2372        // all mutations that could affect an encrypted object store contents or the `StoreInfo` of
2373        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
2374        // won't be replayed after reading `StoreInfo`.
2375        match mutation {
2376            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2377            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2378            // When we replay mutations for an encrypted store, the only thing we keep in memory are
2379            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2380            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2381            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2382            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2383            // encrypted mutations and handled them all in sequence during `unlock()`.
2384            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2385                let mut cipher = self.mutations_cipher.lock();
2386                if let Some(cipher) = cipher.as_mut() {
2387                    // If this is the first time we've used this key, we must write the key out.
2388                    if cipher.offset() == 0 {
2389                        writer.write(Mutation::update_mutations_key(
2390                            self.store_info
2391                                .lock()
2392                                .as_ref()
2393                                .unwrap()
2394                                .mutations_key
2395                                .as_ref()
2396                                .unwrap()
2397                                .clone(),
2398                        ));
2399                    }
2400                    let mut buffer = Vec::new();
2401                    mutation.serialize_into(&mut buffer).unwrap();
2402                    cipher.encrypt(&mut buffer);
2403                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2404                    return;
2405                }
2406            }
2407            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2408            // encrypted object stores, but are either the encrypted mutation data itself or
2409            // metadata governing how the data will be encrypted. They should only be produced here.
2410            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2411                debug_assert!(false, "Only this method should generate encrypted mutations");
2412            }
2413            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
2414            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
2415            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
2416            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
2417            Mutation::Allocator(_)
2418            | Mutation::BeginFlush
2419            | Mutation::EndFlush
2420            | Mutation::DeleteVolume
2421            | Mutation::UpdateBorrowed(_) => {}
2422        }
2423        writer.write(mutation.clone());
2424    }
2425}
2426
// Implements `HandleOwner` for `ObjectStore`. The impl body is empty, so it relies entirely on
// whatever default items the trait provides.
impl HandleOwner for ObjectStore {}
2428
2429impl AsRef<ObjectStore> for ObjectStore {
2430    fn as_ref(&self) -> &ObjectStore {
2431        self
2432    }
2433}
2434
/// Returns a worst-case estimate of the metadata space that must be reserved so that `size` bytes
/// of encrypted mutations can later be written out to layer files.
///
/// This is similar to `reserved_space_from_journal_usage`. The result must be at least what
/// `reservation_amount_from_layer_size` returns once the data has been written to layer files,
/// and at most what `reserved_space_from_journal_usage` would use. That function cannot be used
/// directly because the encrypted mutations file includes some extra data (the checkpoints) that
/// isn't written to the journal in the same way.
///
/// The multiplication saturates at `u64::MAX` instead of overflowing. Overflow would panic in
/// debug builds and wrap to a far-too-small reservation in release builds.
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
    size.saturating_mul(3)
}
2445
// Implements `AssociatedObject` for `ObjectStore`. The impl body is empty, so it relies entirely
// on whatever default items the trait provides.
impl AssociatedObject for ObjectStore {}
2447
/// Argument to the `trim_some` method: selects which extents are trimmed and whether the object
/// (or attribute) is removed once trimming is complete.
#[derive(Debug)]
pub enum TrimMode {
    /// Trim extents beyond the current size.
    UseSize,

    /// Trim extents beyond the supplied offset.
    FromOffset(u64),

    /// Remove the object (or attribute) from the store once it is fully trimmed. The
    /// [`TombstoneMode`] selects whether the object or the attribute is removed.
    Tombstone(TombstoneMode),
}
2460
/// Sets the mode for tombstoning (either at the object or attribute level).
#[derive(Debug)]
pub enum TombstoneMode {
    /// Tombstone at the object level.
    Object,
    /// Tombstone at the attribute level.
    Attribute,
}
2467
/// Result of the `trim_some` method.
#[derive(Debug)]
pub enum TrimResult {
    /// We reached the limit of the transaction and more extents might follow.
    /// NOTE(review): presumably the caller is expected to call `trim_some` again; confirm.
    Incomplete,

    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
    /// there is one.
    Done(Option<u64>),
}
2478
2479/// Loads store info.
2480pub async fn load_store_info(
2481    parent: &Arc<ObjectStore>,
2482    store_object_id: u64,
2483) -> Result<StoreInfo, Error> {
2484    let handle =
2485        ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
2486
2487    Ok(if handle.get_size() > 0 {
2488        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2489        let mut cursor = std::io::Cursor::new(serialized_info);
2490        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2491            .context("Failed to deserialize StoreInfo")?;
2492        store_info
2493    } else {
2494        // The store_info will be absent for a newly created and empty object store.
2495        StoreInfo::default()
2496    })
2497}
2498
2499#[cfg(test)]
2500mod tests {
2501    use super::{
2502        StoreInfo, DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID,
2503        MAX_STORE_INFO_SERIALIZED_SIZE, NO_OWNER, OBJECT_ID_HI_MASK,
2504    };
2505    use crate::errors::FxfsError;
2506    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2507    use crate::fsck::fsck;
2508    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2509    use crate::lsm_tree::Query;
2510    use crate::object_handle::{
2511        ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
2512    };
2513    use crate::object_store::directory::Directory;
2514    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2515    use crate::object_store::transaction::{lock_keys, Options};
2516    use crate::object_store::volume::root_volume;
2517    use crate::object_store::{
2518        FsverityMetadata, HandleOptions, LockKey, Mutation, ObjectStore, RootDigest, StoreOwner,
2519    };
2520    use crate::serialized_types::VersionedLatest;
2521    use assert_matches::assert_matches;
2522    use async_trait::async_trait;
2523    use fuchsia_async as fasync;
2524    use fuchsia_sync::Mutex;
2525    use futures::join;
2526    use fxfs_crypto::{Crypt, WrappedKey, WrappedKeyBytes, WRAPPED_KEY_SIZE};
2527    use fxfs_insecure_crypto::InsecureCrypt;
2528    use std::sync::Arc;
2529    use std::time::Duration;
2530    use storage_device::fake_device::FakeDevice;
2531    use storage_device::DeviceHolder;
2532
2533    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2534
2535    async fn test_filesystem() -> OpenFxFilesystem {
2536        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2537        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2538    }
2539
2540    #[fuchsia::test]
2541    async fn test_item_sequences() {
2542        let fs = test_filesystem().await;
2543        let object1;
2544        let object2;
2545        let object3;
2546        let mut transaction = fs
2547            .clone()
2548            .new_transaction(lock_keys![], Options::default())
2549            .await
2550            .expect("new_transaction failed");
2551        let store = fs.root_store();
2552        object1 = Arc::new(
2553            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2554                .await
2555                .expect("create_object failed"),
2556        );
2557        transaction.commit().await.expect("commit failed");
2558        let mut transaction = fs
2559            .clone()
2560            .new_transaction(lock_keys![], Options::default())
2561            .await
2562            .expect("new_transaction failed");
2563        object2 = Arc::new(
2564            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2565                .await
2566                .expect("create_object failed"),
2567        );
2568        transaction.commit().await.expect("commit failed");
2569
2570        fs.sync(SyncOptions::default()).await.expect("sync failed");
2571
2572        let mut transaction = fs
2573            .clone()
2574            .new_transaction(lock_keys![], Options::default())
2575            .await
2576            .expect("new_transaction failed");
2577        object3 = Arc::new(
2578            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2579                .await
2580                .expect("create_object failed"),
2581        );
2582        transaction.commit().await.expect("commit failed");
2583
2584        let layer_set = store.tree.layer_set();
2585        let mut merger = layer_set.merger();
2586        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
2587        let mut sequences = [0u64; 3];
2588        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
2589            if *object_id == object1.object_id() {
2590                sequences[0] = sequence;
2591            } else if *object_id == object2.object_id() {
2592                sequences[1] = sequence;
2593            } else if *object_id == object3.object_id() {
2594                sequences[2] = sequence;
2595            }
2596            iter.advance().await.expect("advance failed");
2597        }
2598
2599        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
2600        // The last item came after a sync, so should be strictly greater.
2601        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
2602        fs.close().await.expect("Close failed");
2603    }
2604
2605    #[fuchsia::test]
2606    async fn test_verified_file_with_verified_attribute() {
2607        let fs: OpenFxFilesystem = test_filesystem().await;
2608        let mut transaction = fs
2609            .clone()
2610            .new_transaction(lock_keys![], Options::default())
2611            .await
2612            .expect("new_transaction failed");
2613        let store = fs.root_store();
2614        let object = Arc::new(
2615            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2616                .await
2617                .expect("create_object failed"),
2618        );
2619
2620        transaction.add(
2621            store.store_object_id(),
2622            Mutation::replace_or_insert_object(
2623                ObjectKey::attribute(
2624                    object.object_id(),
2625                    DEFAULT_DATA_ATTRIBUTE_ID,
2626                    AttributeKey::Attribute,
2627                ),
2628                ObjectValue::verified_attribute(
2629                    0,
2630                    FsverityMetadata { root_digest: RootDigest::Sha256([0; 32]), salt: vec![] },
2631                ),
2632            ),
2633        );
2634
2635        transaction.add(
2636            store.store_object_id(),
2637            Mutation::replace_or_insert_object(
2638                ObjectKey::attribute(
2639                    object.object_id(),
2640                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2641                    AttributeKey::Attribute,
2642                ),
2643                ObjectValue::attribute(0, false),
2644            ),
2645        );
2646
2647        transaction.commit().await.unwrap();
2648
2649        let handle =
2650            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2651                .await
2652                .expect("open_object failed");
2653
2654        assert!(handle.is_verified_file());
2655
2656        fs.close().await.expect("Close failed");
2657    }
2658
2659    #[fuchsia::test]
2660    async fn test_verified_file_without_verified_attribute() {
2661        let fs: OpenFxFilesystem = test_filesystem().await;
2662        let mut transaction = fs
2663            .clone()
2664            .new_transaction(lock_keys![], Options::default())
2665            .await
2666            .expect("new_transaction failed");
2667        let store = fs.root_store();
2668        let object = Arc::new(
2669            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2670                .await
2671                .expect("create_object failed"),
2672        );
2673
2674        transaction.commit().await.unwrap();
2675
2676        let handle =
2677            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2678                .await
2679                .expect("open_object failed");
2680
2681        assert!(!handle.is_verified_file());
2682
2683        fs.close().await.expect("Close failed");
2684    }
2685
2686    #[fuchsia::test]
2687    async fn test_create_and_open_store() {
2688        let fs = test_filesystem().await;
2689        let store_id = {
2690            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2691            root_volume
2692                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2693                .await
2694                .expect("new_volume failed")
2695                .store_object_id()
2696        };
2697
2698        fs.close().await.expect("close failed");
2699        let device = fs.take_device().await;
2700        device.reopen(false);
2701        let fs = FxFilesystem::open(device).await.expect("open failed");
2702
2703        {
2704            let store = fs.object_manager().store(store_id).expect("store not found");
2705            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2706        }
2707        fs.close().await.expect("Close failed");
2708    }
2709
2710    #[fuchsia::test]
2711    async fn test_create_and_open_internal_dir() {
2712        let fs = test_filesystem().await;
2713        let dir_id;
2714        let store_id;
2715        {
2716            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2717            let store = root_volume
2718                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2719                .await
2720                .expect("new_volume failed");
2721            dir_id =
2722                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2723            store_id = store.store_object_id();
2724        }
2725
2726        fs.close().await.expect("close failed");
2727        let device = fs.take_device().await;
2728        device.reopen(false);
2729        let fs = FxFilesystem::open(device).await.expect("open failed");
2730
2731        {
2732            let store = fs.object_manager().store(store_id).expect("store not found");
2733            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2734            assert_eq!(
2735                dir_id,
2736                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2737            );
2738            let obj = store
2739                .tree()
2740                .find(&ObjectKey::object(dir_id))
2741                .await
2742                .expect("Searching tree for dir")
2743                .unwrap();
2744            assert_matches!(
2745                obj.value,
2746                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2747            );
2748        }
2749        fs.close().await.expect("Close failed");
2750    }
2751
2752    #[fuchsia::test]
2753    async fn test_create_and_open_internal_dir_unencrypted() {
2754        let fs = test_filesystem().await;
2755        let dir_id;
2756        let store_id;
2757        {
2758            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2759            let store =
2760                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
2761            dir_id =
2762                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2763            store_id = store.store_object_id();
2764        }
2765
2766        fs.close().await.expect("close failed");
2767        let device = fs.take_device().await;
2768        device.reopen(false);
2769        let fs = FxFilesystem::open(device).await.expect("open failed");
2770
2771        {
2772            let store = fs.object_manager().store(store_id).expect("store not found");
2773            assert_eq!(
2774                dir_id,
2775                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2776            );
2777            let obj = store
2778                .tree()
2779                .find(&ObjectKey::object(dir_id))
2780                .await
2781                .expect("Searching tree for dir")
2782                .unwrap();
2783            assert_matches!(
2784                obj.value,
2785                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2786            );
2787        }
2788        fs.close().await.expect("Close failed");
2789    }
2790
2791    #[fuchsia::test(threads = 10)]
2792    async fn test_old_layers_are_purged() {
2793        let fs = test_filesystem().await;
2794
2795        let store = fs.root_store();
2796        let mut transaction = fs
2797            .clone()
2798            .new_transaction(lock_keys![], Options::default())
2799            .await
2800            .expect("new_transaction failed");
2801        let object = Arc::new(
2802            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2803                .await
2804                .expect("create_object failed"),
2805        );
2806        transaction.commit().await.expect("commit failed");
2807
2808        store.flush().await.expect("flush failed");
2809
2810        let mut buf = object.allocate_buffer(5).await;
2811        buf.as_mut_slice().copy_from_slice(b"hello");
2812        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2813
2814        // Getting the layer-set should cause the flush to stall.
2815        let layer_set = store.tree().layer_set();
2816
2817        let done = Mutex::new(false);
2818        let mut object_id = 0;
2819
2820        join!(
2821            async {
2822                store.flush().await.expect("flush failed");
2823                assert!(*done.lock());
2824            },
2825            async {
2826                // This is a halting problem so all we can do is sleep.
2827                fasync::Timer::new(Duration::from_secs(1)).await;
2828                *done.lock() = true;
2829                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
2830                std::mem::drop(layer_set);
2831            }
2832        );
2833
2834        if let Err(e) = ObjectStore::open_object(
2835            &store.parent_store.as_ref().unwrap(),
2836            object_id,
2837            HandleOptions::default(),
2838            store.crypt(),
2839        )
2840        .await
2841        {
2842            assert!(FxfsError::NotFound.matches(&e));
2843        } else {
2844            panic!("open_object succeeded");
2845        }
2846    }
2847
2848    #[fuchsia::test]
2849    async fn test_tombstone_deletes_data() {
2850        let fs = test_filesystem().await;
2851        let root_store = fs.root_store();
2852        let child_id = {
2853            let mut transaction = fs
2854                .clone()
2855                .new_transaction(lock_keys![], Options::default())
2856                .await
2857                .expect("new_transaction failed");
2858            let child = ObjectStore::create_object(
2859                &root_store,
2860                &mut transaction,
2861                HandleOptions::default(),
2862                None,
2863            )
2864            .await
2865            .expect("create_object failed");
2866            transaction.commit().await.expect("commit failed");
2867
2868            // Allocate an extent in the file.
2869            let mut buffer = child.allocate_buffer(8192).await;
2870            buffer.as_mut_slice().fill(0xaa);
2871            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2872
2873            child.object_id()
2874        };
2875
2876        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2877
2878        // Let fsck check allocations.
2879        fsck(fs.clone()).await.expect("fsck failed");
2880    }
2881
2882    #[fuchsia::test]
2883    async fn test_tombstone_purges_keys() {
2884        let fs = test_filesystem().await;
2885        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2886        let store = root_volume
2887            .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2888            .await
2889            .expect("new_volume failed");
2890        let mut transaction = fs
2891            .clone()
2892            .new_transaction(lock_keys![], Options::default())
2893            .await
2894            .expect("new_transaction failed");
2895        let child =
2896            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2897                .await
2898                .expect("create_object failed");
2899        transaction.commit().await.expect("commit failed");
2900        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
2901        store
2902            .tombstone_object(child.object_id(), Options::default())
2903            .await
2904            .expect("tombstone_object failed");
2905        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
2906        fs.close().await.expect("close failed");
2907    }
2908
2909    #[fuchsia::test]
2910    async fn test_major_compaction_discards_unnecessary_records() {
2911        let fs = test_filesystem().await;
2912        let root_store = fs.root_store();
2913        let child_id = {
2914            let mut transaction = fs
2915                .clone()
2916                .new_transaction(lock_keys![], Options::default())
2917                .await
2918                .expect("new_transaction failed");
2919            let child = ObjectStore::create_object(
2920                &root_store,
2921                &mut transaction,
2922                HandleOptions::default(),
2923                None,
2924            )
2925            .await
2926            .expect("create_object failed");
2927            transaction.commit().await.expect("commit failed");
2928
2929            // Allocate an extent in the file.
2930            let mut buffer = child.allocate_buffer(8192).await;
2931            buffer.as_mut_slice().fill(0xaa);
2932            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2933
2934            child.object_id()
2935        };
2936
2937        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2938        {
2939            let layers = root_store.tree.layer_set();
2940            let mut merger = layers.merger();
2941            let iter = merger
2942                .query(Query::FullRange(&ObjectKey::object(child_id)))
2943                .await
2944                .expect("seek failed");
2945            // Find at least one object still in the tree.
2946            match iter.get() {
2947                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
2948                    if *object_id == child_id => {}
2949                _ => panic!("Objects should still be in the tree."),
2950            }
2951        }
2952        root_store.flush().await.expect("flush failed");
2953
2954        // There should be no records for the object.
2955        let layers = root_store.tree.layer_set();
2956        let mut merger = layers.merger();
2957        let iter = merger
2958            .query(Query::FullRange(&ObjectKey::object(child_id)))
2959            .await
2960            .expect("seek failed");
2961        match iter.get() {
2962            None => {}
2963            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
2964                assert_ne!(*object_id, child_id)
2965            }
2966        }
2967    }
2968
2969    #[fuchsia::test]
2970    async fn test_overlapping_extents_in_different_layers() {
2971        let fs = test_filesystem().await;
2972        let store = fs.root_store();
2973
2974        let mut transaction = fs
2975            .clone()
2976            .new_transaction(
2977                lock_keys![LockKey::object(
2978                    store.store_object_id(),
2979                    store.root_directory_object_id()
2980                )],
2981                Options::default(),
2982            )
2983            .await
2984            .expect("new_transaction failed");
2985        let root_directory =
2986            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2987        let object = root_directory
2988            .create_child_file(&mut transaction, "test")
2989            .await
2990            .expect("create_child_file failed");
2991        transaction.commit().await.expect("commit failed");
2992
2993        let buf = object.allocate_buffer(16384).await;
2994        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2995
2996        store.flush().await.expect("flush failed");
2997
2998        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
2999
3000        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3001        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3002        // overwrite both of those extents.
3003        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3004
3005        fsck(fs.clone()).await.expect("fsck failed");
3006    }
3007
3008    #[fuchsia::test(threads = 10)]
3009    async fn test_encrypted_mutations() {
3010        async fn one_iteration(
3011            fs: OpenFxFilesystem,
3012            crypt: Arc<dyn Crypt>,
3013            iteration: u64,
3014        ) -> OpenFxFilesystem {
3015            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
3016                fs.close().await.expect("Close failed");
3017                let device = fs.take_device().await;
3018                device.reopen(false);
3019                FxFilesystem::open(device).await.expect("FS open failed")
3020            }
3021
3022            let fs = reopen(fs).await;
3023
3024            let (store_object_id, object_id) = {
3025                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3026                let store = root_volume
3027                    .volume("test", NO_OWNER, Some(crypt.clone()))
3028                    .await
3029                    .expect("volume failed");
3030
3031                let mut transaction = fs
3032                    .clone()
3033                    .new_transaction(
3034                        lock_keys![LockKey::object(
3035                            store.store_object_id(),
3036                            store.root_directory_object_id(),
3037                        )],
3038                        Options::default(),
3039                    )
3040                    .await
3041                    .expect("new_transaction failed");
3042                let root_directory = Directory::open(&store, store.root_directory_object_id())
3043                    .await
3044                    .expect("open failed");
3045                let object = root_directory
3046                    .create_child_file(&mut transaction, &format!("test {}", iteration))
3047                    .await
3048                    .expect("create_child_file failed");
3049                transaction.commit().await.expect("commit failed");
3050
3051                let mut buf = object.allocate_buffer(1000).await;
3052                for i in 0..buf.len() {
3053                    buf.as_mut_slice()[i] = i as u8;
3054                }
3055                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3056
3057                (store.store_object_id(), object.object_id())
3058            };
3059
3060            let fs = reopen(fs).await;
3061
3062            let check_object = |fs: Arc<FxFilesystem>| {
3063                let crypt = crypt.clone();
3064                async move {
3065                    let root_volume = root_volume(fs).await.expect("root_volume failed");
3066                    let volume = root_volume
3067                        .volume("test", NO_OWNER, Some(crypt))
3068                        .await
3069                        .expect("volume failed");
3070
3071                    let object = ObjectStore::open_object(
3072                        &volume,
3073                        object_id,
3074                        HandleOptions::default(),
3075                        None,
3076                    )
3077                    .await
3078                    .expect("open_object failed");
3079                    let mut buf = object.allocate_buffer(1000).await;
3080                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
3081                    for i in 0..buf.len() {
3082                        assert_eq!(buf.as_slice()[i], i as u8);
3083                    }
3084                }
3085            };
3086
3087            check_object(fs.clone()).await;
3088
3089            let fs = reopen(fs).await;
3090
3091            // At this point the "test" volume is locked.  Before checking the object, flush the
3092            // filesystem.  This should leave a file with encrypted mutations.
3093            fs.object_manager().flush().await.expect("flush failed");
3094
3095            assert_ne!(
3096                fs.object_manager()
3097                    .store(store_object_id)
3098                    .unwrap()
3099                    .load_store_info()
3100                    .await
3101                    .expect("load_store_info failed")
3102                    .encrypted_mutations_object_id,
3103                INVALID_OBJECT_ID
3104            );
3105
3106            check_object(fs.clone()).await;
3107
3108            // Checking the object should have triggered a flush and so now there should be no
3109            // encrypted mutations object.
3110            assert_eq!(
3111                fs.object_manager()
3112                    .store(store_object_id)
3113                    .unwrap()
3114                    .load_store_info()
3115                    .await
3116                    .expect("load_store_info failed")
3117                    .encrypted_mutations_object_id,
3118                INVALID_OBJECT_ID
3119            );
3120
3121            let fs = reopen(fs).await;
3122
3123            fsck(fs.clone()).await.expect("fsck failed");
3124
3125            let fs = reopen(fs).await;
3126
3127            check_object(fs.clone()).await;
3128
3129            fs
3130        }
3131
3132        let mut fs = test_filesystem().await;
3133        let crypt = Arc::new(InsecureCrypt::new());
3134
3135        {
3136            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3137            let _store = root_volume
3138                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3139                .await
3140                .expect("new_volume failed");
3141        }
3142
3143        // Run a few iterations so that we test changes with the stream cipher offset.
3144        for i in 0..5 {
3145            fs = one_iteration(fs, crypt.clone(), i).await;
3146        }
3147    }
3148
3149    #[fuchsia::test(threads = 10)]
3150    async fn test_object_id_cipher_roll() {
3151        let fs = test_filesystem().await;
3152        let crypt = Arc::new(InsecureCrypt::new());
3153
3154        {
3155            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3156            let store = root_volume
3157                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3158                .await
3159                .expect("new_volume failed");
3160
3161            let store_info = store.store_info().unwrap();
3162
3163            // Hack the last object ID to force a roll of the object ID cipher.
3164            {
3165                let mut last_object_id = store.last_object_id.lock();
3166                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 1u64 << 32);
3167                last_object_id.id |= 0xffffffff;
3168            }
3169
3170            let mut transaction = fs
3171                .clone()
3172                .new_transaction(
3173                    lock_keys![LockKey::object(
3174                        store.store_object_id(),
3175                        store.root_directory_object_id()
3176                    )],
3177                    Options::default(),
3178                )
3179                .await
3180                .expect("new_transaction failed");
3181            let root_directory = Directory::open(&store, store.root_directory_object_id())
3182                .await
3183                .expect("open failed");
3184            let object = root_directory
3185                .create_child_file(&mut transaction, "test")
3186                .await
3187                .expect("create_child_file failed");
3188            transaction.commit().await.expect("commit failed");
3189
3190            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 2u64 << 32);
3191
3192            // Check that the key has been changed.
3193            assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);
3194
3195            assert_eq!(store.last_object_id.lock().id, 2u64 << 32);
3196        };
3197
3198        fs.close().await.expect("Close failed");
3199        let device = fs.take_device().await;
3200        device.reopen(false);
3201        let fs = FxFilesystem::open(device).await.expect("open failed");
3202        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3203        let store =
3204            root_volume.volume("test", NO_OWNER, Some(crypt.clone())).await.expect("volume failed");
3205
3206        assert_eq!(store.last_object_id.lock().id, 2u64 << 32);
3207    }
3208
3209    #[fuchsia::test(threads = 10)]
3210    async fn test_lock_store() {
3211        let fs = test_filesystem().await;
3212        let crypt = Arc::new(InsecureCrypt::new());
3213
3214        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3215        let store = root_volume
3216            .new_volume("test", NO_OWNER, Some(crypt.clone()))
3217            .await
3218            .expect("new_volume failed");
3219        let mut transaction = fs
3220            .clone()
3221            .new_transaction(
3222                lock_keys![LockKey::object(
3223                    store.store_object_id(),
3224                    store.root_directory_object_id()
3225                )],
3226                Options::default(),
3227            )
3228            .await
3229            .expect("new_transaction failed");
3230        let root_directory =
3231            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3232        root_directory
3233            .create_child_file(&mut transaction, "test")
3234            .await
3235            .expect("create_child_file failed");
3236        transaction.commit().await.expect("commit failed");
3237        store.lock().await.expect("lock failed");
3238
3239        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3240        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3241    }
3242
3243    #[fuchsia::test(threads = 10)]
3244    async fn test_unlock_read_only() {
3245        let fs = test_filesystem().await;
3246        let crypt = Arc::new(InsecureCrypt::new());
3247
3248        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3249        let store = root_volume
3250            .new_volume("test", NO_OWNER, Some(crypt.clone()))
3251            .await
3252            .expect("new_volume failed");
3253        let mut transaction = fs
3254            .clone()
3255            .new_transaction(
3256                lock_keys![LockKey::object(
3257                    store.store_object_id(),
3258                    store.root_directory_object_id()
3259                )],
3260                Options::default(),
3261            )
3262            .await
3263            .expect("new_transaction failed");
3264        let root_directory =
3265            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3266        root_directory
3267            .create_child_file(&mut transaction, "test")
3268            .await
3269            .expect("create_child_file failed");
3270        transaction.commit().await.expect("commit failed");
3271        store.lock().await.expect("lock failed");
3272
3273        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
3274        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3275        store.lock_read_only();
3276        store.unlock_read_only(crypt).await.expect("unlock failed");
3277        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3278    }
3279
3280    #[fuchsia::test(threads = 10)]
3281    async fn test_key_rolled_when_unlocked() {
3282        let fs = test_filesystem().await;
3283        let crypt = Arc::new(InsecureCrypt::new());
3284
3285        let object_id;
3286        {
3287            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3288            let store = root_volume
3289                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3290                .await
3291                .expect("new_volume failed");
3292            let mut transaction = fs
3293                .clone()
3294                .new_transaction(
3295                    lock_keys![LockKey::object(
3296                        store.store_object_id(),
3297                        store.root_directory_object_id()
3298                    )],
3299                    Options::default(),
3300                )
3301                .await
3302                .expect("new_transaction failed");
3303            let root_directory = Directory::open(&store, store.root_directory_object_id())
3304                .await
3305                .expect("open failed");
3306            object_id = root_directory
3307                .create_child_file(&mut transaction, "test")
3308                .await
3309                .expect("create_child_file failed")
3310                .object_id();
3311            transaction.commit().await.expect("commit failed");
3312        }
3313
3314        fs.close().await.expect("Close failed");
3315        let mut device = fs.take_device().await;
3316
3317        // Repeatedly remount so that we can be sure that we can remount when there are many
3318        // mutations keys.
3319        for _ in 0..100 {
3320            device.reopen(false);
3321            let fs = FxFilesystem::open(device).await.expect("open failed");
3322            {
3323                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3324                let store = root_volume
3325                    .volume("test", NO_OWNER, Some(crypt.clone()))
3326                    .await
3327                    .expect("open_volume failed");
3328
3329                // The key should get rolled every time we unlock.
3330                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
3331
3332                // Make sure there's an encrypted mutation.
3333                let handle =
3334                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
3335                        .await
3336                        .expect("open_object failed");
3337                let buffer = handle.allocate_buffer(100).await;
3338                handle
3339                    .write_or_append(Some(0), buffer.as_ref())
3340                    .await
3341                    .expect("write_or_append failed");
3342            }
3343            fs.close().await.expect("Close failed");
3344            device = fs.take_device().await;
3345        }
3346    }
3347
3348    #[test]
3349    fn test_store_info_max_serialized_size() {
3350        let info = StoreInfo {
3351            guid: [0xff; 16],
3352            last_object_id: 0x1234567812345678,
3353            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
3354            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
3355            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
3356            // any size should fit.
3357            layers: vec![0x1234567812345678; 120],
3358            root_directory_object_id: 0x1234567812345678,
3359            graveyard_directory_object_id: 0x1234567812345678,
3360            object_count: 0x1234567812345678,
3361            mutations_key: Some(WrappedKey {
3362                wrapping_key_id: 0x1234567812345678,
3363                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
3364            }),
3365            mutations_cipher_offset: 0x1234567812345678,
3366            encrypted_mutations_object_id: 0x1234567812345678,
3367            object_id_key: Some(WrappedKey {
3368                wrapping_key_id: 0x1234567812345678,
3369                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
3370            }),
3371            internal_directory_object_id: INVALID_OBJECT_ID,
3372        };
3373        let mut serialized_info = Vec::new();
3374        info.serialize_with_version(&mut serialized_info).unwrap();
3375        assert!(
3376            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
3377            "{}",
3378            serialized_info.len()
3379        );
3380    }
3381
3382    async fn reopen_after_crypt_failure_inner(read_only: bool) {
3383        let fs = test_filesystem().await;
3384        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3385
3386        let store = {
3387            let crypt = Arc::new(InsecureCrypt::new());
3388            let store = root_volume
3389                .new_volume("vol", NO_OWNER, Some(crypt.clone()))
3390                .await
3391                .expect("new_volume failed");
3392            let root_directory = Directory::open(&store, store.root_directory_object_id())
3393                .await
3394                .expect("open failed");
3395            let mut transaction = fs
3396                .clone()
3397                .new_transaction(
3398                    lock_keys![LockKey::object(
3399                        store.store_object_id(),
3400                        root_directory.object_id()
3401                    )],
3402                    Options::default(),
3403                )
3404                .await
3405                .expect("new_transaction failed");
3406            root_directory
3407                .create_child_file(&mut transaction, "test")
3408                .await
3409                .expect("create_child_file failed");
3410            transaction.commit().await.expect("commit failed");
3411
3412            crypt.shutdown();
3413            let mut transaction = fs
3414                .clone()
3415                .new_transaction(
3416                    lock_keys![LockKey::object(
3417                        store.store_object_id(),
3418                        root_directory.object_id()
3419                    )],
3420                    Options::default(),
3421                )
3422                .await
3423                .expect("new_transaction failed");
3424            root_directory
3425                .create_child_file(&mut transaction, "test2")
3426                .await
3427                .map(|_| ())
3428                .expect_err("create_child_file should fail");
3429            store.lock().await.expect("lock failed");
3430            store
3431        };
3432
3433        let crypt = Arc::new(InsecureCrypt::new());
3434        if read_only {
3435            store.unlock_read_only(crypt).await.expect("unlock failed");
3436        } else {
3437            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3438        }
3439        let root_directory =
3440            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3441        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3442    }
3443
3444    #[fuchsia::test(threads = 10)]
3445    async fn test_reopen_after_crypt_failure() {
3446        reopen_after_crypt_failure_inner(false).await;
3447    }
3448
3449    #[fuchsia::test(threads = 10)]
3450    async fn test_reopen_read_only_after_crypt_failure() {
3451        reopen_after_crypt_failure_inner(true).await;
3452    }
3453
3454    #[fuchsia::test(threads = 10)]
3455    #[should_panic(expected = "Insufficient reservation space")]
3456    #[cfg(debug_assertions)]
3457    async fn large_transaction_causes_panic_in_debug_builds() {
3458        let fs = test_filesystem().await;
3459        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3460        let store = root_volume.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
3461        let root_directory =
3462            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3463        let mut transaction = fs
3464            .clone()
3465            .new_transaction(
3466                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
3467                Options::default(),
3468            )
3469            .await
3470            .expect("transaction");
3471        for i in 0..500 {
3472            root_directory
3473                .create_symlink(&mut transaction, b"link", &format!("{}", i))
3474                .await
3475                .expect("symlink");
3476        }
3477        assert_eq!(transaction.commit().await.expect("commit"), 0);
3478    }
3479
3480    #[fuchsia::test]
3481    async fn test_crypt_failure_does_not_fuse_journal() {
3482        let fs = test_filesystem().await;
3483
3484        struct Owner;
3485        #[async_trait]
3486        impl StoreOwner for Owner {
3487            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3488                store.lock().await
3489            }
3490        }
3491        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3492
3493        {
3494            // Create two stores and a record for each store, so the journal will need to flush them
3495            // both later.
3496            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3497            let store1 = root_volume
3498                .new_volume("vol1", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3499                .await
3500                .expect("new_volume failed");
3501            let crypt = Arc::new(InsecureCrypt::new());
3502            let store2 = root_volume
3503                .new_volume("vol2", Arc::downgrade(&owner), Some(crypt.clone()))
3504                .await
3505                .expect("new_volume failed");
3506            for store in [&store1, &store2] {
3507                let root_directory = Directory::open(store, store.root_directory_object_id())
3508                    .await
3509                    .expect("open failed");
3510                let mut transaction = fs
3511                    .clone()
3512                    .new_transaction(
3513                        lock_keys![LockKey::object(
3514                            store.store_object_id(),
3515                            root_directory.object_id()
3516                        )],
3517                        Options::default(),
3518                    )
3519                    .await
3520                    .expect("new_transaction failed");
3521                root_directory
3522                    .create_child_file(&mut transaction, "test")
3523                    .await
3524                    .expect("create_child_file failed");
3525                transaction.commit().await.expect("commit failed");
3526            }
3527            // Shut down the crypt instance for store2, and then compact.  Compaction should not
3528            // fail, and the store should become locked.
3529            crypt.shutdown();
3530            fs.journal().compact().await.expect("compact failed");
3531            // The store should now be locked.
3532            assert!(store2.is_locked());
3533        }
3534
3535        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
3536        // held in the journal.
3537        fs.close().await.expect("close failed");
3538        let device = fs.take_device().await;
3539        device.reopen(false);
3540        let fs = FxFilesystem::open(device).await.expect("open failed");
3541        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3542
3543        for volume_name in ["vol1", "vol2"] {
3544            let store = root_volume
3545                .volume(volume_name, NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3546                .await
3547                .expect("open volume failed");
3548            let root_directory = Directory::open(&store, store.root_directory_object_id())
3549                .await
3550                .expect("open failed");
3551            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3552        }
3553
3554        fs.close().await.expect("close failed");
3555    }
3556
3557    #[fuchsia::test]
3558    async fn test_crypt_failure_during_unlock_race() {
3559        let fs = test_filesystem().await;
3560
3561        struct Owner;
3562        #[async_trait]
3563        impl StoreOwner for Owner {
3564            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3565                store.lock().await
3566            }
3567        }
3568        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3569
3570        let store_object_id = {
3571            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3572            let store = root_volume
3573                .new_volume("vol", Arc::downgrade(&owner), Some(Arc::new(InsecureCrypt::new())))
3574                .await
3575                .expect("new_volume failed");
3576            let root_directory = Directory::open(&store, store.root_directory_object_id())
3577                .await
3578                .expect("open failed");
3579            let mut transaction = fs
3580                .clone()
3581                .new_transaction(
3582                    lock_keys![LockKey::object(
3583                        store.store_object_id(),
3584                        root_directory.object_id()
3585                    )],
3586                    Options::default(),
3587                )
3588                .await
3589                .expect("new_transaction failed");
3590            root_directory
3591                .create_child_file(&mut transaction, "test")
3592                .await
3593                .expect("create_child_file failed");
3594            transaction.commit().await.expect("commit failed");
3595            store.store_object_id()
3596        };
3597
3598        fs.close().await.expect("close failed");
3599        let device = fs.take_device().await;
3600        device.reopen(false);
3601
3602        let fs = FxFilesystem::open(device).await.expect("open failed");
3603        {
3604            let fs_clone = fs.clone();
3605            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3606
3607            let crypt = Arc::new(InsecureCrypt::new());
3608            let crypt_clone = crypt.clone();
3609            join!(
3610                async move {
3611                    // Unlock might fail, so ignore errors.
3612                    let _ =
3613                        root_volume.volume("vol", Arc::downgrade(&owner), Some(crypt_clone)).await;
3614                },
3615                async move {
3616                    // Block until unlock is finished but before flushing due to unlock is finished, to
3617                    // maximize the chances of weirdness.
3618                    let keys = lock_keys![LockKey::flush(store_object_id)];
3619                    let _ = fs_clone.lock_manager().write_lock(keys).await;
3620                    crypt.shutdown();
3621                }
3622            );
3623        }
3624
3625        fs.close().await.expect("close failed");
3626        let device = fs.take_device().await;
3627        device.reopen(false);
3628
3629        let fs = FxFilesystem::open(device).await.expect("open failed");
3630        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3631        let store = root_volume
3632            .volume("vol", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3633            .await
3634            .expect("open volume failed");
3635        let root_directory =
3636            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3637        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3638
3639        fs.close().await.expect("close failed");
3640    }
3641}