1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::debug_assert_not_too_long;
27use crate::errors::FxfsError;
28use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
29use crate::log::*;
30use crate::lsm_tree::cache::NullCache;
31use crate::lsm_tree::types::Layer;
32use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
33use crate::object_store::allocator::Allocator;
34use crate::object_store::extent_record::{
35 ExtentKey, ExtentMode, ExtentValue, DEFAULT_DATA_ATTRIBUTE_ID,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 lock_keys, AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43,
49 MutationV46, ObjectStoreMutation, Options, Transaction, TxnMutation,
50 TRANSACTION_MAX_JOURNAL_USAGE,
51};
52use crate::object_store::{
53 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, Item, ItemRef, NewChildStoreOptions,
54 ObjectStore, INVALID_OBJECT_ID,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, LATEST_VERSION};
59use anyhow::{anyhow, bail, ensure, Context, Error};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_sync::Mutex;
63use futures::future::poll_fn;
64use futures::FutureExt as _;
65use once_cell::sync::OnceCell;
66use rand::Rng;
67use rustc_hash::FxHashMap as HashMap;
68use serde::{Deserialize, Serialize};
69use static_assertions::const_assert;
70use std::clone::Clone;
71use std::collections::HashSet;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::Arc;
75use std::task::{Poll, Waker};
76
/// Block size of the journal: passed to `JournalWriter::new` and used to round the replay
/// checkpoint offset down when locating the journal's first extent.
pub const BLOCK_SIZE: u64 = 4096;

// NOTE(review): not referenced in the visible part of this file (`chunk_size()` is defined
// elsewhere); presumably the default granularity of journal preallocation — confirm.
const CHUNK_SIZE: u64 = 131_072;
// A chunk must be larger than the maximum journal space a single transaction may use.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// Default value of `JournalOptions::reclaim_size`.
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

/// NOTE(review): not referenced in the visible part of this file; presumably an amount of
/// journal/metadata space held in reserve — confirm against its users.
pub const RESERVED_SPACE: u64 = 1_048_576;

/// XORed into the checkpoint checksum when `replay` re-seeds the journal writer at the end of
/// the replayed journal.
const RESET_XOR: u64 = 0xffffffffffffffff;
/// The journal checkpoint type currently in use.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position in the journal from which a `JournalReader` can start reading or a
/// `JournalWriter` can resume writing.
///
/// This type is serialized; serde's derive emits fields in declaration order, so do not
/// reorder them.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// Checksum state associated with `file_offset`. `replay` XORs it with `RESET_XOR` before
    /// handing the checkpoint to the writer.
    pub checksum: Checksum,

    /// Serialization version associated with the checkpoint. Set to `LATEST_VERSION` for
    /// checkpoints created by this code; presumably the version of the records that follow —
    /// confirm.
    pub version: Version,
}
115
/// The journal record type currently in use.
pub type JournalRecord = JournalRecordV46;

/// A single record in the journal (latest on-disk version).
///
/// Variant order and payloads are part of the serialized format.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV46 {
    /// On read, the reader skips to the end of the current journal block.
    EndBlock,
    /// A mutation for `object_id`, belonging to the transaction currently being read.
    Mutation { object_id: u64, mutation: MutationV46 },
    /// Ends (commits) the transaction currently being read.
    Commit,
    /// Discards every transaction starting at or after the given (non-zero) journal offset.
    Discard(u64),
    /// Records that the device has been flushed up to the given journal offset.
    DidFlushDevice(u64),
    /// Checksums for data written to a device range, belonging to the current transaction.
    /// The flag becomes `JournaledChecksums::first_write`.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
150
/// Legacy (V43) on-disk journal record; migrated to `JournalRecordV46` by the `Migrate` derive.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
162
/// Legacy (V42) on-disk journal record; migrated to `JournalRecordV43` by the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
173
/// Legacy (V41) on-disk journal record. Unlike later versions, `DataChecksums` carries no
/// `first_write` flag. It is converted to `JournalRecordV42` by the `From` impl below.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
183
184impl From<JournalRecordV41> for JournalRecordV42 {
185 fn from(record: JournalRecordV41) -> Self {
186 match record {
187 JournalRecordV41::EndBlock => Self::EndBlock,
188 JournalRecordV41::Mutation { object_id, mutation } => {
189 Self::Mutation { object_id, mutation: mutation.into() }
190 }
191 JournalRecordV41::Commit => Self::Commit,
192 JournalRecordV41::Discard(offset) => Self::Discard(offset),
193 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
194 JournalRecordV41::DataChecksums(range, sums) => {
195 Self::DataChecksums(range, sums, true)
198 }
199 }
200 }
201}
202
/// Legacy (V40) on-disk journal record; migrated to `JournalRecordV41` by the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
213
214pub(super) fn journal_handle_options() -> HandleOptions {
215 HandleOptions { skip_journal_checks: true, ..Default::default() }
216}
217
/// The filesystem journal. Transactions are serialized into the journal object by `commit` and
/// read back and applied by `replay`.
pub struct Journal {
    /// Object manager that journaled mutations are applied to.
    objects: Arc<ObjectManager>,
    /// Handle to the journal object; set once by `init_empty` or `replay`.
    handle: OnceCell<DataObjectHandle<ObjectStore>>,
    /// Used by `replay` to load the super-block and root parent store.
    super_block_manager: SuperBlockManager,
    /// Mutable journal state.
    inner: Mutex<Inner>,
    // NOTE(review): not used in the visible part of this file; presumably serializes journal
    // writers — confirm.
    writer_mutex: Mutex<()>,
    // NOTE(review): not used in the visible part of this file; presumably serializes syncs —
    // confirm.
    sync_mutex: futures::lock::Mutex<()>,
    /// Whether tracing is enabled; toggled by `set_trace`.
    trace: AtomicBool,

    // NOTE(review): not used in the visible part of this file; presumably signalled when
    // journal space is reclaimed — confirm.
    reclaim_event: Event,
}
235
/// Mutable state of the journal, protected by `Journal::inner`.
struct Inner {
    /// Header of the current super-block, set by `replay` and `init_empty`. `read_transactions`
    /// reads store object IDs and the super-block's journal offset from it.
    super_block_header: SuperBlockHeader,

    /// If set, `pre_commit` zeroes the journal object's range `0..zero_offset` in a separate
    /// transaction.
    zero_offset: Option<u64>,

    /// Journal offset up to which the device is known to have been flushed. `replay` sets it to
    /// the offset at which the writer resumes.
    device_flushed_offset: u64,

    /// When set, `pre_commit` writes a `DidFlushDevice(device_flushed_offset)` record and clears
    /// the flag.
    needs_did_flush_device: bool,

    /// Serializes journal records.
    writer: JournalWriter,

    /// When set, `pre_commit` writes `LATEST_VERSION` into the stream and clears the flag. Set
    /// at the end of `replay`.
    output_reset_version: bool,

    /// Woken (and cleared) by `Inner::terminate`.
    flush_waker: Option<Waker>,

