fxfs/filesystem.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::FxfsError;
use crate::fsck::{fsck_volume_with_options, fsck_with_options, FsckOptions};
use crate::log::*;
use crate::object_store::allocator::{Allocator, Hold, Reservation};
use crate::object_store::directory::Directory;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::transaction::{
    self, lock_keys, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation,
    ReadGuard, Transaction, WriteGuard, TRANSACTION_METADATA_MAX_AMOUNT,
};
use crate::object_store::volume::{root_volume, VOLUMES_DIRECTORY};
use crate::object_store::{ObjectStore, NO_OWNER};
use crate::range::RangeExt;
use crate::serialized_types::{Version, LATEST_VERSION};
use crate::{debug_assert_not_too_long, metrics};
use anyhow::{bail, ensure, Context, Error};
use async_trait::async_trait;
use event_listener::Event;
use fuchsia_async as fasync;
use fuchsia_inspect::{NumericProperty as _, UintProperty};
use fuchsia_sync::Mutex;
use futures::FutureExt;
use fxfs_crypto::Crypt;
use once_cell::sync::OnceCell;
use static_assertions::const_assert;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use storage_device::{Device, DeviceHolder};

pub const MIN_BLOCK_SIZE: u64 = 4096;
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

// Whilst Fxfs could support up to u64::MAX, off_t is i64 so allowing files larger than that becomes
// difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data get
// modified in page chunks so to prevent writes at i64::MAX the entire page containing i64::MAX
// needs to be excluded.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Start trimming 1 hour after boot.  The idea here is to wait until the initial flurry of
// activity during boot is finished.  This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);

/// Holds information on an Fxfs Filesystem
pub struct Info {
    pub total_bytes: u64,
    pub used_bytes: u64,
}

pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;

pub struct Options {
    /// True if the filesystem is read-only.
    pub read_only: bool,

    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
    /// size of unflushed journal contents).  This is exposed for testing purposes.
    pub roll_metadata_key_byte_count: u64,

    /// A callback that runs before every transaction is committed.  If this callback returns an
    /// error then the transaction is failed with that error.
    pub pre_commit_hook: PreCommitHook,

    /// A callback that runs after every transaction has been committed.  This will be called whilst
    /// a lock is held which will block more transactions from being committed.
    pub post_commit_hook: PostCommitHook,

    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
    /// testing.
    pub skip_initial_reap: bool,

    // The first duration is how long after the filesystem has been mounted to perform an initial
    // trim.  The second is the interval to repeat trimming thereafter.  If set to None, no trimming
    // is done.
    // Default values are (1 hour, 24 hours); see TRIM_AFTER_BOOT_TIMER and TRIM_INTERVAL_TIMER.
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    // If set, journal will not be used for writes. The user must call 'finalize' when finished.
    // The provided superblock instance will be written upon finalize().
    pub image_builder_mode: Option<SuperBlockInstance>,
}

impl Default for Options {
    fn default() -> Self {
        Options {
            roll_metadata_key_byte_count: 128 * 1024 * 1024,
            read_only: false,
            pre_commit_hook: None,
            post_commit_hook: None,
            skip_initial_reap: false,
            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
            image_builder_mode: None,
        }
    }
}

/// The context in which a transaction is being applied.
pub struct ApplyContext<'a, 'b> {
    /// The mode indicates whether the transaction is being replayed.
    pub mode: ApplyMode<'a, 'b>,

    /// The transaction checkpoint for this mutation.
    pub checkpoint: JournalCheckpoint,
}

/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
    Replay,
    Live(&'a Transaction<'b>),
}

impl ApplyMode<'_, '_> {
    pub fn is_replay(&self) -> bool {
        matches!(self, ApplyMode::Replay)
    }

    pub fn is_live(&self) -> bool {
        matches!(self, ApplyMode::Live(_))
    }
}

/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock` with flush calls used in a few tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// This method gets called when the transaction commits, which can either be during live
    /// operation (See `ObjectManager::apply_mutation`) or during journal replay, in which case
    /// the mode will be `ApplyMode::Replay` and no live transaction is available (See
    /// `super_block::read`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called when a transaction fails to commit.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes in-memory changes to the device (to allow journal space to be freed).
    ///
    /// Also returns the earliest version of a struct in the filesystem.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
    /// gets written to the journal.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}

#[derive(Default)]
pub struct SyncOptions<'a> {
    /// If set, the journal will be flushed, as well as the underlying block device.  This is much
    /// more expensive, but ensures the contents of the journal are persisted (which also acts as a
    /// barrier, ensuring all previous journal writes are observable by future operations).
    /// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
    /// and it will return before the journal flush completes.  In other words, some journal
    /// mutations may still be buffered in memory after this call returns.
    pub flush_device: bool,

    /// A precondition that is evaluated whilst a lock is held that determines whether or not the
    /// sync needs to proceed.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
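
// A minimal usage sketch (`fs` here stands in for any mounted filesystem handle): a plain
// `sync` only queues the journal write, whereas `flush_device` makes the journal contents
// durable before the call returns.
//
//     fs.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;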

pub struct OpenFxFilesystem(Arc<FxFilesystem>);

impl OpenFxFilesystem {
    /// Waits for filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
    pub async fn take_device(self) -> DeviceHolder {
        let fut = self.device.take_when_dropped();
        std::mem::drop(self);
        debug_assert_not_too_long!(fut)
    }

    /// Used to finalize a filesystem image when in image_builder_mode.
    pub async fn finalize(&self) -> Result<(), Error> {
        ensure!(
            self.journal().image_builder_mode().is_some(),
            "finalize() only valid in image_builder_mode."
        );
        self.journal().allocate_journal().await?;
        self.journal().set_image_builder_mode(None);
        self.journal().compact().await?;
        Ok(())
    }
}

impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
    fn from(fs: Arc<FxFilesystem>) -> Self {
        Self(fs)
    }
}

impl Drop for OpenFxFilesystem {
    fn drop(&mut self) {
        if self.options.image_builder_mode.is_some()
            && self.journal().image_builder_mode().is_some()
        {
            error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
        }
        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
        }
    }
}

impl std::ops::Deref for OpenFxFilesystem {
    type Target = Arc<FxFilesystem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
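
// A typical teardown sketch, mirroring the tests at the bottom of this file: close the
// filesystem before dropping it, then reclaim the underlying device.
//
//     fs.close().await?;
//     let device = fs.take_device().await;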

pub struct FxFilesystemBuilder {
    format: bool,
    trace: bool,
    options: Options,
    journal_options: JournalOptions,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    fsck_after_every_transaction: bool,
}

impl FxFilesystemBuilder {
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables trace level logging. Defaults to `false`.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
    /// Incompatible with `format`.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// For image building and in-place migration.
    ///
    /// This mode avoids the initial write of super blocks and skips the journal for all
    /// transactions. The user *must* call `finalize()` before closing the filesystem to trigger
    /// a compaction of in-memory data structures, a minimal journal and a write to one
    /// superblock (as specified).
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets a callback that runs after every transaction has been committed. See
    /// `Options::post_commit_hook`.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets whether to do an initial reap of the graveyard at mount time. See
    /// `Options::skip_initial_reap`. Defaults to `false`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Sets the options for the journal.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a method to be called immediately after creating the allocator.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// Enables or disables running fsck after every transaction. Defaults to `false`.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

    /// Sets the delay before the first post-mount trim and the interval between subsequent
    /// trims. See `Options::trim_config`.
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Constructs an `FxFilesystem` object with the specified settings.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || instance.clone().run().boxed()));
        }

        if !read_only && !self.format {
            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
            // before replay.
            device.flush().await.context("Device flush failed")?;
        }

        let filesystem = Arc::new(FxFilesystem {
            device,
            block_size,
            objects: objects.clone(),
            journal,
            commit_mutex: futures::lock::Mutex::new(()),
            lock_manager: LockManager::new(),
            flush_task: Mutex::new(None),
            trim_task: Mutex::new(None),
            closed: AtomicBool::new(true),
            shutdown_event: Event::new(),
            trace: self.trace,
            graveyard: Graveyard::new(objects.clone()),
            completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
            options: filesystem_options,
            in_flight_transactions: AtomicU64::new(0),
            transaction_limit_event: Event::new(),
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                // The filesystem isn't valid until superblocks are written but we want to defer
                // that until last when migrating filesystems or building system images.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping task.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the root volume directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
                // that can trigger a flush which can add more entries to the graveyard which might
                // get caught in the initial reap and cause objects to be prematurely tombstoned.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // This must be after we've formatted the filesystem; it will fail during format otherwise.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background tasks.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
}
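
// A builder usage sketch (the chained options are illustrative; any subset works):
//
//     let fs = FxFilesystemBuilder::new()
//         .format(true)
//         .journal_options(JournalOptions::default())
//         .open(device)
//         .await?;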

pub struct FxFilesystem {
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    flush_task: Mutex<Option<fasync::Task<()>>>,
    trim_task: Mutex<Option<fasync::Task<()>>>,
    closed: AtomicBool,
    // An event that is signalled when the filesystem starts to shut down.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    completed_transactions: UintProperty,
    options: Options,

    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
    in_flight_transactions: AtomicU64,

    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
    // limit.
    transaction_limit_event: Event,

    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
    // filesystem has dropped all other members first (Rust drops members in declaration order).
    device: DeviceHolder,
}

#[fxfs_trace::trace]
impl FxFilesystem {
    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().format(true).open(device).await
    }

    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().open(device).await
    }

    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }

    pub async fn close(&self) -> Result<(), Error> {
        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
        self.shutdown_event.notify(usize::MAX);
        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
        let trim_task = self.trim_task.lock().take();
        if let Some(task) = trim_task {
            debug_assert_not_too_long!(task);
        }
        self.journal.stop_compactions().await;
        let sync_status =
            self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
        match &sync_status {
            Ok(checkpoint) => info!(
                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
                 reservation_required={}, borrowed={})",
                checkpoint.as_ref().unwrap().0.file_offset,
                self.object_manager().metadata_reservation(),
                self.object_manager().required_reservation(),
                self.object_manager().borrowed_metadata_space(),
            ),
            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
        }
        self.journal.terminate();
        let flush_task = self.flush_task.lock().take();
        if let Some(task) = flush_task {
            debug_assert_not_too_long!(task);
        }
        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
        // crash instead of exiting gracefully.
        self.device().close().await.context("Failed to close device")?;
        sync_status.map(|_| ())
    }

    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }

    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }

    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }

    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }

    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }

    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
        self.journal.sync(options).await.map(|_| ())
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn get_info(&self) -> Info {
        Info {
            total_bytes: self.device.size(),
            used_bytes: self.object_manager().allocator().get_used_bytes().0,
        }
    }

    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }

    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }

    pub fn trace(&self) -> bool {
        self.trace
    }

    pub fn options(&self) -> &Options {
        &self.options
    }

    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
    /// by the same task since that can lead to deadlocks if another task tries to take a write
    /// lock.
    pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
        TxnGuard::Owned(
            self.lock_manager
                .read_lock(lock_keys!(LockKey::Filesystem))
                .await
                .into_owned(self.clone()),
        )
    }

    pub async fn new_transaction<'a>(
        self: Arc<Self>,
        locks: LockKeys,
        options: transaction::Options<'a>,
    ) -> Result<Transaction<'a>, Error> {
        let guard = if let Some(guard) = options.txn_guard.as_ref() {
            TxnGuard::Borrowed(guard)
        } else {
            self.txn_guard().await
        };
        Transaction::new(guard, options, locks).await
    }
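
    // The common transaction lifecycle, as exercised throughout the tests below (`store_id`
    // and `object_id` are placeholders for whatever the mutation touches):
    //
    //     let mut transaction = fs
    //         .clone()
    //         .new_transaction(
    //             lock_keys![LockKey::object(store_id, object_id)],
    //             transaction::Options::default(),
    //         )
    //         .await?;
    //     // ... stage mutations against `transaction` ...
    //     transaction.commit().await?;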

    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }

    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }

    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // If we placed a hold for metadata space, return it now.
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }

    fn maybe_start_flush_task(&self) {
        let mut flush_task = self.flush_task.lock();
        if flush_task.is_none() {
            let journal = self.journal.clone();
            *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
        }
    }

    // Returns the number of bytes trimmed.
    async fn do_trim(&self) -> Result<usize, Error> {
        const MAX_EXTENTS_PER_BATCH: usize = 8;
        const MAX_EXTENT_SIZE: usize = 256 * 1024;
        let mut offset = 0;
        let mut bytes_trimmed = 0;
        loop {
            if self.closed.load(Ordering::Relaxed) {
                info!("Filesystem is closed, nothing to trim");
                return Ok(bytes_trimmed);
            }
            let allocator = self.allocator();
            let trimmable_extents =
                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
            for device_range in trimmable_extents.extents() {
                self.device.trim(device_range.clone()).await?;
                bytes_trimmed += device_range.length()? as usize;
            }
            if let Some(device_range) = trimmable_extents.extents().last() {
                offset = device_range.end;
            } else {
                break;
            }
        }
        Ok(bytes_trimmed)
    }

    fn start_trim_task(
        self: &Arc<Self>,
        delay: std::time::Duration,
        interval: std::time::Duration,
    ) {
        if !self.device.supports_trim() {
            info!("Device does not support trim; not scheduling trimming");
            return;
        }
        let this = self.clone();
        let mut next_timer = delay;
        *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
            loop {
                let shutdown_listener = this.shutdown_event.listen();
                // Note that we need to check if the filesystem was closed after we start listening
                // to the shutdown event, but before we start waiting on `timer`, because otherwise
                // we might start listening on `shutdown_event` *after* the event was signaled, and
                // so `shutdown_listener` will never fire, and this task will get stuck until
                // `timer` expires.
                if this.closed.load(Ordering::SeqCst) {
                    return;
                }
                futures::select!(
                    () = fasync::Timer::new(next_timer.clone()).fuse() => {},
                    () = shutdown_listener.fuse() => return,
                );
                let start_time = std::time::Instant::now();
                let res = this.do_trim().await;
                let duration = std::time::Instant::now() - start_time;
                next_timer = interval.clone();
                match res {
                    Ok(bytes_trimmed) => info!(
                        "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
                        {next_timer:?}",
                    ),
                    Err(e) => error!(e:?; "Failed to trim"),
                }
            }
        }));
    }

    pub(crate) async fn reservation_for_transaction<'a>(
        self: &Arc<Self>,
        options: transaction::Options<'a>,
    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
        if self.options.image_builder_mode.is_some() {
            // Image builder mode avoids the journal so reservation tracking for metadata overheads
            // doesn't make sense and so we essentially have 'all or nothing' semantics instead.
            return Ok((MetadataReservation::Borrowed, None, None));
        }
        if !options.skip_journal_checks {
            self.maybe_start_flush_task();
            self.journal.check_journal_space().await?;
        }

        // We support three options for metadata space reservation:
        //
        //   1. We can borrow from the filesystem's metadata reservation.  This should only
        //      be used on the understanding that eventually, potentially after a full compaction,
        //      there should be no net increase in space used.  For example, unlinking an object
        //      should eventually decrease the amount of space used and setting most attributes
        //      should not result in any change.
        //
        //   2. A reservation is provided in which case we'll place a hold on some of it for
        //      metadata.
        //
        //   3. No reservation is supplied, so we try and reserve space with the allocator now,
        //      and will return NoSpace if that fails.
        let mut hold = None;
        let metadata_reservation = if options.borrow_metadata_space {
            MetadataReservation::Borrowed
        } else {
            match options.allocator_reservation {
                Some(reservation) => {
                    hold = Some(
                        reservation
                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
                            .ok_or(FxfsError::NoSpace)?,
                    );
                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
                }
                None => {
                    let reservation = self
                        .allocator()
                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
                        .ok_or(FxfsError::NoSpace)?;
                    MetadataReservation::Reservation(reservation)
                }
            }
        };
        Ok((metadata_reservation, options.allocator_reservation, hold))
    }
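
    // Callers choose between those strategies via `transaction::Options` (a sketch; any fields
    // not shown keep their defaults):
    //
    //     // 1. Borrow from the filesystem's metadata reservation.
    //     let options = transaction::Options { borrow_metadata_space: true, ..Default::default() };
    //
    //     // 2. Place a hold against an existing `Reservation`.
    //     let options = transaction::Options {
    //         allocator_reservation: Some(&reservation),
    //         ..Default::default()
    //     };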

    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
        if skip_journal_checks {
            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
        } else {
            let inc = || {
                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
                    match self.in_flight_transactions.compare_exchange_weak(
                        in_flights,
                        in_flights + 1,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return true,
                        Err(x) => in_flights = x,
                    }
                }
                return false;
            };
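            // Listen-then-recheck avoids a lost wakeup: if a slot frees up between the failed
            // `inc()` and `listen()`, the second `inc()` below catches it before we wait.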
            while !inc() {
                let listener = self.transaction_limit_event.listen();
                if inc() {
                    break;
                }
                listener.await;
            }
        }
    }

    pub(crate) fn sub_transaction(&self) {
        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
        assert!(old != 0);
        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
            self.transaction_limit_event.notify(usize::MAX);
        }
    }

    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
        let keys = lock_keys![LockKey::truncate(store_id, object_id)];
        TruncateGuard(self.lock_manager().write_lock(keys).await)
    }
}

pub enum TxnGuard<'a> {
    Borrowed(&'a TxnGuard<'a>),
    Owned(ReadGuard<'static>),
}

impl TxnGuard<'_> {
    pub fn fs(&self) -> &Arc<FxFilesystem> {
        match self {
            TxnGuard::Borrowed(b) => b.fs(),
            TxnGuard::Owned(o) => o.fs().unwrap(),
        }
    }
}

/// A wrapper around a guard that needs to be taken when truncating an object.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);

/// Helper method for making a new filesystem.
pub async fn mkfs(device: DeviceHolder) -> Result<(), Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    fs.close().await
}

/// Helper method for making a new filesystem with a single named volume.
/// This shouldn't be used in production; instead volumes should be created with the Volumes
/// protocol.
pub async fn mkfs_with_volume(
    device: DeviceHolder,
    volume_name: &str,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<(), Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    {
        // expect instead of propagating errors here, since otherwise we could drop |fs| before
        // close is called, which leads to confusing and unrelated error messages.
        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
        root_volume.new_volume(volume_name, NO_OWNER, crypt).await.expect("Create volume failed");
    }
    fs.close().await?;
    Ok(())
}
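
// A formatting sketch: build a blank filesystem containing a single, optionally encrypted
// volume (the volume name and `crypt` are illustrative):
//
//     mkfs_with_volume(device, "data", Some(crypt)).await?;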

struct FsckAfterEveryTransaction {
    fs: OnceCell<Weak<FxFilesystem>>,
    old_hook: PostCommitHook,
}

impl FsckAfterEveryTransaction {
    fn new(old_hook: PostCommitHook) -> Arc<Self> {
        Arc::new(Self { fs: OnceCell::new(), old_hook })
    }

    async fn run(self: Arc<Self>) {
        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
            let options = FsckOptions {
                fail_on_warning: true,
                no_lock: true,
                quiet: true,
                ..Default::default()
            };
            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
            let object_manager = fs.object_manager();
            for store in object_manager.unlocked_stores() {
                let store_id = store.store_object_id();
                if !object_manager.is_system_store(store_id) {
                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
                        .await
                        .expect("fsck_volume_with_options failed");
                }
            }
        }
        if let Some(old_hook) = self.old_hook.as_ref() {
            old_hook().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
    use crate::fsck::{fsck, fsck_volume};
    use crate::log::*;
    use crate::lsm_tree::types::Item;
    use crate::lsm_tree::Operation;
    use crate::object_handle::{
        ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
    };
    use crate::object_store::directory::{replace_child, Directory};
    use crate::object_store::journal::super_block::SuperBlockInstance;
    use crate::object_store::journal::JournalOptions;
    use crate::object_store::transaction::{lock_keys, LockKey, Options};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, NO_OWNER};
    use crate::range::RangeExt;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::future::join_all;
    use futures::stream::{FuturesUnordered, TryStreamExt};
    use fxfs_insecure_crypto::InsecureCrypt;
    use rustc_hash::FxHashMap as HashMap;
    use std::ops::Range;
    use std::sync::Arc;
    use std::time::Duration;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        // If compaction is not working correctly, this test will run out of space.
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Reopen the store, but set reclaim size to a very large value which will effectively
        // stop the journal from flushing and allows us to track all the mutations to the store.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Append some data.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Overwrite some data.
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // Truncate.
        object.truncate(3000).await.expect("truncate failed");

        // Delete the object.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // Finally tombstone the object.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        // Now reopen and check that replay produces the same set of mutations.
        fs.close().await.expect("close failed");

        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }

    #[fuchsia::test]
    async fn test_max_in_flight_transactions() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let transactions = FuturesUnordered::new();
        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
        }
        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();

        // Trying to create another one should be blocked.
        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
        assert!(futures::poll!(&mut fut).is_pending());

        // Dropping one should allow it to proceed.
        transactions.pop();

        assert!(futures::poll!(&mut fut).is_ready());
    }

    // If run on a single thread, the trim tasks starve out other work.
    #[fuchsia::test(threads = 10)]
    async fn test_continuously_trim() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
            .format(true)
            .open(device)
            .await
            .expect("open failed");
        // Do a small sleep so trim has time to get going.
        fasync::Timer::new(Duration::from_millis(10)).await;

        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
        // regular usage isn't affected by trim.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        for _ in 0..100 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            {
                let buf = object.allocate_buffer(1024).await;
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            }
            std::mem::drop(object);

            let mut transaction = root_directory
                .acquire_context_for_replace(None, "test", true)
                .await
                .expect("acquire_context_for_replace failed")
                .transaction;
            replace_child(&mut transaction, None, (&root_directory, "test"))
                .await
                .expect("replace_child failed");
            transaction.commit().await.expect("commit failed");
        }
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_power_fail() {
        // This test randomly discards blocks, so we run it a few times to increase the chances
        // of catching an issue in a single run.
        for _ in 0..10 {
            let (store_id, device, test_file_object_id) = {
                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
                let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
                    .await
                    .expect("sync failed");

                let store = root_volume
                    .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("new_volume failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");

                // Create a number of files with the goal of using up more than one journal block.
                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
                    let fs = store.filesystem();
                    let root_directory = Directory::open(store, store.root_directory_object_id())
                        .await
                        .expect("open failed");
                    for i in 0..100 {
                        let mut transaction = fs
                            .clone()
                            .new_transaction(
                                lock_keys![LockKey::object(
                                    store.store_object_id(),
                                    store.root_directory_object_id()
                                )],
                                Options::default(),
                            )
                            .await
                            .expect("new_transaction failed");
                        root_directory
                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
                            .await
                            .expect("create_child_file failed");
                        transaction.commit().await.expect("commit failed");
                    }
                }

                // Create one batch of files.
                create_files(&store, "A").await;

                // Create a file and write something to it.  This will make sure there's a
                // transaction present that includes a checksum.
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let object = root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut transaction =
                    object.new_transaction().await.expect("new_transaction failed");
                let mut buffer = object.allocate_buffer(4096).await;
                buffer.as_mut_slice().fill(0xed);
                object
                    .txn_write(&mut transaction, 0, buffer.as_ref())
                    .await
                    .expect("txn_write failed");
                transaction.commit().await.expect("commit failed");

                // Create another batch of files.
                create_files(&store, "B").await;

1348                // Sync the device, but don't flush the device. We want to do this so we can
1349                // randomly discard blocks below.
1350                fs.sync(SyncOptions::default()).await.expect("sync failed");
1351
1352                // When we call `sync` above on the filesystem, it will pad the journal so that it
1353                // will get written, but it doesn't wait for the write to occur.  We wait for a
1354                // short time here to give allow time for the journal to be written.  Adding timers
1355                // isn't great, but this test already isn't deterministic since we randomly discard
1356                // blocks.
                fasync::Timer::new(Duration::from_millis(10)).await;

                (
                    store.store_object_id(),
                    fs.device().snapshot().expect("snapshot failed"),
                    object.object_id(),
                )
            };

            // Randomly discard blocks written since the last flush.  This simulates what might
            // happen in the case of power loss.  This will be an uncontrolled unmount.
            device
                .discard_random_since_last_flush()
                .expect("discard_random_since_last_flush failed");

            let fs = FxFilesystem::open(device).await.expect("open failed");
            fsck(fs.clone()).await.expect("fsck failed");

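            // Set when we manage to modify the test file after replay; the modification is
            // verified again after the clean unmount below.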
            let mut check_test_file = false;

            // If we replayed and the store exists (i.e. the transaction that created the store
            // made it out), start by running fsck on it.
            let object_id = if fs.object_manager().store(store_id).is_some() {
                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("fsck_volume failed");

                // Now we want to create another file, unmount cleanly, and then finally check that
                // the new file exists.  This checks that we can continue to use the filesystem
                // after an unclean unmount.
                let store = root_volume(fs.clone())
                    .await
                    .expect("root_volume failed")
                    .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("volume failed");

                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let object = root_directory
                    .create_child_file(&mut transaction, "C")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Write again to the test file if it exists.
                if let Ok(test_file) = ObjectStore::open_object(
                    &store,
                    test_file_object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                {
                    // Check it has the contents we expect.
                    let mut buffer = test_file.allocate_buffer(4096).await;
                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
                    if bytes == 4096 {
                        let expected = [0xed; 4096];
                        assert_eq!(buffer.as_slice(), &expected);
                    } else {
                        // If the write didn't make it, the file should have zero bytes.
                        assert_eq!(bytes, 0);
                    }

                    // Modify the test file.
                    let mut transaction =
                        test_file.new_transaction().await.expect("new_transaction failed");
                    buffer.as_mut_slice().fill(0x37);
                    test_file
                        .txn_write(&mut transaction, 0, buffer.as_ref())
                        .await
                        .expect("txn_write failed");
                    transaction.commit().await.expect("commit failed");
                    check_test_file = true;
                }

                object.object_id()
            } else {
                INVALID_OBJECT_ID
            };

            // This will do a controlled unmount.
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
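            // Reopen the device so that the filesystem can be mounted again.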
            device.reopen(false);

            let fs = FxFilesystem::open(device).await.expect("open failed");
            fsck(fs.clone()).await.expect("fsck failed");

            // As mentioned above, make sure that the object we created before the clean unmount
            // exists.
            if object_id != INVALID_OBJECT_ID {
                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("fsck_volume failed");

                let store = root_volume(fs.clone())
                    .await
                    .expect("root_volume failed")
                    .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("volume failed");
                // We should be able to open the C object.
                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");

                // If we made the modification to the test file, check it.
                if check_test_file {
                    info!("Checking test file for modification");
                    let test_file = ObjectStore::open_object(
                        &store,
                        test_file_object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buffer = test_file.allocate_buffer(4096).await;
                    assert_eq!(
                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
                        4096
                    );
                    let expected = [0x37; 4096];
                    assert_eq!(buffer.as_slice(), &expected);
                }
            }

            fs.close().await.expect("close failed");
        }
    }

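    // Opening in image builder mode on a read-only device should not attempt any writes.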
    #[fuchsia::test]
    async fn test_image_builder_mode_no_early_writes() {
        const BLOCK_SIZE: u32 = 4096;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
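        // Reopen the device read-only so that any premature write would fail.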
        device.reopen(true);
        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");
        // Image builder mode only writes when data is written or fs.finalize() is called, so
        // closing without doing either shouldn't produce any errors.
        fs.close().await.expect("closed");
    }

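    // Builds an image that references data already present on the device, finalizes it, and
    // then verifies the result with a regular mount and fsck.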
    #[fuchsia::test]
    async fn test_image_builder_mode() {
        const BLOCK_SIZE: u32 = 4096;
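        // One 4 KiB block at a 4 MiB offset, standing in for file data laid down on the device
        // ahead of time.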
        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));

        // Write some fake file data at an offset in the image; below we create an fxfs file
        // that points at it and confirm the contents.
        {
            let mut write_buf =
                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
            write_buf.as_mut_slice().fill(0xf0);
            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
        }

        device.reopen(true);

        let device = {
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(Some(SuperBlockInstance::B))
                .open(device)
                .await
                .expect("open failed");
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                // Create a file referencing the existing data on the device.
                let handle;
                {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_directory.store().store_object_id(),
                                root_directory.object_id()
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new transaction");
                    handle = root_directory
                        .create_child_file(&mut transaction, "test")
                        .await
                        .expect("create file");
                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
                    transaction.commit().await.expect("commit");
                }
            }
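            // The device was reopened read-only above; make it writable again so that
            // finalize() can write out the deferred metadata.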
            fs.device().reopen(false);
            fs.finalize().await.expect("finalize");
            fs.close().await.expect("close");
            fs.take_device().await
        };
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");

        // Confirm that the test file points at the correct data.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, descriptor, _) =
            root_directory.lookup("test").await.expect("lookup failed").unwrap();
        assert_eq!(descriptor, ObjectDescriptor::File);
        let test_file =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open failed");
        let mut read_buf =
            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
        fs.close().await.expect("closed");
    }
}