fxfs/object_store/journal.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The journal is implemented as an ever extending file which contains variable length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents when it is known they are no longer
//! needed to mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.
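//!
//! A rough sketch of how replay finds the end of the stream (illustrative only; the real logic
//! lives in `journal::reader`, and `fletcher64` here stands in for the checksum implementation in
//! `crate::checksum`, with an assumed 8-byte trailer per block):
//!
//! ```ignore
//! let mut seed = checkpoint.checksum;
//! for block in journal_blocks {
//!     let (payload, trailer) = block.split_at(BLOCK_SIZE as usize - 8);
//!     let stored = u64::from_le_bytes(trailer.try_into().unwrap());
//!     let computed = fletcher64(payload, seed);
//!     if computed != stored {
//!         break; // No more valid records; the journal is extended from here.
//!     }
//!     seed = computed; // The next block's checksum is seeded with this one.
//! }
//! ```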

mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::debug_assert_not_too_long;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::extent_record::{
    ExtentKey, ExtentMode, ExtentValue, DEFAULT_DATA_ATTRIBUTE_ID,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    lock_keys, AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43,
    MutationV46, ObjectStoreMutation, Options, Transaction, TxnMutation,
    TRANSACTION_MAX_JOURNAL_USAGE,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, Item, ItemRef, NewChildStoreOptions,
    ObjectStore, INVALID_OBJECT_ID,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, LATEST_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Error};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use futures::future::poll_fn;
use futures::FutureExt as _;
use once_cell::sync::OnceCell;
use rand::Rng;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::ops::{Bound, Range};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Poll, Waker};

// The journal file is written to in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

// See the comment for the `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

// Temporary space that should be reserved for the journal.  For example: space that is currently
// used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
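// For example, if the final block written before a reset ended with checksum `c`, the first block
// written after the reset is seeded with `c ^ RESET_XOR` rather than `c`, which is how a reader
// detects that the stream was restarted.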

// To keep track of offsets within a journal file, we need both the file offset and the checksum of
// the preceding block, since the checksum of the preceding block is an input to the checksum of
// every block.
pub type JournalCheckpoint = JournalCheckpointV32;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    pub file_offset: u64,

    // Starting checksum for the block that contains file_offset, i.e. the checksum for the
    // previous block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord version.
    // This can change across reset events so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}

pub type JournalRecord = JournalRecordV46;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV46 {
    // Indicates no more records in this block.
    EndBlock,
    // Mutation for a particular object.  object_id here is for the collection, i.e. the store or
    // allocator.
    Mutation { object_id: u64, mutation: MutationV46 },
    // Commits records in the transaction.
    Commit,
    // Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    // Indicates the device was flushed at the given journal offset.
    // Note that this really means that by this journal offset, we can be certain that there's no
    // remaining buffered data in the block device; the buffers and the disk contents are
    // consistent.
    // We insert one of these records *after* a flush along with the *next* transaction to go
    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    // on the next mount will serve the same purpose and count as a flush, although it is necessary
    // to defensively flush the device before replaying the journal (if possible, i.e. not
    // read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    // Checksums for a data range written by this transaction.  A transaction is only valid if
    // these checksums are correct.  The range is the device range that the checksums cover.
    //
    // The boolean indicates whether this range is being written to for the first time.  For
    // overwrite extents, we only check the checksums for a block if it has been written to for the
    // first time since the last flush, because otherwise we can't roll it back anyway so it
    // doesn't matter.  For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
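
// As an illustration (the names and values below are made up), the records for a single committed
// transaction form a run like the following, with `EndBlock` appearing only where the writer pads
// out the remainder of the current block:
//
//   JournalRecord::Mutation { object_id: allocator_id, mutation: /* Allocator(Allocate { .. }) */ }
//   JournalRecord::Mutation { object_id: store_id, mutation: /* ObjectStore(..) */ }
//   JournalRecord::DataChecksums(device_range, checksums, /* first_write: */ true)
//   JournalRecord::Commit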

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                // At the time of writing the only extents written by real systems are CoW extents
                // so the new bool is always true.
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}

/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions: the
/// ability to have a set of mutations applied atomically together.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceCell<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}

struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset up to which we can now zero the journal, since that portion is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating the reason for journal termination if it was not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset`, but at mount time it could be less: it will only reach
    // the end of the last valid transaction and won't include discarded transactions that follow.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    image_builder_mode: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Log previous error if one was already set, otherwise latch the error.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,
}
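
// For example: with the default `reclaim_size` of 262_144 bytes, a new super-block is written
// roughly every 131_072 bytes (about half the reclaim size) of journal data, so in the steady
// state the live portion of the journal stays between roughly 128 KiB and 256 KiB.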

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE }
    }
}

struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    // List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    // Records offset + 1 of the matching begin_flush transaction.  If the object is deleted, the
    // offset will be STORE_DELETED.  The +1 is because we want to ignore the begin flush
    // transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

const STORE_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}
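
// Illustrative use during replay (see `read_transactions`): extents discovered in journaled
// mutations are appended along with the journal offset of the transaction that added them, and a
// later `Discard(offset)` record removes any extents added at or after that offset.  A minimal
// sketch, assuming `handle`, `checkpoint`, `device_offset`, `len` and `discard_offset` are in
// scope:
//
//   handle.push_extent(checkpoint.file_offset, device_offset..device_offset + len);
//   ...
//   handle.discard_extents(discard_offset);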

// Provide a stub implementation for DataObjectHandle so we can use it in
// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
// DataObjectHandle already knows where its extents live).
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // NOP
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // NOP
    }
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        let starting_checksum = rand::thread_rng().gen_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceCell::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, enabled: bool) {
        self.inner.lock().image_builder_mode = enabled;
    }

    pub fn image_builder_mode(&self) -> bool {
        self.inner.lock().image_builder_mode
    }

    /// Used during replay to validate a mutation.  This should return false if the mutation is not
    /// valid and should not be applied.  This could be for benign reasons (e.g. the device flushed
    /// data out-of-order) or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }

    // Assumes that `mutation` has been validated.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

    /// Reads the latest super-block, and then replays journaled records.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // We never discard extents from the root parent store.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate all the mutations.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Validate the checksums.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply the mutations...

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // Start with the root-parent mutations, and also determine the journal offsets for all
        // other objects.
        for (index, JournaledTransaction { checkpoint, root_parent_mutations, end_flush, .. }) in
            transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Truncate the transactions so we don't need to worry about them on the next pass.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }
        }

        // Now we can open the root store.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store mutations.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        // Now we can open the allocator.
        allocator.open().await.context("Failed to open allocator")?;

        // Now replay all other mutations.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Configure the journal writer so that we can continue.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            // Make sure we don't accidentally use the reader from now onwards.
            std::mem::drop(reader);

            // Reset the stream to indicate that we've remounted the journal.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // When we open the filesystem as writable, we flush the device.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(anyhow!(FxfsError::Inconsistent)
                                    .context("Encountered mutations for deleted store"));
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if current_transaction
                                        .end_flush
                                        .replace((object_id, STORE_DELETED))
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple EndFlush/DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that apply to the journal file
                                    // itself so that we can pass its extents to the reader,
                                    // allowing it to continue reading the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(anyhow!(FxfsError::Inconsistent)
                                    .context("Invalid offset for Discard"));
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }

    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::A.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::B.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if !self.inner.lock().image_builder_mode {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent
            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer.
        let _ = self.handle.set(journal_handle);
        Ok(())
    }

    /// Normally we allocate the journal when creating the filesystem.  This is used in
    /// image_builder_mode, where journal allocation is deferred until last.
1203    pub async fn allocate_journal(&self) -> Result<(), Error> {
1204        let handle = self.handle.get().unwrap();
1205        let filesystem = handle.store().filesystem();
1206        let mut transaction = filesystem
1207            .clone()
1208            .new_transaction(
1209                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1210                Options { skip_journal_checks: true, ..Default::default() },
1211            )
1212            .await?;
1213        let mut file_range = 0..self.chunk_size();
1214        self.handle
1215            .get()
1216            .unwrap()
1217            .preallocate_range(&mut transaction, &mut file_range)
1218            .await
1219            .context("preallocate journal")?;
1220        if file_range.start < file_range.end {
1221            bail!("preallocate_range returned too little space");
1222        }
1223        transaction.commit().await?;
1224        Ok(())
1225    }
1226
1227    pub async fn init_superblocks(&self) -> Result<(), Error> {
1228        // Overwrite both superblocks.
1229        for _ in 0..2 {
1230            self.write_super_block().await?;
1231        }
1232        Ok(())
1233    }
1234
1235    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1236    /// flush.
1237    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1238    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1239    /// object has data to flush and is registered with ObjectManager).
1240    pub async fn read_transactions_for_object(
1241        &self,
1242        object_id: u64,
1243    ) -> Result<Vec<JournaledTransaction>, Error> {
1244        let handle = self.handle.get().expect("No journal handle");
1245        // Reopen the handle since JournalReader needs an owned handle.
1246        let handle = ObjectStore::open_object(
1247            handle.owner(),
1248            handle.object_id(),
1249            journal_handle_options(),
1250            None,
1251        )
1252        .await?;
1253
1254        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1255            Some(checkpoint) => checkpoint,
1256            None => return Ok(vec![]),
1257        };
1258        let mut reader = JournalReader::new(handle, &checkpoint);
1259        // Record the current end offset and only read to there, so we don't accidentally read any
1260        // partially flushed blocks.
1261        let end_offset = self.inner.lock().valid_to;
1262        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1263    }
1264
1265    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1266    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1267        if transaction.is_empty() {
1268            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1269        }
1270
1271        self.pre_commit(transaction).await?;
1272        Ok(self.write_and_apply_mutations(transaction))
1273    }
1274
1275    // Before we commit, we might need to extend the journal or write pending records to the
1276    // journal.
1277    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1278        let handle;
1279
1280        let (size, zero_offset) = {
1281            let mut inner = self.inner.lock();
1282
1283            // If this is the first write after a RESET, we need to output the version first.
1284            if std::mem::take(&mut inner.output_reset_version) {
1285                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1286            }
1287
1288            if let Some(discard_offset) = inner.discard_offset {
1289                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1290                inner.discard_offset = None;
1291            }
1292
1293            if inner.needs_did_flush_device {
1294                let offset = inner.device_flushed_offset;
1295                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1296                inner.needs_did_flush_device = false;
1297            }
1298
1299            handle = match self.handle.get() {
1300                None => return Ok(()),
1301                Some(x) => x,
1302            };
1303
1304            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1305
1306            let size = handle.get_size();
1307            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1308
1309            if size.is_none()
1310                && inner.zero_offset.is_none()
1311                && !self.objects.needs_borrow_for_journal(file_offset)
1312            {
1313                return Ok(());
1314            }
1315
1316            (size, inner.zero_offset)
1317        };
1318
1319        let mut transaction = handle
1320            .new_transaction_with_options(Options {
1321                skip_journal_checks: true,
1322                borrow_metadata_space: true,
1323                allocator_reservation: Some(self.objects.metadata_reservation()),
1324                txn_guard: Some(transaction.txn_guard()),
1325                ..Default::default()
1326            })
1327            .await?;
1328        if let Some(size) = size {
1329            handle
1330                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1331                .await?;
1332        }
1333        if let Some(zero_offset) = zero_offset {
1334            handle.zero(&mut transaction, 0..zero_offset).await?;
1335        }
1336
1337        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1338        // instead we just apply the transaction directly here.
1339        self.write_and_apply_mutations(&mut transaction);
1340
1341        let mut inner = self.inner.lock();
1342
1343        // Make sure the transaction to extend the journal made it to the journal within the old
1344        // size, since otherwise, it won't be possible to replay.
1345        if let Some(size) = size {
1346            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1347        }
1348
1349        if inner.zero_offset == zero_offset {
1350            inner.zero_offset = None;
1351        }
1352
1353        Ok(())
1354    }
1355
1356    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1357    // all records should be applied because the object store or allocator might already contain the
1358    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
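    //
    // For example (illustrative numbers only): if the super-block records that an object was last
    // flushed at journal offset 1000, a mutation recorded at offset 900 is already reflected in
    // the object's persisted layers and is skipped, whereas a mutation recorded at offset 1100
    // still needs to be applied.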
1359    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1360        let super_block_header = &self.inner.lock().super_block_header;
1361        let offset = super_block_header
1362            .journal_file_offsets
1363            .get(&object_id)
1364            .cloned()
1365            .unwrap_or(super_block_header.super_block_journal_file_offset);
1366        journal_file_checkpoint.file_offset >= offset
1367    }
1368
1369    /// Flushes previous writes to the device and then writes out a new super-block.
1370    /// Callers must ensure that this is not called concurrently.
1371    async fn write_super_block(&self) -> Result<(), Error> {
1372        let root_parent_store = self.objects.root_parent_store();
1373
1374        // We need to flush previous writes to the device since the new super-block we are writing
1375        // relies on written data being observable.  We also need to lock the root parent store so
1376        // that no new entries are written to it whilst we are writing the super-block; for that we
1377        // use the write lock.
1378        let old_layers;
1379        let old_super_block_offset;
1380        let mut new_super_block_header;
1381        let checkpoint;
1382        let borrowed;
1383
1384        {
1385            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1386            {
1387                let _write_guard = self.writer_mutex.lock();
1388                (checkpoint, borrowed) = self.pad_to_block()?;
1389                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1390            }
1391            self.flush_device(checkpoint.file_offset)
1392                .await
1393                .context("flush failed when writing superblock")?;
1394        }
1395
1396        new_super_block_header = self.inner.lock().super_block_header.clone();
1397
1398        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1399
1400        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1401
1402        new_super_block_header.generation =
1403            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
1404        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1405        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1406        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1407        new_super_block_header.journal_file_offsets = journal_file_offsets;
1408        new_super_block_header.borrowed_metadata_space = borrowed;
1409
1410        self.super_block_manager
1411            .save(
1412                new_super_block_header.clone(),
1413                self.objects.root_parent_store().filesystem(),
1414                old_layers,
1415            )
1416            .await?;
1417        {
1418            let mut inner = self.inner.lock();
1419            inner.super_block_header = new_super_block_header;
1420            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1421        }
1422
1423        Ok(())
1424    }
1425
1426    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1427    /// unless the flush_device option is set, in which case data should have been persisted to
1428    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1429    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1430    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1431    /// borrowed metadata space at the point it was flushed.
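    ///
    /// A minimal usage sketch (illustrative only; `journal` is assumed to come from
    /// `FxFilesystem::journal()`):
    /// ```ignore
    /// if let Some((checkpoint, borrowed)) =
    ///     journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?
    /// {
    ///     // `checkpoint` is now persistent on the device; `borrowed` is the amount of borrowed
    ///     // metadata space at the time of the flush.
    /// }
    /// ```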
1432    pub async fn sync(
1433        &self,
1434        options: SyncOptions<'_>,
1435    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1436        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1437
1438        let (checkpoint, borrowed) = {
1439            if let Some(precondition) = options.precondition {
1440                if !precondition() {
1441                    return Ok(None);
1442                }
1443            }
1444
1445            // This guard is required so that we don't insert an EndBlock record in the middle of a
1446            // transaction.
1447            let _guard = self.writer_mutex.lock();
1448
1449            self.pad_to_block()?
1450        };
1451
1452        if options.flush_device {
1453            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1454        }
1455
1456        Ok(Some((checkpoint, borrowed)))
1457    }
1458
1459    // Returns the checkpoint as it was prior to padding.  This is done because the super-block
1460    // needs to record where the last transaction ends, and it's the next transaction that pays the
1461    // price of the padding.
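    //
    // For example, if a transaction ends at journal offset 4000, the checkpoint returned here is
    // 4000; the journal is then padded out to the 4096-byte block boundary, and the next
    // transaction, which starts at offset 4096, absorbs the cost of the 96 padding bytes.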
1462    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1463        let mut inner = self.inner.lock();
1464        let checkpoint = inner.writer.journal_file_checkpoint();
1465        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1466            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1467            inner.writer.pad_to_block()?;
1468            if let Some(waker) = inner.flush_waker.take() {
1469                waker.wake();
1470            }
1471        }
1472        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1473    }
1474
1475    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1476        debug_assert_not_too_long!(poll_fn(|ctx| {
1477            let mut inner = self.inner.lock();
1478            if inner.flushed_offset >= checkpoint_offset {
1479                Poll::Ready(Ok(()))
1480            } else if inner.terminate {
1481                let context = inner
1482                    .terminate_reason
1483                    .as_ref()
1484                    .map(|e| format!("Journal closed with error: {:?}", e))
1485                    .unwrap_or_else(|| "Journal closed".to_string());
1486                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1487            } else {
1488                inner.sync_waker = Some(ctx.waker().clone());
1489                Poll::Pending
1490            }
1491        }))?;
1492
1493        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1494        if needs_flush {
1495            let trace = self.trace.load(Ordering::Relaxed);
1496            if trace {
1497                info!("J: start flush device");
1498            }
1499            self.handle.get().unwrap().flush_device().await?;
1500            if trace {
1501                info!("J: end flush device");
1502            }
1503
1504            // We need to write a DidFlushDevice record at some point, but if we are in the
1505            // process of shutting down the filesystem, we want to leave the journal clean to
1506            // avoid there being log messages complaining about unwritten journal data, so we
1507            // queue it up so that the next transaction will trigger this record to be written.
1508            // If we are shutting down, that will never happen but since the DidFlushDevice
1509            // message is purely advisory (it reduces the number of checksums we have to verify
1510            // during replay), it doesn't matter if it isn't written.
1511            {
1512                let mut inner = self.inner.lock();
1513                inner.device_flushed_offset = checkpoint_offset;
1514                inner.needs_did_flush_device = true;
1515            }
1516
1517            // Tell the allocator that we flushed the device so that it can now start using
1518            // space that was deallocated.
1519            self.objects.allocator().did_flush_device(checkpoint_offset).await;
1520            if trace {
1521                info!("J: did flush device");
1522            }
1523        }
1524
1525        Ok(())
1526    }
1527
1528    /// Returns a copy of the super-block header.
1529    pub fn super_block_header(&self) -> SuperBlockHeader {
1530        self.inner.lock().super_block_header.clone()
1531    }
1532
1533    /// Waits for there to be sufficient space in the journal.
1534    pub async fn check_journal_space(&self) -> Result<(), Error> {
1535        loop {
1536            debug_assert_not_too_long!({
1537                let inner = self.inner.lock();
1538                if inner.terminate {
1539                    // If the flush error is set, this will never make progress, since we can't
1540                    // extend the journal any more.
1541                    let context = inner
1542                        .terminate_reason
1543                        .as_ref()
1544                        .map(|e| format!("Journal closed with error: {:?}", e))
1545                        .unwrap_or_else(|| "Journal closed".to_string());
1546                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1547                }
1548                if self.objects.last_end_offset()
1549                    - inner.super_block_header.journal_checkpoint.file_offset
1550                    < inner.reclaim_size
1551                {
1552                    break Ok(());
1553                }
1554                if inner.image_builder_mode {
1555                    break Ok(());
1556                }
1557                if inner.disable_compactions {
1558                    break Err(
1559                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1560                    );
1561                }
1562                self.reclaim_event.listen()
1563            });
1564        }
1565    }
1566
1567    fn chunk_size(&self) -> u64 {
1568        CHUNK_SIZE
1569    }
1570
1571    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1572        let checkpoint_before;
1573        let checkpoint_after;
1574        {
1575            let _guard = self.writer_mutex.lock();
1576            checkpoint_before = {
1577                let mut inner = self.inner.lock();
1578                let checkpoint = inner.writer.journal_file_checkpoint();
1579                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1580                    self.objects.write_mutation(
1581                        *object_id,
1582                        mutation,
1583                        Writer(*object_id, &mut inner.writer),
1584                    );
1585                }
1586                checkpoint
1587            };
1588            let maybe_mutation =
1589                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1590                    "apply_transaction should not fail in live mode; \
1591                     filesystem will be in an inconsistent state",
1592                );
1593            checkpoint_after = {
1594                let mut inner = self.inner.lock();
1595                if let Some(mutation) = maybe_mutation {
1596                    inner
1597                        .writer
1598                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1599                        .unwrap();
1600                }
1601                for (device_range, checksums, first_write) in
1602                    transaction.take_checksums().into_iter()
1603                {
1604                    inner
1605                        .writer
1606                        .write_record(&JournalRecord::DataChecksums(
1607                            device_range,
1608                            Checksums::fletcher(checksums),
1609                            first_write,
1610                        ))
1611                        .unwrap();
1612                }
1613                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1614
1615                inner.writer.journal_file_checkpoint()
1616            };
1617        }
1618        self.objects.did_commit_transaction(
1619            transaction,
1620            &checkpoint_before,
1621            checkpoint_after.file_offset,
1622        );
1623
1624        if let Some(waker) = self.inner.lock().flush_waker.take() {
1625            waker.wake();
1626        }
1627
1628        checkpoint_before.file_offset
1629    }
1630
1631    /// This task will flush journal data to the device when there is data that needs flushing, and
1632    /// trigger compactions when short of journal space.  It will return after the terminate method
1633    /// has been called, or an error is encountered with either flushing or compaction.
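    ///
    /// A hedged sketch of how this might be driven (illustrative only; assumes an `Arc<Journal>`
    /// named `journal` and the `fuchsia_async` executor):
    /// ```ignore
    /// let task = fasync::Task::spawn(journal.clone().flush_task());
    /// // ... later, during shutdown:
    /// journal.terminate();
    /// task.await;
    /// ```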
1634    pub async fn flush_task(self: Arc<Self>) {
1635        let mut flush_fut = None;
1636        let mut compact_fut = None;
1637        let mut flush_error = false;
1638        poll_fn(|ctx| loop {
1639            {
1640                let mut inner = self.inner.lock();
1641                if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1642                    let flushable = inner.writer.flushable_bytes();
1643                    if flushable > 0 {
1644                        flush_fut = Some(self.flush(flushable).boxed());
1645                    }
1646                }
1647                if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1648                    return Poll::Ready(());
1649                }
1650                // The / 2 is here because after compacting, we cannot reclaim the space until the
1651                // _next_ time we flush the device since the super-block is not guaranteed to
1652                // persist until then.
1653                if compact_fut.is_none()
1654                    && !inner.terminate
1655                    && !inner.disable_compactions
1656                    && !inner.image_builder_mode
1657                    && self.objects.last_end_offset()
1658                        - inner.super_block_header.journal_checkpoint.file_offset
1659                        > inner.reclaim_size / 2
1660                {
1661                    compact_fut = Some(self.compact().boxed());
1662                    inner.compaction_running = true;
1663                }
1664                inner.flush_waker = Some(ctx.waker().clone());
1665            }
1666            let mut pending = true;
1667            if let Some(fut) = flush_fut.as_mut() {
1668                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1669                    if let Err(e) = result {
1670                        self.inner.lock().terminate(Some(e.context("Flush error")));
1671                        self.reclaim_event.notify(usize::MAX);
1672                        flush_error = true;
1673                    }
1674                    flush_fut = None;
1675                    pending = false;
1676                }
1677            }
1678            if let Some(fut) = compact_fut.as_mut() {
1679                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1680                    let mut inner = self.inner.lock();
1681                    if let Err(e) = result {
1682                        inner.terminate(Some(e.context("Compaction error")));
1683                    }
1684                    compact_fut = None;
1685                    inner.compaction_running = false;
1686                    self.reclaim_event.notify(usize::MAX);
1687                    pending = false;
1688                }
1689            }
1690            if pending {
1691                return Poll::Pending;
1692            }
1693        })
1694        .await;
1695    }
1696
1697    async fn flush(&self, amount: usize) -> Result<(), Error> {
1698        let handle = self.handle.get().unwrap();
1699        let mut buf = handle.allocate_buffer(amount).await;
1700        let offset = self.inner.lock().writer.take_flushable(buf.as_mut());
1701        let len = buf.len() as u64;
1702        self.handle.get().unwrap().overwrite(offset, buf.as_mut(), false).await?;
1703        let mut inner = self.inner.lock();
1704        if let Some(waker) = inner.sync_waker.take() {
1705            waker.wake();
1706        }
1707        inner.flushed_offset = offset + len;
1708        inner.valid_to = inner.flushed_offset;
1709        Ok(())
1710    }
1711
1712    /// This should generally NOT be called externally. It is public to allow use by the
1713    /// fxfs.Debug FIDL service.
1714    #[trace]
1715    pub async fn compact(&self) -> Result<(), Error> {
1716        let trace = self.trace.load(Ordering::Relaxed);
1717        debug!("Compaction starting");
1718        if trace {
1719            info!("J: start compaction");
1720        }
1721        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1722        self.inner.lock().super_block_header.earliest_version = earliest_version;
1723        self.write_super_block().await.context("Failed to write superblock")?;
1724        if trace {
1725            info!("J: end compaction");
1726        }
1727        debug!("Compaction finished");
1728        Ok(())
1729    }
1730
1731    pub async fn stop_compactions(&self) {
1732        loop {
1733            debug_assert_not_too_long!({
1734                let mut inner = self.inner.lock();
1735                inner.disable_compactions = true;
1736                if !inner.compaction_running {
1737                    return;
1738                }
1739                self.reclaim_event.listen()
1740            });
1741        }
1742    }
1743
1744    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1745    /// journal when queried.
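    ///
    /// A minimal sketch (illustrative only; assumes `journal: &Arc<Journal>` and a
    /// `fuchsia_inspect::Inspector` named `inspector`):
    /// ```ignore
    /// journal.track_statistics(inspector.root(), "journal");
    /// ```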
1746    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1747        let this = Arc::downgrade(self);
1748        parent.record_lazy_child(name, move || {
1749            let this_clone = this.clone();
1750            async move {
1751                let inspector = fuchsia_inspect::Inspector::default();
1752                if let Some(this) = this_clone.upgrade() {
1753                    let (journal_min, journal_max, journal_reclaim_size) = {
1754                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1755                        let inner = this.inner.lock();
1756                        (
1757                            round_down(
1758                                inner.super_block_header.journal_checkpoint.file_offset,
1759                                BLOCK_SIZE,
1760                            ),
1761                            inner.flushed_offset,
1762                            inner.reclaim_size,
1763                        )
1764                    };
1765                    let root = inspector.root();
1766                    root.record_uint("journal_min_offset", journal_min);
1767                    root.record_uint("journal_max_offset", journal_max);
1768                    root.record_uint("journal_size", journal_max - journal_min);
1769                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1770
1771                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1772                    if let Some(x) = round_div(
1773                        100 * (journal_max - journal_min),
1774                        this.objects.allocator().get_disk_bytes(),
1775                    ) {
1776                        root.record_uint("journal_size_to_disk_size_percent", x);
1777                    }
1778                }
1779                Ok(inspector)
1780            }
1781            .boxed()
1782        });
1783    }
1784
1785    /// Terminate all journal activity.
1786    pub fn terminate(&self) {
1787        self.inner.lock().terminate(/*reason*/ None);
1788        self.reclaim_event.notify(usize::MAX);
1789    }
1790}
1791
1792/// Wrapper to allow records to be written to the journal.
1793pub struct Writer<'a>(u64, &'a mut JournalWriter);
1794
1795impl Writer<'_> {
1796    pub fn write(&mut self, mutation: Mutation) {
1797        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1798    }
1799}
1800
1801#[cfg(test)]
1802mod tests {
1803    use crate::filesystem::{FxFilesystem, SyncOptions};
1804    use crate::fsck::fsck;
1805    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1806    use crate::object_store::directory::Directory;
1807    use crate::object_store::transaction::Options;
1808    use crate::object_store::volume::root_volume;
1809    use crate::object_store::{lock_keys, HandleOptions, LockKey, ObjectStore, NO_OWNER};
1810    use fuchsia_async as fasync;
1811    use fuchsia_async::MonotonicDuration;
1812    use storage_device::fake_device::FakeDevice;
1813    use storage_device::DeviceHolder;
1814
1815    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1816
1817    #[fuchsia::test]
1818    async fn test_replay() {
1819        const TEST_DATA: &[u8] = b"hello";
1820
1821        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1822
1823        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1824
1825        let object_id = {
1826            let root_store = fs.root_store();
1827            let root_directory =
1828                Directory::open(&root_store, root_store.root_directory_object_id())
1829                    .await
1830                    .expect("open failed");
1831            let mut transaction = fs
1832                .clone()
1833                .new_transaction(
1834                    lock_keys![LockKey::object(
1835                        root_store.store_object_id(),
1836                        root_store.root_directory_object_id(),
1837                    )],
1838                    Options::default(),
1839                )
1840                .await
1841                .expect("new_transaction failed");
1842            let handle = root_directory
1843                .create_child_file(&mut transaction, "test")
1844                .await
1845                .expect("create_child_file failed");
1846
1847            transaction.commit().await.expect("commit failed");
1848            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1849            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1850            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1851            // As this is the first sync, this will actually trigger a new super-block, but normally
1852            // this would not be the case.
1853            fs.sync(SyncOptions::default()).await.expect("sync failed");
1854            handle.object_id()
1855        };
1856
1857        {
1858            fs.close().await.expect("Close failed");
1859            let device = fs.take_device().await;
1860            device.reopen(false);
1861            let fs = FxFilesystem::open(device).await.expect("open failed");
1862            let handle = ObjectStore::open_object(
1863                &fs.root_store(),
1864                object_id,
1865                HandleOptions::default(),
1866                None,
1867            )
1868            .await
1869            .expect("open_object failed");
1870            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1871            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
1872            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1873            fsck(fs.clone()).await.expect("fsck failed");
1874            fs.close().await.expect("Close failed");
1875        }
1876    }
1877
1878    #[fuchsia::test]
1879    async fn test_reset() {
1880        const TEST_DATA: &[u8] = b"hello";
1881
1882        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
1883
1884        let mut object_ids = Vec::new();
1885
1886        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1887        {
1888            let root_store = fs.root_store();
1889            let root_directory =
1890                Directory::open(&root_store, root_store.root_directory_object_id())
1891                    .await
1892                    .expect("open failed");
1893            let mut transaction = fs
1894                .clone()
1895                .new_transaction(
1896                    lock_keys![LockKey::object(
1897                        root_store.store_object_id(),
1898                        root_store.root_directory_object_id(),
1899                    )],
1900                    Options::default(),
1901                )
1902                .await
1903                .expect("new_transaction failed");
1904            let handle = root_directory
1905                .create_child_file(&mut transaction, "test")
1906                .await
1907                .expect("create_child_file failed");
1908            transaction.commit().await.expect("commit failed");
1909            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1910            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1911            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1912            fs.sync(SyncOptions::default()).await.expect("sync failed");
1913            object_ids.push(handle.object_id());
1914
1915            // Create a lot of objects but don't sync at the end. This should leave the filesystem
1916            // with a half-finished transaction that cannot be replayed.
1917            for i in 0..1000 {
1918                let mut transaction = fs
1919                    .clone()
1920                    .new_transaction(
1921                        lock_keys![LockKey::object(
1922                            root_store.store_object_id(),
1923                            root_store.root_directory_object_id(),
1924                        )],
1925                        Options::default(),
1926                    )
1927                    .await
1928                    .expect("new_transaction failed");
1929                let handle = root_directory
1930                    .create_child_file(&mut transaction, &format!("{}", i))
1931                    .await
1932                    .expect("create_child_file failed");
1933                transaction.commit().await.expect("commit failed");
1934                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1935                buf.as_mut_slice().copy_from_slice(TEST_DATA);
1936                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1937                object_ids.push(handle.object_id());
1938            }
1939        }
1940        fs.close().await.expect("fs close failed");
1941        let device = fs.take_device().await;
1942        device.reopen(false);
1943        let fs = FxFilesystem::open(device).await.expect("open failed");
1944        fsck(fs.clone()).await.expect("fsck failed");
1945        {
1946            let root_store = fs.root_store();
1947            // Check the first object, which should exist.
1948            for &object_id in &object_ids[0..1] {
1949                let handle = ObjectStore::open_object(
1950                    &root_store,
1951                    object_id,
1952                    HandleOptions::default(),
1953                    None,
1954                )
1955                .await
1956                .expect("open_object failed");
1957                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1958                assert_eq!(
1959                    handle.read(0, buf.as_mut()).await.expect("read failed"),
1960                    TEST_DATA.len()
1961                );
1962                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1963            }
1964
1965            // Write one more object and sync.
1966            let root_directory =
1967                Directory::open(&root_store, root_store.root_directory_object_id())
1968                    .await
1969                    .expect("open failed");
1970            let mut transaction = fs
1971                .clone()
1972                .new_transaction(
1973                    lock_keys![LockKey::object(
1974                        root_store.store_object_id(),
1975                        root_store.root_directory_object_id(),
1976                    )],
1977                    Options::default(),
1978                )
1979                .await
1980                .expect("new_transaction failed");
1981            let handle = root_directory
1982                .create_child_file(&mut transaction, "test2")
1983                .await
1984                .expect("create_child_file failed");
1985            transaction.commit().await.expect("commit failed");
1986            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1987            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1988            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1989            fs.sync(SyncOptions::default()).await.expect("sync failed");
1990            object_ids.push(handle.object_id());
1991        }
1992
1993        fs.close().await.expect("close failed");
1994        let device = fs.take_device().await;
1995        device.reopen(false);
1996        let fs = FxFilesystem::open(device).await.expect("open failed");
1997        {
1998            fsck(fs.clone()).await.expect("fsck failed");
1999
2000            // Check the first and the last objects.
2001            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2002                let handle = ObjectStore::open_object(
2003                    &fs.root_store(),
2004                    object_id,
2005                    HandleOptions::default(),
2006                    None,
2007                )
2008                .await
2009                .unwrap_or_else(|e| {
2010                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2011                });
2012                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2013                assert_eq!(
2014                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2015                    TEST_DATA.len()
2016                );
2017                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2018            }
2019        }
2020        fs.close().await.expect("close failed");
2021    }
2022
2023    #[fuchsia::test]
2024    async fn test_discard() {
2025        let device = {
2026            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2027            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2028            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2029
2030            let store =
2031                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
2032            let root_directory = Directory::open(&store, store.root_directory_object_id())
2033                .await
2034                .expect("open failed");
2035
2036            // Create enough data so that another journal extent is used.
2037            let mut i = 0;
2038            loop {
2039                let mut transaction = fs
2040                    .clone()
2041                    .new_transaction(
2042                        lock_keys![LockKey::object(
2043                            store.store_object_id(),
2044                            store.root_directory_object_id()
2045                        )],
2046                        Options::default(),
2047                    )
2048                    .await
2049                    .expect("new_transaction failed");
2050                root_directory
2051                    .create_child_file(&mut transaction, &format!("a {i}"))
2052                    .await
2053                    .expect("create_child_file failed");
2054                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2055                    break;
2056                }
2057                i += 1;
2058            }
2059
2060            // Compact and then disable compactions.
2061            fs.journal().compact().await.expect("compact failed");
2062            fs.journal().stop_compactions().await;
2063
2064            // Keep going until we need another journal extent.
2065            let mut i = 0;
2066            loop {
2067                let mut transaction = fs
2068                    .clone()
2069                    .new_transaction(
2070                        lock_keys![LockKey::object(
2071                            store.store_object_id(),
2072                            store.root_directory_object_id()
2073                        )],
2074                        Options::default(),
2075                    )
2076                    .await
2077                    .expect("new_transaction failed");
2078                root_directory
2079                    .create_child_file(&mut transaction, &format!("b {i}"))
2080                    .await
2081                    .expect("create_child_file failed");
2082                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2083                    break;
2084                }
2085                i += 1;
2086            }
2087
2088            // Allow the journal to flush, but we don't want to sync.
2089            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2090            // Because we're not gracefully closing the filesystem, a Discard record will be
2091            // emitted.
2092            fs.device().snapshot().expect("snapshot failed")
2093        };
2094
2095        let fs = FxFilesystem::open(device).await.expect("open failed");
2096
2097        {
2098            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2099
2100            let store = root_volume.volume("test", NO_OWNER, None).await.expect("volume failed");
2101
2102            let root_directory = Directory::open(&store, store.root_directory_object_id())
2103                .await
2104                .expect("open failed");
2105
2106            // Write one more transaction.
2107            let mut transaction = fs
2108                .clone()
2109                .new_transaction(
2110                    lock_keys![LockKey::object(
2111                        store.store_object_id(),
2112                        store.root_directory_object_id()
2113                    )],
2114                    Options::default(),
2115                )
2116                .await
2117                .expect("new_transaction failed");
2118            root_directory
2119                .create_child_file(&mut transaction, "d")
2120                .await
2121                .expect("create_child_file failed");
2122            transaction.commit().await.expect("commit failed");
2123        }
2124
2125        fs.close().await.expect("close failed");
2126        let device = fs.take_device().await;
2127        device.reopen(false);
2128
2129        let fs = FxFilesystem::open(device).await.expect("open failed");
2130        fsck(fs.clone()).await.expect("fsck failed");
2131        fs.close().await.expect("close failed");
2132    }
2133}
2134
2135#[cfg(fuzz)]
2136mod fuzz {
2137    use fuzz::fuzz;
2138
2139    #[fuzz]
2140    fn fuzz_journal_bytes(input: Vec<u8>) {
2141        use crate::filesystem::FxFilesystem;
2142        use fuchsia_async as fasync;
2143        use std::io::Write;
2144        use storage_device::fake_device::FakeDevice;
2145        use storage_device::DeviceHolder;
2146
2147        fasync::SendExecutor::new(4).run(async move {
2148            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2149            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2150            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2151            fs.close().await.expect("close failed");
2152            let device = fs.take_device().await;
2153            device.reopen(false);
2154            if let Ok(fs) = FxFilesystem::open(device).await {
2155                // `close()` can fail if there were objects to be tombstoned. If such an object is
2156                // corrupted, there will be an error when we compact the journal.
2157                let _ = fs.close().await;
2158            }
2159        });
2160    }
2161
2162    #[fuzz]
2163    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2164        use crate::filesystem::FxFilesystem;
2165        use fuchsia_async as fasync;
2166        use storage_device::fake_device::FakeDevice;
2167        use storage_device::DeviceHolder;
2168
2169        fasync::SendExecutor::new(4).run(async move {
2170            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2171            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2172            {
2173                let mut inner = fs.journal().inner.lock();
2174                for record in &input {
2175                    let _ = inner.writer.write_record(record);
2176                }
2177            }
2178            fs.close().await.expect("close failed");
2179            let device = fs.take_device().await;
2180            device.reopen(false);
2181            if let Ok(fs) = FxFilesystem::open(device).await {
2182                // `close()` can fail if there were objects to be tombstoned. If such an object is
2183                // corrupted, there will be an error when we compact the journal.
2184                let _ = fs.close().await;
2185            }
2186        });
2187    }
2188}