    /// Set once the journal has been terminated.
    terminate: bool,

    /// The first error passed to `Inner::terminate`, if any.
    terminate_reason: Option<Error>,

    // NOTE(review): not used in the visible part of this file — confirm semantics.
    disable_compactions: bool,

    // NOTE(review): not used in the visible part of this file — confirm semantics.
    compaction_running: bool,

    /// Woken (and cleared) by `Inner::terminate`.
    sync_waker: Option<Waker>,

    /// Journal offset that has been flushed. `replay` sets it to the offset at which the writer
    /// resumes.
    flushed_offset: u64,

    /// Offset of the last valid checkpoint found by `replay`.
    /// `read_transactions_for_object` reads no further than this.
    valid_to: u64,

    /// If set, `pre_commit` writes a `Discard` record for this offset and clears it. `replay`
    /// sets it when the last valid checkpoint lies before the writer's resume offset.
    discard_offset: Option<u64>,

    /// Taken from `JournalOptions::reclaim_size`.
    /// NOTE(review): its use is not visible in this chunk — confirm semantics.
    reclaim_size: u64,

    /// When set, `init_empty` does not preallocate the journal's first chunk.
    image_builder_mode: bool,
}
294
295impl Inner {
296 fn terminate(&mut self, reason: Option<Error>) {
297 self.terminate = true;
298
299 if let Some(err) = reason {
300 error!(error:? = err; "Terminating journal");
301 if let Some(prev_err) = self.terminate_reason.as_ref() {
303 error!(error:? = prev_err; "Journal previously terminated");
304 } else {
305 self.terminate_reason = Some(err);
306 }
307 }
308
309 if let Some(waker) = self.flush_waker.take() {
310 waker.wake();
311 }
312 if let Some(waker) = self.sync_waker.take() {
313 waker.wake();
314 }
315 }
316}
317
/// Options for `Journal::new`.
pub struct JournalOptions {
    /// Copied into the journal's internal state (defaults to `DEFAULT_RECLAIM_SIZE`).
    /// NOTE(review): how it is used is not visible in this chunk — confirm semantics.
    pub reclaim_size: u64,
}
324
325impl Default for JournalOptions {
326 fn default() -> Self {
327 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE }
328 }
329}
330
/// Result of `Journal::read_transactions`.
struct JournaledTransactions {
    /// Committed transactions, in journal order.
    transactions: Vec<JournaledTransaction>,
    /// Largest device-flushed journal offset seen while reading.
    device_flushed_offset: u64,
}
335
/// A committed transaction read back from the journal.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Checkpoint taken just before the transaction's first record.
    pub checkpoint: JournalCheckpoint,
    /// Mutations for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations for the root store.
    pub root_mutations: Vec<Mutation>,
    /// Mutations for all other objects, paired with their object ID.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal offset just after the transaction's `Commit` record.
    pub end_offset: u64,
    /// Data checksums recorded within the transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// `(object_id, offset)` recorded when the transaction contains an `EndFlush` (offset is
    /// one past the journal offset of the transaction holding the matching `BeginFlush`) or a
    /// `DeleteVolume` (offset is `STORE_DELETED`). At most one per transaction.
    pub end_flush: Option<(u64, u64)>,
}
351
352impl JournaledTransaction {
353 fn new(checkpoint: JournalCheckpoint) -> Self {
354 Self { checkpoint, ..Default::default() }
355 }
356}
357
/// Stored in `JournaledTransaction::end_flush`, in place of a flush offset, when the transaction
/// contains a `DeleteVolume` mutation.
const STORE_DELETED: u64 = u64::MAX;
359
/// Data checksums read back from a `DataChecksums` journal record.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device range the checksums cover.
    pub device_range: Range<u64>,
    /// The checksums themselves. `replay` rejects them as malformed if `maybe_as_ref` fails.
    pub checksums: Checksums,
    /// The record's `first_write` flag, passed through to `ChecksumList::push`.
    pub first_write: bool,
}
366
/// A readable handle to the journal object, as used by `JournalReader`.
///
/// During replay, implementors may also track the journal's extents as they are discovered
/// (see `BootstrapObjectHandle`). Handles that don't track extents return `None` from
/// `end_offset` and ignore the other calls.
pub trait JournalHandle: ReadObjectHandle {
    /// End of the extents pushed so far, or `None` if this handle doesn't track extents.
    /// Callers check that a newly pushed extent starts exactly at this offset.
    fn end_offset(&self) -> Option<u64>;
    /// Appends the extent at `device_range`. `added_offset` is the journal offset of the
    /// transaction that added it (0 for extents found in the root parent store before replay).
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Called when a `Discard(discard_offset)` record is read.
    /// NOTE(review): presumably drops extents added at or after `discard_offset` — confirm
    /// against implementations.
    fn discard_extents(&mut self, discard_offset: u64);
}
381
382impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
386 fn end_offset(&self) -> Option<u64> {
387 None
388 }
389 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
390 }
392 fn discard_extents(&mut self, _discard_offset: u64) {
393 }
395}
396
397#[fxfs_trace::trace]
398impl Journal {
399 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
400 let starting_checksum = rand::thread_rng().gen_range(1..u64::MAX);
401 Journal {
402 objects: objects,
403 handle: OnceCell::new(),
404 super_block_manager: SuperBlockManager::new(),
405 inner: Mutex::new(Inner {
406 super_block_header: SuperBlockHeader::default(),
407 zero_offset: None,
408 device_flushed_offset: 0,
409 needs_did_flush_device: false,
410 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
411 output_reset_version: false,
412 flush_waker: None,
413 terminate: false,
414 terminate_reason: None,
415 disable_compactions: false,
416 compaction_running: false,
417 sync_waker: None,
418 flushed_offset: 0,
419 valid_to: 0,
420 discard_offset: None,
421 reclaim_size: options.reclaim_size,
422 image_builder_mode: false,
423 }),
424 writer_mutex: Mutex::new(()),
425 sync_mutex: futures::lock::Mutex::new(()),
426 trace: AtomicBool::new(false),
427 reclaim_event: Event::new(),
428 }
429 }
430
431 pub fn set_trace(&self, trace: bool) {
432 let old_value = self.trace.swap(trace, Ordering::Relaxed);
433 if trace != old_value {
434 info!(trace; "J: trace");
435 }
436 }
437
438 pub fn set_image_builder_mode(&self, enabled: bool) {
439 self.inner.lock().image_builder_mode = enabled;
440 }
441
442 pub fn image_builder_mode(&self) -> bool {
443 self.inner.lock().image_builder_mode
444 }
445
/// Performs basic sanity checks on a mutation read back from the journal during replay.
///
/// Returns `false` if the mutation is obviously malformed for a device of `device_size` bytes
/// with blocks of `block_size` bytes:
/// - extent mutations need a non-empty, `block_size`-aligned range and a `block_size`-aligned
///   device offset, and must fit on the device. For copy-on-write extents that carry
///   checksums, the range must divide evenly into one whole-block chunk per checksum.
/// - allocator `Allocate`/`Deallocate` need a non-empty range ending within the device and a
///   valid owner.
/// - allocator `MarkForDeletion`/`SetLimit` and `CreateInternalDir` need a valid owner.
///
/// Every other mutation is accepted.
fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
    match mutation {
        Mutation::ObjectStore(ObjectStoreMutation {
            item:
                Item {
                    key:
                        ObjectKey {
                            data:
                                ObjectKeyData::Attribute(
                                    _,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                            ..
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                    ..
                },
            ..
        }) => {
            if range.is_empty() || !range.is_aligned(block_size) {
                return false;
            }
            // `range` was just checked to be non-empty, so `length()` is expected to succeed.
            let len = range.length().unwrap();
            if let ExtentMode::Cow(checksums) = mode {
                if checksums.len() > 0 {
                    // Each checksum must cover the same whole number of blocks.
                    if len % checksums.len() as u64 != 0 {
                        return false;
                    }
                    if (len / checksums.len() as u64) % block_size != 0 {
                        return false;
                    }
                }
            }
            // Bounds check written as a subtraction (after `device_offset < device_size` is
            // established) so it cannot overflow.
            if *device_offset % block_size != 0
                || *device_offset >= device_size
                || device_size - *device_offset < len
            {
                return false;
            }
        }
        Mutation::ObjectStore(_) => {}
        Mutation::EncryptedObjectStore(_) => {}
        Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
            return !device_range.is_empty()
                && *owner_object_id != INVALID_OBJECT_ID
                && device_range.end <= device_size;
        }
        Mutation::Allocator(AllocatorMutation::Deallocate {
            device_range,
            owner_object_id,
        }) => {
            return !device_range.is_empty()
                && *owner_object_id != INVALID_OBJECT_ID
                && device_range.end <= device_size;
        }
        Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
            return *owner_object_id != INVALID_OBJECT_ID;
        }
        Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
            return *owner_object_id != INVALID_OBJECT_ID;
        }
        Mutation::BeginFlush => {}
        Mutation::EndFlush => {}
        Mutation::DeleteVolume => {}
        Mutation::UpdateBorrowed(_) => {}
        Mutation::UpdateMutationsKey(_) => {}
        Mutation::CreateInternalDir(owner_object_id) => {
            return *owner_object_id != INVALID_OBJECT_ID;
        }
    }
    true
}
521
522 fn update_checksum_list(
524 &self,
525 journal_offset: u64,
526 mutation: &Mutation,
527 checksum_list: &mut ChecksumList,
528 ) -> Result<(), Error> {
529 match mutation {
530 Mutation::ObjectStore(_) => {}
531 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
532 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
533 }
534 _ => {}
535 }
536 Ok(())
537 }
538
#[trace]
/// Loads the latest super-block and replays the journal on top of it.
///
/// Steps:
/// 1. Load the super-block and root parent store, and install the root parent store and a new
///    allocator in the object manager. `on_new_allocator`, if given, is called with the new
///    allocator.
/// 2. Build a bootstrap handle for the journal from the journal extents found in the root
///    parent store, and read all committed transactions from the super-block's checkpoint.
/// 3. Validate every mutation (`validate_mutation`) and the journaled data checksums. Replay
///    stops before the first invalid transaction.
/// 4. Apply mutations to the root parent store, then open and replay the root store, then open
///    the allocator and replay all other stores.
/// 5. Open the real journal handle and position the writer just past the replayed data,
///    scheduling a version reset and, if needed, a `Discard` record for the next commit.
///
/// # Errors
/// Returns an error if any load, read, open or apply step fails, if the journal's extents are
/// not contiguous, if the root store is encrypted, or if the valid part of the journal ends
/// before the offset at which the super-block was written.
pub async fn replay(
    &self,
    filesystem: Arc<FxFilesystem>,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
) -> Result<(), Error> {
    let block_size = filesystem.block_size();

    let (super_block, root_parent) =
        self.super_block_manager.load(filesystem.device(), block_size).await?;

    let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

    self.objects.set_root_parent_store(root_parent.clone());
    let allocator =
        Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
    if let Some(on_new_allocator) = on_new_allocator {
        on_new_allocator(allocator.clone());
    }
    self.objects.set_allocator(allocator.clone());
    self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
    self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
    {
        let mut inner = self.inner.lock();
        inner.super_block_header = super_block.clone();
    }

    let device = filesystem.device();

    // Build a bootstrap handle for the journal object. Its extents are read from the root
    // parent store's mutable layer, starting with the extent that contains the (block-aligned)
    // checkpoint offset.
    let mut handle;
    {
        let root_parent_layer = root_parent.tree().mutable_layer();
        let mut iter = root_parent_layer
            .seek(Bound::Included(&ObjectKey::attribute(
                super_block.journal_object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                    super_block.journal_checkpoint.file_offset,
                    BLOCK_SIZE,
                ))),
            )))
            .await
            .context("Failed to seek root parent store")?;
        // NOTE(review): this pattern does not check the object ID, so if no journal extent
        // follows the seek key, `start_offset` comes from another object's extent. The loop
        // below would then push no extents. Confirm this is benign.
        let start_offset = if let Some(ItemRef {
            key:
                ObjectKey {
                    data:
                        ObjectKeyData::Attribute(
                            DEFAULT_DATA_ATTRIBUTE_ID,
                            AttributeKey::Extent(ExtentKey { range }),
                        ),
                    ..
                },
            ..
        }) = iter.get()
        {
            range.start
        } else {
            0
        };
        handle = BootstrapObjectHandle::new_with_start_offset(
            super_block.journal_object_id,
            device.clone(),
            start_offset,
        );
        // Push consecutive extents of the journal object. Stop at the first item that is not
        // one of its data extents. A gap between extents is an inconsistency.
        while let Some(item) = iter.get() {
            if !match item.into() {
                Some((
                    object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    ExtentKey { range },
                    ExtentValue::Some { device_offset, .. },
                )) if object_id == super_block.journal_object_id => {
                    if let Some(end_offset) = handle.end_offset() {
                        if range.start != end_offset {
                            bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                "Unexpected journal extent {:?}, expected start: {}",
                                item, end_offset
                            )));
                        }
                    }
                    // Extents already present before replay are recorded as added at offset 0.
                    handle.push_extent(
                        0,
                        *device_offset
                            ..*device_offset + range.length().context("Invalid extent")?,
                    );
                    true
                }
                _ => false,
            } {
                break;
            }
            iter.advance().await.context("Failed to advance root parent store iterator")?;
        }
    }

    let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
    let JournaledTransactions { mut transactions, device_flushed_offset } = self
        .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
        .await
        .context("Reading transactions for replay")?;

    // Validation pass: collect data checksums and deallocations, and stop at the first
    // transaction containing a malformed mutation.
    let mut checksum_list = ChecksumList::new(device_flushed_offset);
    let mut valid_to = reader.journal_file_checkpoint().file_offset;
    let device_size = device.size();
    'bad_replay: for JournaledTransaction {
        checkpoint,
        root_parent_mutations,
        root_mutations,
        non_root_mutations,
        checksums,
        ..
    } in &transactions
    {
        for JournaledChecksums { device_range, checksums, first_write } in checksums {
            checksum_list
                .push(
                    checkpoint.file_offset,
                    device_range.clone(),
                    checksums.maybe_as_ref().context("Malformed checksums")?,
                    *first_write,
                )
                .context("Pushing journal checksum records to checksum list")?;
        }
        for mutation in root_parent_mutations
            .iter()
            .chain(root_mutations)
            .chain(non_root_mutations.iter().map(|(_, m)| m))
        {
            if !self.validate_mutation(mutation, block_size, device_size) {
                info!(mutation:?; "Stopping replay at bad mutation");
                valid_to = checkpoint.file_offset;
                break 'bad_replay;
            }
            self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
        }
    }

    // Verify journaled data checksums against the device. The result is the offset up to which
    // transactions are replayed.
    let valid_to = checksum_list
        .verify(device.as_ref(), valid_to)
        .await
        .context("Failed to validate checksums")?;

    let mut last_checkpoint = reader.journal_file_checkpoint();
    let mut journal_offsets = super_block.journal_file_offsets.clone();

    // Root parent store pass. This also drops every transaction at or beyond `valid_to` and
    // records flush offsets (or `STORE_DELETED`) from `end_flush`.
    for (index, JournaledTransaction { checkpoint, root_parent_mutations, end_flush, .. }) in
        transactions.iter_mut().enumerate()
    {
        if checkpoint.file_offset >= valid_to {
            last_checkpoint = checkpoint.clone();

            transactions.truncate(index);
            break;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_parent_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_parent_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root parent store mutations")?;
        }

        if let Some((object_id, journal_offset)) = end_flush {
            journal_offsets.insert(*object_id, *journal_offset);
        }
    }

    let root_store = ObjectStore::open(
        &root_parent,
        super_block.root_store_object_id,
        Box::new(NullCache {}),
    )
    .await
    .context("Unable to open root store")?;

    ensure!(
        !root_store.is_encrypted(),
        anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
    );
    self.objects.set_root_store(root_store);

    let root_store_offset =
        journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

    // Root store pass: skip transactions before the root store's recorded flush offset.
    for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
        if checkpoint.file_offset < root_store_offset {
            continue;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root store mutations")?;
        }
    }

    allocator.open().await.context("Failed to open allocator")?;

    // Remaining stores; the object manager is given `journal_offsets` to decide what to skip.
    for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
    {
        self.objects
            .replay_mutations(
                non_root_mutations,
                &journal_offsets,
                &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                end_offset,
            )
            .await
            .context("Failed to replay mutations")?;
    }

    allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

    // Only reported in the log: where reading stopped, if that differs from the last valid
    // checkpoint.
    let discarded_to =
        if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
            Some(reader.journal_file_checkpoint().file_offset)
        } else {
            None
        };

    // Switch to the real journal handle and position the writer after the replayed data.
    {
        if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
            return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                "journal replay cut short; journal finishes at {}, but super-block was \
                 written at {}",
                last_checkpoint.file_offset, super_block.super_block_journal_file_offset
            )));
        }
        let handle = ObjectStore::open_object(
            &root_parent,
            super_block.journal_object_id,
            journal_handle_options(),
            None,
        )
        .await
        .with_context(|| {
            format!(
                "Failed to open journal file (object id: {})",
                super_block.journal_object_id
            )
        })?;
        let _ = self.handle.set(handle);
        let mut inner = self.inner.lock();
        reader.skip_to_end_of_block();
        let mut writer_checkpoint = reader.journal_file_checkpoint();

        std::mem::drop(reader);

        // The writer resumes at the next block boundary with a perturbed checksum and the
        // latest version. `output_reset_version` makes the next commit emit the version first.
        writer_checkpoint.checksum ^= RESET_XOR;
        writer_checkpoint.version = LATEST_VERSION;
        inner.flushed_offset = writer_checkpoint.file_offset;

        inner.device_flushed_offset = inner.flushed_offset;

        inner.writer.seek(writer_checkpoint);
        inner.output_reset_version = true;
        inner.valid_to = last_checkpoint.file_offset;
        // Anything between the last valid checkpoint and the resume point gets discarded by a
        // `Discard` record on the next commit.
        if last_checkpoint.file_offset < inner.flushed_offset {
            inner.discard_offset = Some(last_checkpoint.file_offset);
        }
    }

    self.objects
        .on_replay_complete()
        .await
        .context("Failed to complete replay for object manager")?;

    info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
    Ok(())
}
837
838 async fn read_transactions(
839 &self,
840 reader: &mut JournalReader,
841 end_offset: Option<u64>,
842 object_id_filter: u64,
843 ) -> Result<JournaledTransactions, Error> {
844 let mut transactions = Vec::new();
845 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
846 let super_block = &self.inner.lock().super_block_header;
847 (
848 super_block.super_block_journal_file_offset,
849 super_block.root_parent_store_object_id,
850 super_block.root_store_object_id,
851 )
852 };
853 let mut current_transaction = None;
854 let mut begin_flush_offsets = HashMap::default();
855 let mut stores_deleted = HashSet::new();
856 loop {
857 let checkpoint = reader.journal_file_checkpoint();
859 if let Some(end_offset) = end_offset {
860 if checkpoint.file_offset >= end_offset {
861 break;
862 }
863 }
864 let result =
865 reader.deserialize().await.context("Failed to deserialize journal record")?;
866 match result {
867 ReadResult::Reset(_) => {
868 if current_transaction.is_some() {
869 current_transaction = None;
870 transactions.pop();
871 }
872 let offset = reader.journal_file_checkpoint().file_offset;
873 if offset > device_flushed_offset {
874 device_flushed_offset = offset;
875 }
876 }
877 ReadResult::Some(record) => {
878 match record {
879 JournalRecord::EndBlock => {
880 reader.skip_to_end_of_block();
881 }
882 JournalRecord::Mutation { object_id, mutation } => {
883 let current_transaction = match current_transaction.as_mut() {
884 None => {
885 transactions.push(JournaledTransaction::new(checkpoint));
886 current_transaction = transactions.last_mut();
887 current_transaction.as_mut().unwrap()
888 }
889 Some(transaction) => transaction,
890 };
891
892 if stores_deleted.contains(&object_id) {
893 bail!(anyhow!(FxfsError::Inconsistent)
894 .context("Encountered mutations for deleted store"));
895 }
896
897 match &mutation {
898 Mutation::BeginFlush => {
899 begin_flush_offsets.insert(
900 object_id,
901 current_transaction.checkpoint.file_offset,
902 );
903 }
904 Mutation::EndFlush => {
905 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
906 if current_transaction
909 .end_flush
910 .replace((object_id, offset + 1))
911 .is_some()
912 {
913 bail!(anyhow!(FxfsError::Inconsistent).context(
914 "Multiple EndFlush/DeleteVolume mutations in a \
915 single transaction"
916 ));
917 }
918 }
919 }
920 Mutation::DeleteVolume => {
921 if current_transaction
922 .end_flush
923 .replace((object_id, STORE_DELETED))
924 .is_some()
925 {
926 bail!(anyhow!(FxfsError::Inconsistent).context(
927 "Multiple EndFlush/DeleteVolume mutations in a single \
928 transaction"
929 ));
930 }
931 stores_deleted.insert(object_id);
932 }
933 _ => {}
934 }
935
936 if (object_id_filter == INVALID_OBJECT_ID
939 || object_id_filter == object_id)
940 && self.should_apply(object_id, ¤t_transaction.checkpoint)
941 {
942 if object_id == root_parent_store_object_id {
943 current_transaction.root_parent_mutations.push(mutation);
944 } else if object_id == root_store_object_id {
945 current_transaction.root_mutations.push(mutation);
946 } else {
947 current_transaction
948 .non_root_mutations
949 .push((object_id, mutation));
950 }
951 }
952 }
953 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
954 let current_transaction = match current_transaction.as_mut() {
955 None => {
956 transactions.push(JournaledTransaction::new(checkpoint));
957 current_transaction = transactions.last_mut();
958 current_transaction.as_mut().unwrap()
959 }
960 Some(transaction) => transaction,
961 };
962 current_transaction.checksums.push(JournaledChecksums {
963 device_range,
964 checksums,
965 first_write,
966 });
967 }
968 JournalRecord::Commit => {
969 if let Some(JournaledTransaction {
970 ref checkpoint,
971 ref root_parent_mutations,
972 ref mut end_offset,
973 ..
974 }) = current_transaction.take()
975 {
976 for mutation in root_parent_mutations {
977 if let Mutation::ObjectStore(ObjectStoreMutation {
981 item:
982 Item {
983 key:
984 ObjectKey {
985 object_id,
986 data:
987 ObjectKeyData::Attribute(
988 DEFAULT_DATA_ATTRIBUTE_ID,
989 AttributeKey::Extent(ExtentKey {
990 range,
991 }),
992 ),
993 ..
994 },
995 value:
996 ObjectValue::Extent(ExtentValue::Some {
997 device_offset,
998 ..
999 }),
1000 ..
1001 },
1002 ..
1003 }) = mutation
1004 {
1005 let handle = reader.handle();
1008 if *object_id != handle.object_id() {
1009 continue;
1010 }
1011 if let Some(end_offset) = handle.end_offset() {
1012 if range.start != end_offset {
1013 bail!(anyhow!(FxfsError::Inconsistent).context(
1014 format!(
1015 "Unexpected journal extent {:?} -> {}, \
1016 expected start: {}",
1017 range, device_offset, end_offset,
1018 )
1019 ));
1020 }
1021 }
1022 handle.push_extent(
1023 checkpoint.file_offset,
1024 *device_offset
1025 ..*device_offset
1026 + range.length().context("Invalid extent")?,
1027 );
1028 }
1029 }
1030 *end_offset = reader.journal_file_checkpoint().file_offset;
1031 }
1032 }
1033 JournalRecord::Discard(offset) => {
1034 if offset == 0 {
1035 bail!(anyhow!(FxfsError::Inconsistent)
1036 .context("Invalid offset for Discard"));
1037 }
1038 if let Some(transaction) = current_transaction.as_ref() {
1039 if transaction.checkpoint.file_offset < offset {
1040 continue;
1042 }
1043 }
1044 current_transaction = None;
1045 while let Some(transaction) = transactions.last() {
1046 if transaction.checkpoint.file_offset < offset {
1047 break;
1048 }
1049 transactions.pop();
1050 }
1051 reader.handle().discard_extents(offset);
1052 }
1053 JournalRecord::DidFlushDevice(offset) => {
1054 if offset > device_flushed_offset {
1055 device_flushed_offset = offset;
1056 }
1057 }
1058 }
1059 }
1060 ReadResult::ChecksumMismatch => break,
1062 }
1063 }
1064
1065 if current_transaction.is_some() {
1067 transactions.pop();
1068 }
1069
1070 Ok(JournaledTransactions { transactions, device_flushed_offset })
1071 }
1072
/// Formats a new, empty filesystem's metadata.
///
/// Creates the root parent store and allocator, then in a single transaction creates: the root
/// store, the allocator's objects, super-block objects A and B (each extended by its first
/// extent), the journal object (with its first chunk preallocated unless image-builder mode is
/// on), the root store's own objects, and the root parent's graveyard. After commit it
/// initializes the in-memory super-block header, using the writer checkpoint captured before
/// the transaction, and sets the journal handle.
///
/// # Errors
/// Fails if any creation, preallocation or commit step fails, or if preallocation could not
/// cover the whole first chunk.
pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
    // Fixed object IDs for the initial metadata objects of a freshly formatted filesystem.
    const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
    const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
    const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

    info!(device_size = filesystem.device().size(); "Formatting");

    // Checkpoint for the super-block header: where the journal writer currently is.
    let checkpoint = JournalCheckpoint {
        version: LATEST_VERSION,
        ..self.inner.lock().writer.journal_file_checkpoint()
    };

    let root_parent = ObjectStore::new_empty(
        None,
        INIT_ROOT_PARENT_STORE_OBJECT_ID,
        filesystem.clone(),
        Box::new(NullCache {}),
    );
    self.objects.set_root_parent_store(root_parent.clone());

    let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
    self.objects.set_allocator(allocator.clone());
    self.objects.init_metadata_reservation()?;

    let journal_handle;
    let super_block_a_handle;
    let super_block_b_handle;
    let root_store;
    let mut transaction = filesystem
        .clone()
        .new_transaction(
            lock_keys![],
            Options { skip_journal_checks: true, ..Default::default() },
        )
        .await?;
    root_store = root_parent
        .new_child_store(
            &mut transaction,
            NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
            Box::new(NullCache {}),
        )
        .await
        .context("new_child_store")?;
    self.objects.set_root_store(root_store.clone());

    allocator.create(&mut transaction).await?;

    // The two super-block instances live in the root store at fixed object IDs.
    super_block_a_handle = ObjectStore::create_object_with_id(
        &root_store,
        &mut transaction,
        SuperBlockInstance::A.object_id(),
        HandleOptions::default(),
        None,
    )
    .await
    .context("create super block")?;
    root_store.update_last_object_id(SuperBlockInstance::A.object_id());
    super_block_a_handle
        .extend(&mut transaction, SuperBlockInstance::A.first_extent())
        .await
        .context("extend super block")?;
    super_block_b_handle = ObjectStore::create_object_with_id(
        &root_store,
        &mut transaction,
        SuperBlockInstance::B.object_id(),
        HandleOptions::default(),
        None,
    )
    .await
    .context("create super block")?;
    root_store.update_last_object_id(SuperBlockInstance::B.object_id());
    super_block_b_handle
        .extend(&mut transaction, SuperBlockInstance::B.first_extent())
        .await
        .context("extend super block")?;

    // The journal object lives in the root parent store.
    journal_handle = ObjectStore::create_object(
        &root_parent,
        &mut transaction,
        journal_handle_options(),
        None,
    )
    .await
    .context("create journal")?;
    if !self.inner.lock().image_builder_mode {
        let mut file_range = 0..self.chunk_size();
        journal_handle
            .preallocate_range(&mut transaction, &mut file_range)
            .await
            .context("preallocate journal")?;
        // `preallocate_range` advances `file_range.start` past what it allocated.
        if file_range.start < file_range.end {
            bail!("preallocate_range returned too little space");
        }
    }

    root_store.create(&mut transaction).await?;

    root_parent
        .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

    transaction.commit().await?;

    self.inner.lock().super_block_header = SuperBlockHeader::new(
        root_parent.store_object_id(),
        root_parent.graveyard_directory_object_id(),
        root_store.store_object_id(),
        allocator.object_id(),
        journal_handle.object_id(),
        checkpoint,
        LATEST_VERSION,
    );

    let _ = self.handle.set(journal_handle);
    Ok(())
}
1200
1201 pub async fn allocate_journal(&self) -> Result<(), Error> {
1204 let handle = self.handle.get().unwrap();
1205 let filesystem = handle.store().filesystem();
1206 let mut transaction = filesystem
1207 .clone()
1208 .new_transaction(
1209 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1210 Options { skip_journal_checks: true, ..Default::default() },
1211 )
1212 .await?;
1213 let mut file_range = 0..self.chunk_size();
1214 self.handle
1215 .get()
1216 .unwrap()
1217 .preallocate_range(&mut transaction, &mut file_range)
1218 .await
1219 .context("preallocate journal")?;
1220 if file_range.start < file_range.end {
1221 bail!("preallocate_range returned too little space");
1222 }
1223 transaction.commit().await?;
1224 Ok(())
1225 }
1226
1227 pub async fn init_superblocks(&self) -> Result<(), Error> {
1228 for _ in 0..2 {
1230 self.write_super_block().await?;
1231 }
1232 Ok(())
1233 }
1234
1235 pub async fn read_transactions_for_object(
1241 &self,
1242 object_id: u64,
1243 ) -> Result<Vec<JournaledTransaction>, Error> {
1244 let handle = self.handle.get().expect("No journal handle");
1245 let handle = ObjectStore::open_object(
1247 handle.owner(),
1248 handle.object_id(),
1249 journal_handle_options(),
1250 None,
1251 )
1252 .await?;
1253
1254 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1255 Some(checkpoint) => checkpoint,
1256 None => return Ok(vec![]),
1257 };
1258 let mut reader = JournalReader::new(handle, &checkpoint);
1259 let end_offset = self.inner.lock().valid_to;
1262 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1263 }
1264
1265 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1267 if transaction.is_empty() {
1268 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1269 }
1270
1271 self.pre_commit(transaction).await?;
1272 Ok(self.write_and_apply_mutations(transaction))
1273 }
1274
    /// Prepares the journal before a non-empty transaction's mutations are written.
    ///
    /// First, pending out-of-band records are serialized into the journal writer:
    /// * a version marker, if `output_reset_version` is set;
    /// * a `Discard` record, if `discard_offset` is set;
    /// * a `DidFlushDevice` record, if `needs_did_flush_device` is set.
    ///
    /// Then, if a journal handle exists, a separate internal transaction against the journal
    /// object is committed when any of the following hold:
    /// * the next `chunk_size()` bytes would run past the journal file's current size, so another
    ///   chunk is preallocated;
    /// * `zero_offset` is set, so the journal range `0..zero_offset` is zeroed;
    /// * `needs_borrow_for_journal` reports that metadata space must be borrowed.
    ///
    /// That internal transaction:
    /// * borrows metadata space;
    /// * uses the metadata reservation;
    /// * shares `transaction`'s guard;
    /// * is written and applied directly via `write_and_apply_mutations`.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        // Journal handle; assigned inside the locked scope below and used after the lock drops.
        let handle;

        // `size` is `Some(current journal file size)` when another chunk must be preallocated;
        // `zero_offset` is a snapshot of `inner.zero_offset`.
        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // `take` clears the flag, so the version is written at most once per request.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // Journal the offset the device was last flushed to (recorded by `flush_device`).
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            // Without a journal handle there is nothing to preallocate or zero.
            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            // Fast path: no preallocation, no zeroing and no metadata borrowing required.
            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        // Preallocate one chunk starting at the current end of the journal file.
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // Write the internal transaction directly (it does not go through `commit`, so this
        // function is not re-entered for it).
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The writer must still be short of the old file size after the internal transaction
        // is written.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear `zero_offset` if nobody (e.g. `write_super_block`) replaced it while the
        // lock was released.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1355
1356 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1360 let super_block_header = &self.inner.lock().super_block_header;
1361 let offset = super_block_header
1362 .journal_file_offsets
1363 .get(&object_id)
1364 .cloned()
1365 .unwrap_or(super_block_header.super_block_journal_file_offset);
1366 journal_file_checkpoint.file_offset >= offset
1367 }
1368
    /// Writes a new super-block.
    ///
    /// While holding `sync_mutex`, it does the following in order:
    /// * pads the journal to a block boundary and compacts the root-parent store, both under
    ///   `writer_mutex`;
    /// * waits until the journal and the device are flushed up to the padded checkpoint.
    ///
    /// It then builds a new header from the current one:
    /// * the generation is incremented;
    /// * `super_block_journal_file_offset` is set to the padded checkpoint;
    /// * `journal_checkpoint` is set to the earliest per-object checkpoint, or the padded
    ///   checkpoint if there is none;
    /// * the per-object journal offsets and the borrowed metadata space are refreshed.
    ///
    /// The header is saved via the super-block manager and then installed in memory.
    /// `zero_offset` is set to the start of the block holding the previous header's journal
    /// checkpoint, so a later `pre_commit` zeroes the journal before it.
    ///
    /// # Errors
    /// Returns `FxfsError::Inconsistent` if the generation would overflow. Otherwise propagates
    /// errors from padding, compaction, flushing or saving.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        // Result of `compact_root_parent`; passed to `SuperBlockManager::save` with the header.
        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Pad and compact atomically with respect to other journal writers.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            // Everything up to `checkpoint` must be durable before the super-block refers to it.
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation =
            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // Journal data before the old checkpoint's block is zeroed by a later `pre_commit`.
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1425
1426 pub async fn sync(
1433 &self,
1434 options: SyncOptions<'_>,
1435 ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1436 let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1437
1438 let (checkpoint, borrowed) = {
1439 if let Some(precondition) = options.precondition {
1440 if !precondition() {
1441 return Ok(None);
1442 }
1443 }
1444
1445 let _guard = self.writer_mutex.lock();
1448
1449 self.pad_to_block()?
1450 };
1451
1452 if options.flush_device {
1453 self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1454 }
1455
1456 Ok(Some((checkpoint, borrowed)))
1457 }
1458
1459 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1463 let mut inner = self.inner.lock();
1464 let checkpoint = inner.writer.journal_file_checkpoint();
1465 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1466 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1467 inner.writer.pad_to_block()?;
1468 if let Some(waker) = inner.flush_waker.take() {
1469 waker.wake();
1470 }
1471 }
1472 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1473 }
1474
    /// Waits until the journal has been written at least up to `checkpoint_offset`. Then, if the
    /// device has not already been flushed that far, flushes it.
    ///
    /// After a device flush:
    /// * `device_flushed_offset` is advanced;
    /// * a `DidFlushDevice` record is requested for the next `pre_commit`;
    /// * the allocator is notified via `did_flush_device`.
    ///
    /// # Errors
    /// Returns `FxfsError::JournalFlushError`, with the termination reason as context if there is
    /// one, when the journal is terminated before reaching `checkpoint_offset`. Propagates
    /// device flush errors.
    ///
    /// # Panics
    /// Panics if a device flush is needed but the journal handle has not been set.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        // Wait for the flush task to write the journal up to `checkpoint_offset`; `flush` wakes
        // `sync_waker` after each completed write.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        // Skip the device flush if a previous call already flushed at least this far.
        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            // Remember how far the device is flushed and ask `pre_commit` to journal it.
            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator the device has been flushed up to `checkpoint_offset`.
            self.objects.allocator().did_flush_device(checkpoint_offset).await;
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1527
1528 pub fn super_block_header(&self) -> SuperBlockHeader {
1530 self.inner.lock().super_block_header.clone()
1531 }
1532
    /// Waits until journal usage is below the reclaim threshold.
    ///
    /// Journal usage is `last_end_offset()` minus the super-block's journal checkpoint offset.
    /// Returns `Ok(())` once usage is below `reclaim_size`, or immediately in image-builder mode.
    /// Otherwise it waits on `reclaim_event` and re-checks. That event is notified when a
    /// compaction finishes, a flush fails, or the journal is terminated.
    ///
    /// # Errors
    /// Returns `FxfsError::JournalFlushError` in two cases:
    /// * the journal has been terminated;
    /// * usage is too high while compactions are disabled, so nothing would ever free space.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                // Usage since the super-block's checkpoint is under the reclaim limit.
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode {
                    break Ok(());
                }
                // No compaction will reclaim space, so fail instead of waiting forever.
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // The listener is created while `inner` is still locked. `flush_task` notifies
                // while holding that lock, so a notification cannot slip in unobserved. The
                // listener is presumably awaited by the macro after this block's lock is dropped.
                self.reclaim_event.listen()
            });
        }
    }
1566
    /// Size in bytes of each journal preallocation chunk (`CHUNK_SIZE`).
    ///
    /// `pre_commit` preallocates another chunk when the writer is within this many bytes of the
    /// end of the journal file.
    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }
1570
    /// Writes `transaction`'s mutations to the journal, applies them, and appends the trailing
    /// records. Returns the journal file offset at which the transaction starts.
    ///
    /// Under `writer_mutex`, in order:
    /// * each mutation is written via `ObjectManager::write_mutation`;
    /// * the transaction is applied with `apply_transaction`; any mutation it returns is
    ///   journaled with object ID 0;
    /// * each checksum entry taken from the transaction is journaled as a `DataChecksums`
    ///   record;
    /// * a `Commit` record is appended.
    ///
    /// After releasing the mutex, `did_commit_transaction` is called and the flush task is woken.
    ///
    /// # Panics
    /// Panics if `apply_transaction` fails or if writing any record to the journal writer fails.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            // Held across writing and applying. Other `writer_mutex` holders (other commits,
            // `sync`, `write_super_block`) therefore cannot write between this transaction's
            // mutations and its `Commit` record.
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // `inner` is unlocked while the transaction is applied.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // New bytes are available to flush; wake the flush task.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1630
    /// Background task that writes buffered journal bytes to disk and triggers compactions.
    ///
    /// Each pass does three things:
    /// * Starts a `flush` of all currently flushable bytes when no flush is in flight, a journal
    ///   handle exists and no earlier flush has failed.
    /// * Starts a `compact` when all of these hold:
    ///   - no compaction is running;
    ///   - the journal is not terminated;
    ///   - compactions are enabled;
    ///   - image-builder mode is off;
    ///   - journal usage exceeds half of `reclaim_size`.
    /// * Registers itself as `flush_waker`.
    ///
    /// When a flush or compaction completes, it loops to re-evaluate. A flush or compaction error
    /// terminates the journal. The task finishes once the journal is terminated and neither
    /// future is in flight.
    pub async fn flush_task(self: Arc<Self>) {
        // In-flight flush and compaction futures (at most one of each).
        let mut flush_fut = None;
        let mut compact_fut = None;
        // After a flush failure no further flushes are started.
        let mut flush_error = false;
        poll_fn(|ctx| loop {
            {
                let mut inner = self.inner.lock();
                if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                    let flushable = inner.writer.flushable_bytes();
                    if flushable > 0 {
                        flush_fut = Some(self.flush(flushable).boxed());
                    }
                }
                if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                    return Poll::Ready(());
                }
                // Compact once journal usage exceeds half of the reclaim size.
                if compact_fut.is_none()
                    && !inner.terminate
                    && !inner.disable_compactions
                    && !inner.image_builder_mode
                    && self.objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                        > inner.reclaim_size / 2
                {
                    compact_fut = Some(self.compact().boxed());
                    inner.compaction_running = true;
                }
                inner.flush_waker = Some(ctx.waker().clone());
            }
            // Becomes false if any future completed, in which case the loop runs again.
            let mut pending = true;
            if let Some(fut) = flush_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    if let Err(e) = result {
                        self.inner.lock().terminate(Some(e.context("Flush error")));
                        self.reclaim_event.notify(usize::MAX);
                        flush_error = true;
                    }
                    flush_fut = None;
                    pending = false;
                }
            }
            if let Some(fut) = compact_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    let mut inner = self.inner.lock();
                    if let Err(e) = result {
                        inner.terminate(Some(e.context("Compaction error")));
                    }
                    compact_fut = None;
                    inner.compaction_running = false;
                    // Wake `check_journal_space` / `stop_compactions` waiters.
                    self.reclaim_event.notify(usize::MAX);
                    pending = false;
                }
            }
            if pending {
                return Poll::Pending;
            }
        })
        .await;
    }
1696
1697 async fn flush(&self, amount: usize) -> Result<(), Error> {
1698 let handle = self.handle.get().unwrap();
1699 let mut buf = handle.allocate_buffer(amount).await;
1700 let offset = self.inner.lock().writer.take_flushable(buf.as_mut());
1701 let len = buf.len() as u64;
1702 self.handle.get().unwrap().overwrite(offset, buf.as_mut(), false).await?;
1703 let mut inner = self.inner.lock();
1704 if let Some(waker) = inner.sync_waker.take() {
1705 waker.wake();
1706 }
1707 inner.flushed_offset = offset + len;
1708 inner.valid_to = inner.flushed_offset;
1709 Ok(())
1710 }
1711
1712 #[trace]
1715 pub async fn compact(&self) -> Result<(), Error> {
1716 let trace = self.trace.load(Ordering::Relaxed);
1717 debug!("Compaction starting");
1718 if trace {
1719 info!("J: start compaction");
1720 }
1721 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1722 self.inner.lock().super_block_header.earliest_version = earliest_version;
1723 self.write_super_block().await.context("Failed to write superblock")?;
1724 if trace {
1725 info!("J: end compaction");
1726 }
1727 debug!("Compaction finished");
1728 Ok(())
1729 }
1730
    /// Disables compactions and waits for any compaction that is already running to finish.
    ///
    /// `flush_task` does not start compactions while `disable_compactions` is set. This returns
    /// once `compaction_running` is false.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // `flush_task` notifies `reclaim_event` (while holding `inner`) when a compaction
                // finishes. Creating the listener under the lock means that notification cannot
                // be missed.
                self.reclaim_event.listen()
            });
        }
    }
1743
1744 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1747 let this = Arc::downgrade(self);
1748 parent.record_lazy_child(name, move || {
1749 let this_clone = this.clone();
1750 async move {
1751 let inspector = fuchsia_inspect::Inspector::default();
1752 if let Some(this) = this_clone.upgrade() {
1753 let (journal_min, journal_max, journal_reclaim_size) = {
1754 let inner = this.inner.lock();
1756 (
1757 round_down(
1758 inner.super_block_header.journal_checkpoint.file_offset,
1759 BLOCK_SIZE,
1760 ),
1761 inner.flushed_offset,
1762 inner.reclaim_size,
1763 )
1764 };
1765 let root = inspector.root();
1766 root.record_uint("journal_min_offset", journal_min);
1767 root.record_uint("journal_max_offset", journal_max);
1768 root.record_uint("journal_size", journal_max - journal_min);
1769 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1770
1771 if let Some(x) = round_div(
1773 100 * (journal_max - journal_min),
1774 this.objects.allocator().get_disk_bytes(),
1775 ) {
1776 root.record_uint("journal_size_to_disk_size_percent", x);
1777 }
1778 }
1779 Ok(inspector)
1780 }
1781 .boxed()
1782 });
1783 }
1784
1785 pub fn terminate(&self) {
1787 self.inner.lock().terminate(None);
1788 self.reclaim_event.notify(usize::MAX);
1789 }
1790}
1791
/// Writes journal mutation records on behalf of one object.
///
/// The first field is the object ID each record is tagged with. The second is the journal
/// writer the records are serialized into. `write_and_apply_mutations` creates one per mutation
/// and hands it to `ObjectManager::write_mutation`.
pub struct Writer<'a>(u64, &'a mut JournalWriter);
1794
1795impl Writer<'_> {
1796 pub fn write(&mut self, mutation: Mutation) {
1797 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1798 }
1799}
1800
// Journal tests. Each one builds a filesystem on a `FakeDevice`, commits transactions, then
// reopens the device (replaying the journal) and checks file contents and/or runs fsck.
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, SyncOptions};
    use crate::fsck::fsck;
    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::Options;
    use crate::object_store::volume::root_volume;
    use crate::object_store::{lock_keys, HandleOptions, LockKey, ObjectStore, NO_OWNER};
    use fuchsia_async as fasync;
    use fuchsia_async::MonotonicDuration;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    // Block size of the fake devices used by `test_replay` and `test_reset`.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    /// Writes a small file and syncs. Then closes and reopens the filesystem (replaying the
    /// journal), checks the file's contents and runs fsck.
    #[fuchsia::test]
    async fn test_replay() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let object_id = {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");

            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            // Sync so the write is in the journal before the filesystem is closed.
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            handle.object_id()
        };

        {
            fs.close().await.expect("Close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let handle = ObjectStore::open_object(
                &fs.root_store(),
                object_id,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("open_object failed");
            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            fsck(fs.clone()).await.expect("fsck failed");
            fs.close().await.expect("Close failed");
        }
    }

    /// Creates and syncs a file, then creates 1000 more files without an explicit sync, and
    /// closes the filesystem.
    ///
    /// After reopening, it runs fsck, checks the first file, and creates and syncs one more file.
    /// After a second reopen, it runs fsck again and checks the first and the most recent file.
    ///
    /// NOTE(review): the name suggests this exercises the journal being reset across reopens;
    /// confirm.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Generate a large amount of journal traffic.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Only the first (synced) file's contents are checked here.
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write and sync one more file on the reopened filesystem.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // Check the first file and the file created after the first reopen.
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }

    /// Fills the journal past one chunk, compacts, and stops further compactions. It then fills
    /// past two chunks and snapshots the device without closing the filesystem.
    ///
    /// The snapshot is opened, written to, closed, reopened and fsck'd.
    ///
    /// NOTE(review): presumably this exercises the journal `Discard` record path on replay;
    /// confirm.
    #[fuchsia::test]
    async fn test_discard() {
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store =
                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Create files until a commit starts beyond the first journal chunk.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("a {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // Compact once, then prevent further compactions while writing more.
            fs.journal().compact().await.expect("compact failed");
            fs.journal().stop_compactions().await;

            // Create files until a commit starts beyond the second journal chunk.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("b {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // Give the flush task a moment, then snapshot the device without closing the
            // filesystem (presumably simulating an unclean shutdown).
            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
            fs.device().snapshot().expect("snapshot failed")
        };

        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store = root_volume.volume("test", NO_OWNER, None).await.expect("volume failed");

            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Commit one more transaction on top of the replayed journal.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, &format!("d"))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
}
2134
// Fuzz targets that inject arbitrary journal contents into a fresh filesystem and then reopen it.
// Open failures are ignored (`if let Ok`), so only crashes/panics count as findings.
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    /// Appends raw `input` bytes to the journal writer of a fresh filesystem, closes it, and
    /// tries to reopen (replay) it.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::fake_device::FakeDevice;
        use storage_device::DeviceHolder;

        fasync::SendExecutor::new(4).run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                // A successful open must also be closable; its result is not checked.
                let _ = fs.close().await;
            }
        });
    }

    /// Writes each of the arbitrary `input` records to the journal writer of a fresh filesystem
    /// (ignoring write errors), closes it, and tries to reopen (replay) it.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::fake_device::FakeDevice;
        use storage_device::DeviceHolder;

        fasync::SendExecutor::new(4).run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            {
                let mut inner = fs.journal().inner.lock();
                for record in &input {
                    let _ = inner.writer.write_record(record);
                }
            }
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                // A successful open must also be closable; its result is not checked.
                let _ = fs.close().await;
            }
        });
    }
}