1use crate::errors::FxfsError;
6use crate::fsck::{fsck_volume_with_options, fsck_with_options, FsckOptions};
7use crate::log::*;
8use crate::object_store::allocator::{Allocator, Hold, Reservation};
9use crate::object_store::directory::Directory;
10use crate::object_store::graveyard::Graveyard;
11use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
12use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::transaction::{
15 self, lock_keys, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation,
16 ReadGuard, Transaction, WriteGuard, TRANSACTION_METADATA_MAX_AMOUNT,
17};
18use crate::object_store::volume::{root_volume, VOLUMES_DIRECTORY};
19use crate::object_store::{ObjectStore, NO_OWNER};
20use crate::range::RangeExt;
21use crate::serialized_types::{Version, LATEST_VERSION};
22use crate::{debug_assert_not_too_long, metrics};
23use anyhow::{bail, ensure, Context, Error};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use once_cell::sync::OnceCell;
32use static_assertions::const_assert;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, Weak};
35use storage_device::{Device, DeviceHolder};
36
/// Smallest block size, in bytes, that the filesystem will use. When a filesystem is opened, a
/// device block size below this value is rounded up to it.
pub const MIN_BLOCK_SIZE: u64 = 4 * 1024;
/// Largest block size, in bytes, that the filesystem supports (64 KiB, i.e. `u16::MAX + 1`).
pub const MAX_BLOCK_SIZE: u64 = 1 << 16;
39
40pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
45const_assert!(9223372036854771712 == MAX_FILE_SIZE);
46
/// Upper bound on the number of transactions that may be in flight at once. Transactions that
/// skip journal checks are still counted, but they are never made to wait for this limit.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;
49
/// Default delay between opening the filesystem and the first background trim pass (one hour).
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(3600);
54
/// Default delay between successive background trim passes after the first one (one day).
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);
57
/// Space usage summary for a filesystem, as reported by `FxFilesystem::get_info`.
pub struct Info {
    /// Size of the underlying device, in bytes.
    pub total_bytes: u64,
    /// Number of bytes the allocator reports as in use.
    pub used_bytes: u64,
}
63
64pub type PostCommitHook =
65 Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
66
67pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
68
69pub struct Options {
70 pub read_only: bool,
72
73 pub roll_metadata_key_byte_count: u64,
77
78 pub pre_commit_hook: PreCommitHook,
81
82 pub post_commit_hook: PostCommitHook,
85
86 pub skip_initial_reap: bool,
89
90 pub trim_config: Option<(std::time::Duration, std::time::Duration)>,
95
96 pub image_builder_mode: Option<SuperBlockInstance>,
99}
100
101impl Default for Options {
102 fn default() -> Self {
103 Options {
104 roll_metadata_key_byte_count: 128 * 1024 * 1024,
105 read_only: false,
106 pre_commit_hook: None,
107 post_commit_hook: None,
108 skip_initial_reap: false,
109 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
110 image_builder_mode: None,
111 }
112 }
113}
114
115pub struct ApplyContext<'a, 'b> {
117 pub mode: ApplyMode<'a, 'b>,
119
120 pub checkpoint: JournalCheckpoint,
122}
123
124pub enum ApplyMode<'a, 'b> {
127 Replay,
128 Live(&'a Transaction<'b>),
129}
130
131impl ApplyMode<'_, '_> {
132 pub fn is_replay(&self) -> bool {
133 matches!(self, ApplyMode::Replay)
134 }
135
136 pub fn is_live(&self) -> bool {
137 matches!(self, ApplyMode::Live(_))
138 }
139}
140
141#[async_trait]
144pub trait JournalingObject: Send + Sync {
145 fn apply_mutation(
149 &self,
150 mutation: Mutation,
151 context: &ApplyContext<'_, '_>,
152 assoc_obj: AssocObj<'_>,
153 ) -> Result<(), Error>;
154
155 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
157
158 async fn flush(&self) -> Result<Version, Error>;
162
163 fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
166 writer.write(mutation.clone());
167 }
168}
169
/// Options for `FxFilesystem::sync`; the default leaves both fields off.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Requests that the device itself be flushed; `FxFilesystem::close` sets this.
    pub flush_device: bool,

    /// Optional one-shot predicate handed to the journal.
    /// NOTE(review): presumably the sync is skipped when it returns false -- confirm against
    /// `Journal::sync`.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
184
185pub struct OpenFxFilesystem(Arc<FxFilesystem>);
186
187impl OpenFxFilesystem {
188 pub async fn take_device(self) -> DeviceHolder {
191 let fut = self.device.take_when_dropped();
192 std::mem::drop(self);
193 debug_assert_not_too_long!(fut)
194 }
195
196 pub async fn finalize(&self) -> Result<(), Error> {
198 ensure!(
199 self.journal().image_builder_mode().is_some(),
200 "finalize() only valid in image_builder_mode."
201 );
202 self.journal().allocate_journal().await?;
203 self.journal().set_image_builder_mode(None);
204 self.journal().compact().await?;
205 Ok(())
206 }
207}
208
209impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
210 fn from(fs: Arc<FxFilesystem>) -> Self {
211 Self(fs)
212 }
213}
214
215impl Drop for OpenFxFilesystem {
216 fn drop(&mut self) {
217 if self.options.image_builder_mode.is_some()
218 && self.journal().image_builder_mode().is_some()
219 {
220 error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
221 }
222 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
223 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
224 }
225 }
226}
227
228impl std::ops::Deref for OpenFxFilesystem {
229 type Target = Arc<FxFilesystem>;
230
231 fn deref(&self) -> &Self::Target {
232 &self.0
233 }
234}
235
236pub struct FxFilesystemBuilder {
237 format: bool,
238 trace: bool,
239 options: Options,
240 journal_options: JournalOptions,
241 on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
242 on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
243 fsck_after_every_transaction: bool,
244}
245
246impl FxFilesystemBuilder {
247 pub fn new() -> Self {
248 Self {
249 format: false,
250 trace: false,
251 options: Options::default(),
252 journal_options: JournalOptions::default(),
253 on_new_allocator: None,
254 on_new_store: None,
255 fsck_after_every_transaction: false,
256 }
257 }
258
259 pub fn format(mut self, format: bool) -> Self {
261 self.format = format;
262 self
263 }
264
265 pub fn trace(mut self, trace: bool) -> Self {
267 self.trace = trace;
268 self
269 }
270
271 pub fn read_only(mut self, read_only: bool) -> Self {
274 self.options.read_only = read_only;
275 self
276 }
277
278 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
285 self.options.image_builder_mode = mode;
286 self
287 }
288
289 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
291 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
292 self
293 }
294
295 pub fn pre_commit_hook(
297 mut self,
298 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
299 ) -> Self {
300 self.options.pre_commit_hook = Some(Box::new(hook));
301 self
302 }
303
304 pub fn post_commit_hook(
307 mut self,
308 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
309 ) -> Self {
310 self.options.post_commit_hook = Some(Box::new(hook));
311 self
312 }
313
314 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
317 self.options.skip_initial_reap = skip_initial_reap;
318 self
319 }
320
321 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
323 self.journal_options = journal_options;
324 self
325 }
326
327 pub fn on_new_allocator(
329 mut self,
330 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
331 ) -> Self {
332 self.on_new_allocator = Some(Box::new(on_new_allocator));
333 self
334 }
335
336 pub fn on_new_store(
338 mut self,
339 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
340 ) -> Self {
341 self.on_new_store = Some(Box::new(on_new_store));
342 self
343 }
344
345 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
347 self.fsck_after_every_transaction = fsck_after_every_transaction;
348 self
349 }
350
351 pub fn trim_config(
352 mut self,
353 delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
354 ) -> Self {
355 self.options.trim_config = delay_and_interval;
356 self
357 }
358
359 pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
361 let read_only = self.options.read_only;
362 if self.format && read_only {
363 bail!("Cannot initialize a filesystem as read-only");
364 }
365
366 let objects = Arc::new(ObjectManager::new(self.on_new_store));
367 let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
368
369 let image_builder_mode = self.options.image_builder_mode;
370
371 let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
372 assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
373 assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
374
375 let mut fsck_after_every_transaction = None;
376 let mut filesystem_options = self.options;
377 if self.fsck_after_every_transaction {
378 let instance =
379 FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
380 fsck_after_every_transaction = Some(instance.clone());
381 filesystem_options.post_commit_hook =
382 Some(Box::new(move || instance.clone().run().boxed()));
383 }
384
385 if !read_only && !self.format {
386 device.flush().await.context("Device flush failed")?;
389 }
390
391 let filesystem = Arc::new(FxFilesystem {
392 device,
393 block_size,
394 objects: objects.clone(),
395 journal,
396 commit_mutex: futures::lock::Mutex::new(()),
397 lock_manager: LockManager::new(),
398 flush_task: Mutex::new(None),
399 trim_task: Mutex::new(None),
400 closed: AtomicBool::new(true),
401 shutdown_event: Event::new(),
402 trace: self.trace,
403 graveyard: Graveyard::new(objects.clone()),
404 completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
405 options: filesystem_options,
406 in_flight_transactions: AtomicU64::new(0),
407 transaction_limit_event: Event::new(),
408 });
409
410 filesystem.journal().set_image_builder_mode(image_builder_mode);
411
412 filesystem.journal.set_trace(self.trace);
413 if self.format {
414 filesystem.journal.init_empty(filesystem.clone()).await?;
415 if image_builder_mode.is_none() {
416 filesystem.journal.init_superblocks().await?;
419
420 filesystem.graveyard.clone().reap_async();
422 }
423
424 let root_store = filesystem.root_store();
426 root_store.set_trace(self.trace);
427 let root_directory =
428 Directory::open(&root_store, root_store.root_directory_object_id())
429 .await
430 .context("Unable to open root volume directory")?;
431 let mut transaction = filesystem
432 .clone()
433 .new_transaction(
434 lock_keys![LockKey::object(
435 root_store.store_object_id(),
436 root_directory.object_id()
437 )],
438 transaction::Options::default(),
439 )
440 .await?;
441 let volume_directory =
442 root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
443 transaction.commit().await?;
444 objects.set_volume_directory(volume_directory);
445 } else {
446 filesystem
447 .journal
448 .replay(filesystem.clone(), self.on_new_allocator)
449 .await
450 .context("Journal replay failed")?;
451 filesystem.root_store().set_trace(self.trace);
452
453 if !read_only {
454 for store in objects.unlocked_stores() {
458 filesystem.graveyard.initial_reap(&store).await?;
459 }
460 }
461 }
462
463 if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
465 fsck_after_every_transaction
466 .fs
467 .set(Arc::downgrade(&filesystem))
468 .unwrap_or_else(|_| unreachable!());
469 }
470
471 filesystem.closed.store(false, Ordering::SeqCst);
472
473 if !read_only && image_builder_mode.is_none() {
474 filesystem.graveyard.clone().reap_async();
476
477 if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
478 filesystem.start_trim_task(delay, interval);
479 }
480 }
481
482 Ok(filesystem.into())
483 }
484}
485
486pub struct FxFilesystem {
487 block_size: u64,
488 objects: Arc<ObjectManager>,
489 journal: Arc<Journal>,
490 commit_mutex: futures::lock::Mutex<()>,
491 lock_manager: LockManager,
492 flush_task: Mutex<Option<fasync::Task<()>>>,
493 trim_task: Mutex<Option<fasync::Task<()>>>,
494 closed: AtomicBool,
495 shutdown_event: Event,
497 trace: bool,
498 graveyard: Arc<Graveyard>,
499 completed_transactions: UintProperty,
500 options: Options,
501
502 in_flight_transactions: AtomicU64,
504
505 transaction_limit_event: Event,
508
509 device: DeviceHolder,
512}
513
514#[fxfs_trace::trace]
515impl FxFilesystem {
516 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
517 FxFilesystemBuilder::new().format(true).open(device).await
518 }
519
520 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
521 FxFilesystemBuilder::new().open(device).await
522 }
523
524 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
525 self.objects.root_parent_store()
526 }
527
528 pub async fn close(&self) -> Result<(), Error> {
529 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
530 self.shutdown_event.notify(usize::MAX);
531 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
532 let trim_task = self.trim_task.lock().take();
533 if let Some(task) = trim_task {
534 debug_assert_not_too_long!(task);
535 }
536 self.journal.stop_compactions().await;
537 let sync_status =
538 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
539 match &sync_status {
540 Ok(checkpoint) => info!(
541 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
542 reservation_required={}, borrowed={})",
543 checkpoint.as_ref().unwrap().0.file_offset,
544 self.object_manager().metadata_reservation(),
545 self.object_manager().required_reservation(),
546 self.object_manager().borrowed_metadata_space(),
547 ),
548 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
549 }
550 self.journal.terminate();
551 let flush_task = self.flush_task.lock().take();
552 if let Some(task) = flush_task {
553 debug_assert_not_too_long!(task);
554 }
555 self.device().close().await.context("Failed to close device")?;
558 sync_status.map(|_| ())
559 }
560
561 pub fn device(&self) -> Arc<dyn Device> {
562 Arc::clone(&self.device)
563 }
564
565 pub fn root_store(&self) -> Arc<ObjectStore> {
566 self.objects.root_store()
567 }
568
569 pub fn allocator(&self) -> Arc<Allocator> {
570 self.objects.allocator()
571 }
572
573 pub fn object_manager(&self) -> &Arc<ObjectManager> {
574 &self.objects
575 }
576
577 pub fn journal(&self) -> &Arc<Journal> {
578 &self.journal
579 }
580
581 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
582 self.journal.sync(options).await.map(|_| ())
583 }
584
585 pub fn block_size(&self) -> u64 {
586 self.block_size
587 }
588
589 pub fn get_info(&self) -> Info {
590 Info {
591 total_bytes: self.device.size(),
592 used_bytes: self.object_manager().allocator().get_used_bytes().0,
593 }
594 }
595
596 pub fn super_block_header(&self) -> SuperBlockHeader {
597 self.journal.super_block_header()
598 }
599
600 pub fn graveyard(&self) -> &Arc<Graveyard> {
601 &self.graveyard
602 }
603
604 pub fn trace(&self) -> bool {
605 self.trace
606 }
607
608 pub fn options(&self) -> &Options {
609 &self.options
610 }
611
612 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
619 TxnGuard::Owned(
620 self.lock_manager
621 .read_lock(lock_keys!(LockKey::Filesystem))
622 .await
623 .into_owned(self.clone()),
624 )
625 }
626
627 pub async fn new_transaction<'a>(
628 self: Arc<Self>,
629 locks: LockKeys,
630 options: transaction::Options<'a>,
631 ) -> Result<Transaction<'a>, Error> {
632 let guard = if let Some(guard) = options.txn_guard.as_ref() {
633 TxnGuard::Borrowed(guard)
634 } else {
635 self.txn_guard().await
636 };
637 Transaction::new(guard, options, locks).await
638 }
639
640 #[trace]
641 pub async fn commit_transaction(
642 &self,
643 transaction: &mut Transaction<'_>,
644 callback: &mut (dyn FnMut(u64) + Send),
645 ) -> Result<u64, Error> {
646 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
647 hook(transaction)?;
648 }
649 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
650 self.maybe_start_flush_task();
651 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
652 let journal_offset = if self.journal().image_builder_mode().is_some() {
653 let journal_checkpoint =
654 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
655 let maybe_mutation = self
656 .object_manager()
657 .apply_transaction(transaction, &journal_checkpoint)
658 .expect("Transactions must not fail in image_builder_mode");
659 if let Some(mutation) = maybe_mutation {
660 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
661 }
665 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
666 0
667 } else {
668 self.journal.commit(transaction).await?
669 };
670 self.completed_transactions.add(1);
671
672 callback(journal_offset);
677
678 if let Some(hook) = self.options.post_commit_hook.as_ref() {
679 hook().await;
680 }
681
682 Ok(journal_offset)
683 }
684
685 pub fn lock_manager(&self) -> &LockManager {
686 &self.lock_manager
687 }
688
689 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
690 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
691 self.sub_transaction();
692 }
693 if let MetadataReservation::Hold(hold_amount) =
695 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
696 {
697 let hold = transaction
698 .allocator_reservation
699 .unwrap()
700 .reserve(0)
701 .expect("Zero should always succeed.");
702 hold.add(hold_amount);
703 }
704 self.objects.drop_transaction(transaction);
705 self.lock_manager.drop_transaction(transaction);
706 }
707
708 fn maybe_start_flush_task(&self) {
709 let mut flush_task = self.flush_task.lock();
710 if flush_task.is_none() {
711 let journal = self.journal.clone();
712 *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
713 }
714 }
715
716 async fn do_trim(&self) -> Result<usize, Error> {
718 const MAX_EXTENTS_PER_BATCH: usize = 8;
719 const MAX_EXTENT_SIZE: usize = 256 * 1024;
720 let mut offset = 0;
721 let mut bytes_trimmed = 0;
722 loop {
723 if self.closed.load(Ordering::Relaxed) {
724 info!("Filesystem is closed, nothing to trim");
725 return Ok(bytes_trimmed);
726 }
727 let allocator = self.allocator();
728 let trimmable_extents =
729 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
730 for device_range in trimmable_extents.extents() {
731 self.device.trim(device_range.clone()).await?;
732 bytes_trimmed += device_range.length()? as usize;
733 }
734 if let Some(device_range) = trimmable_extents.extents().last() {
735 offset = device_range.end;
736 } else {
737 break;
738 }
739 }
740 Ok(bytes_trimmed)
741 }
742
743 fn start_trim_task(
744 self: &Arc<Self>,
745 delay: std::time::Duration,
746 interval: std::time::Duration,
747 ) {
748 if !self.device.supports_trim() {
749 info!("Device does not support trim; not scheduling trimming");
750 return;
751 }
752 let this = self.clone();
753 let mut next_timer = delay;
754 *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
755 loop {
756 let shutdown_listener = this.shutdown_event.listen();
757 if this.closed.load(Ordering::SeqCst) {
763 return;
764 }
765 futures::select!(
766 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
767 () = shutdown_listener.fuse() => return,
768 );
769 let start_time = std::time::Instant::now();
770 let res = this.do_trim().await;
771 let duration = std::time::Instant::now() - start_time;
772 next_timer = interval.clone();
773 match res {
774 Ok(bytes_trimmed) => info!(
775 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
776 {next_timer:?}",
777 ),
778 Err(e) => error!(e:?; "Failed to trim"),
779 }
780 }
781 }));
782 }
783
784 pub(crate) async fn reservation_for_transaction<'a>(
785 self: &Arc<Self>,
786 options: transaction::Options<'a>,
787 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
788 if self.options.image_builder_mode.is_some() {
789 return Ok((MetadataReservation::Borrowed, None, None));
792 }
793 if !options.skip_journal_checks {
794 self.maybe_start_flush_task();
795 self.journal.check_journal_space().await?;
796 }
797
798 let mut hold = None;
812 let metadata_reservation = if options.borrow_metadata_space {
813 MetadataReservation::Borrowed
814 } else {
815 match options.allocator_reservation {
816 Some(reservation) => {
817 hold = Some(
818 reservation
819 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
820 .ok_or(FxfsError::NoSpace)?,
821 );
822 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
823 }
824 None => {
825 let reservation = self
826 .allocator()
827 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
828 .ok_or(FxfsError::NoSpace)?;
829 MetadataReservation::Reservation(reservation)
830 }
831 }
832 };
833 Ok((metadata_reservation, options.allocator_reservation, hold))
834 }
835
836 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
837 if skip_journal_checks {
838 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
839 } else {
840 let inc = || {
841 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
842 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
843 match self.in_flight_transactions.compare_exchange_weak(
844 in_flights,
845 in_flights + 1,
846 Ordering::Relaxed,
847 Ordering::Relaxed,
848 ) {
849 Ok(_) => return true,
850 Err(x) => in_flights = x,
851 }
852 }
853 return false;
854 };
855 while !inc() {
856 let listener = self.transaction_limit_event.listen();
857 if inc() {
858 break;
859 }
860 listener.await;
861 }
862 }
863 }
864
865 pub(crate) fn sub_transaction(&self) {
866 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
867 assert!(old != 0);
868 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
869 self.transaction_limit_event.notify(usize::MAX);
870 }
871 }
872
873 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
874 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
875 TruncateGuard(self.lock_manager().write_lock(keys).await)
876 }
877}
878
879pub enum TxnGuard<'a> {
880 Borrowed(&'a TxnGuard<'a>),
881 Owned(ReadGuard<'static>),
882}
883
884impl TxnGuard<'_> {
885 pub fn fs(&self) -> &Arc<FxFilesystem> {
886 match self {
887 TxnGuard::Borrowed(b) => b.fs(),
888 TxnGuard::Owned(o) => o.fs().unwrap(),
889 }
890 }
891}
892
893#[allow(dead_code)]
895pub struct TruncateGuard<'a>(WriteGuard<'a>);
896
897pub async fn mkfs(device: DeviceHolder) -> Result<(), Error> {
899 let fs = FxFilesystem::new_empty(device).await?;
900 fs.close().await
901}
902
903pub async fn mkfs_with_volume(
907 device: DeviceHolder,
908 volume_name: &str,
909 crypt: Option<Arc<dyn Crypt>>,
910) -> Result<(), Error> {
911 let fs = FxFilesystem::new_empty(device).await?;
912 {
913 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
916 root_volume.new_volume(volume_name, NO_OWNER, crypt).await.expect("Create volume failed");
917 }
918 fs.close().await?;
919 Ok(())
920}
921
922struct FsckAfterEveryTransaction {
923 fs: OnceCell<Weak<FxFilesystem>>,
924 old_hook: PostCommitHook,
925}
926
927impl FsckAfterEveryTransaction {
928 fn new(old_hook: PostCommitHook) -> Arc<Self> {
929 Arc::new(Self { fs: OnceCell::new(), old_hook })
930 }
931
932 async fn run(self: Arc<Self>) {
933 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
934 let options = FsckOptions {
935 fail_on_warning: true,
936 no_lock: true,
937 quiet: true,
938 ..Default::default()
939 };
940 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
941 let object_manager = fs.object_manager();
942 for store in object_manager.unlocked_stores() {
943 let store_id = store.store_object_id();
944 if !object_manager.is_system_store(store_id) {
945 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
946 .await
947 .expect("fsck_volume_with_options failed");
948 }
949 }
950 }
951 if let Some(old_hook) = self.old_hook.as_ref() {
952 old_hook().await;
953 }
954 }
955}
956
957#[cfg(test)]
958mod tests {
959 use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
960 use crate::fsck::{fsck, fsck_volume};
961 use crate::log::*;
962 use crate::lsm_tree::types::Item;
963 use crate::lsm_tree::Operation;
964 use crate::object_handle::{
965 ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
966 };
967 use crate::object_store::directory::{replace_child, Directory};
968 use crate::object_store::journal::super_block::SuperBlockInstance;
969 use crate::object_store::journal::JournalOptions;
970 use crate::object_store::transaction::{lock_keys, LockKey, Options};
971 use crate::object_store::volume::root_volume;
972 use crate::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, NO_OWNER};
973 use crate::range::RangeExt;
974 use fuchsia_async as fasync;
975 use fuchsia_sync::Mutex;
976 use futures::future::join_all;
977 use futures::stream::{FuturesUnordered, TryStreamExt};
978 use fxfs_insecure_crypto::InsecureCrypt;
979 use rustc_hash::FxHashMap as HashMap;
980 use std::ops::Range;
981 use std::sync::Arc;
982 use std::time::Duration;
983 use storage_device::fake_device::FakeDevice;
984 use storage_device::DeviceHolder;
985
986 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
987
988 #[fuchsia::test(threads = 10)]
989 async fn test_compaction() {
990 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
991
992 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
994 let root_store = fs.root_store();
995 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
996 .await
997 .expect("open failed");
998
999 let mut tasks = Vec::new();
1000 for i in 0..2 {
1001 let mut transaction = fs
1002 .clone()
1003 .new_transaction(
1004 lock_keys![LockKey::object(
1005 root_store.store_object_id(),
1006 root_directory.object_id()
1007 )],
1008 Options::default(),
1009 )
1010 .await
1011 .expect("new_transaction failed");
1012 let handle = root_directory
1013 .create_child_file(&mut transaction, &format!("{}", i))
1014 .await
1015 .expect("create_child_file failed");
1016 transaction.commit().await.expect("commit failed");
1017 tasks.push(fasync::Task::spawn(async move {
1018 const TEST_DATA: &[u8] = b"hello";
1019 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1020 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1021 for _ in 0..1500 {
1022 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1023 }
1024 }));
1025 }
1026 join_all(tasks).await;
1027 fs.sync(SyncOptions::default()).await.expect("sync failed");
1028
1029 fsck(fs.clone()).await.expect("fsck failed");
1030 fs.close().await.expect("Close failed");
1031 }
1032
1033 #[fuchsia::test(threads = 10)]
1034 async fn test_replay_is_identical() {
1035 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1036 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1037
1038 fs.close().await.expect("close failed");
1041 let device = fs.take_device().await;
1042 device.reopen(false);
1043
1044 struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1045
1046 impl<K: Clone, V: Clone> Mutations<K, V> {
1047 fn new() -> Self {
1048 Mutations(Mutex::new(Vec::new()))
1049 }
1050
1051 fn push(&self, operation: Operation, item: &Item<K, V>) {
1052 self.0.lock().push((operation, item.clone()));
1053 }
1054 }
1055
1056 let open_fs = |device,
1057 object_mutations: Arc<Mutex<HashMap<_, _>>>,
1058 allocator_mutations: Arc<Mutations<_, _>>| async {
1059 FxFilesystemBuilder::new()
1060 .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1061 .on_new_allocator(move |allocator| {
1062 let allocator_mutations = allocator_mutations.clone();
1063 allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1064 allocator_mutations.push(op, item)
1065 })));
1066 })
1067 .on_new_store(move |store| {
1068 let mutations = Arc::new(Mutations::new());
1069 object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1070 store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1071 mutations.push(op, item)
1072 })));
1073 })
1074 .open(device)
1075 .await
1076 .expect("open failed")
1077 };
1078
1079 let allocator_mutations = Arc::new(Mutations::new());
1080 let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1081 let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1082
1083 let root_store = fs.root_store();
1084 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1085 .await
1086 .expect("open failed");
1087
1088 let mut transaction = fs
1089 .clone()
1090 .new_transaction(
1091 lock_keys![LockKey::object(
1092 root_store.store_object_id(),
1093 root_directory.object_id()
1094 )],
1095 Options::default(),
1096 )
1097 .await
1098 .expect("new_transaction failed");
1099 let object = root_directory
1100 .create_child_file(&mut transaction, "test")
1101 .await
1102 .expect("create_child_file failed");
1103 transaction.commit().await.expect("commit failed");
1104
1105 let buf = object.allocate_buffer(10000).await;
1107 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1108
1109 object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1111
1112 object.truncate(3000).await.expect("truncate failed");
1114
1115 let mut transaction = fs
1117 .clone()
1118 .new_transaction(
1119 lock_keys![
1120 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1121 LockKey::object(root_store.store_object_id(), object.object_id()),
1122 ],
1123 Options::default(),
1124 )
1125 .await
1126 .expect("new_transaction failed");
1127
1128 replace_child(&mut transaction, None, (&root_directory, "test"))
1129 .await
1130 .expect("replace_child failed");
1131
1132 transaction.commit().await.expect("commit failed");
1133
1134 root_store
1136 .tombstone_object(object.object_id(), Options::default())
1137 .await
1138 .expect("tombstone failed");
1139
1140 fs.close().await.expect("close failed");
1142
1143 let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1144
1145 let device = fs.take_device().await;
1146 device.reopen(false);
1147
1148 let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1149 let replayed_allocator_mutations = Arc::new(Mutations::new());
1150 let fs = open_fs(
1151 device,
1152 replayed_object_mutations.clone(),
1153 replayed_allocator_mutations.clone(),
1154 )
1155 .await;
1156
1157 let m1 = object_mutations.lock();
1158 let m2 = replayed_object_mutations.lock();
1159 assert_eq!(m1.len(), m2.len());
1160 for (store_id, mutations) in &*m1 {
1161 let mutations = mutations.0.lock();
1162 let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1163 assert_eq!(mutations.len(), replayed.len());
1164 for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1165 assert_eq!(op1, op2);
1166 assert_eq!(i1.key, i2.key);
1167 assert_eq!(i1.value, i2.value);
1168 assert_eq!(i1.sequence, i2.sequence);
1169 }
1170 }
1171
1172 let a1 = allocator_mutations.0.lock();
1173 let a2 = replayed_allocator_mutations.0.lock();
1174 assert_eq!(a1.len(), a2.len());
1175 for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1176 assert_eq!(op1, op2);
1177 assert_eq!(i1.key, i2.key);
1178 assert_eq!(i1.value, i2.value);
1179 assert_eq!(i1.sequence, i2.sequence);
1180 }
1181
1182 assert_eq!(
1183 fs.object_manager().metadata_reservation().amount(),
1184 metadata_reservation_amount
1185 );
1186 }
1187
1188 #[fuchsia::test]
1189 async fn test_max_in_flight_transactions() {
1190 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1191 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1192
1193 let transactions = FuturesUnordered::new();
1194 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1195 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1196 }
1197 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1198
1199 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1201 assert!(futures::poll!(&mut fut).is_pending());
1202
1203 transactions.pop();
1205
1206 assert!(futures::poll!(&mut fut).is_ready());
1207 }
1208
1209 #[fuchsia::test(threads = 10)]
1211 async fn test_continuously_trim() {
1212 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1213 let fs = FxFilesystemBuilder::new()
1214 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1215 .format(true)
1216 .open(device)
1217 .await
1218 .expect("open failed");
1219 fasync::Timer::new(Duration::from_millis(10)).await;
1221
1222 let root_store = fs.root_store();
1225 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1226 .await
1227 .expect("open failed");
1228 for _ in 0..100 {
1229 let mut transaction = fs
1230 .clone()
1231 .new_transaction(
1232 lock_keys![LockKey::object(
1233 root_store.store_object_id(),
1234 root_directory.object_id()
1235 )],
1236 Options::default(),
1237 )
1238 .await
1239 .expect("new_transaction failed");
1240 let object = root_directory
1241 .create_child_file(&mut transaction, "test")
1242 .await
1243 .expect("create_child_file failed");
1244 transaction.commit().await.expect("commit failed");
1245
1246 {
1247 let buf = object.allocate_buffer(1024).await;
1248 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1249 }
1250 std::mem::drop(object);
1251
1252 let mut transaction = root_directory
1253 .acquire_context_for_replace(None, "test", true)
1254 .await
1255 .expect("acquire_context_for_replace failed")
1256 .transaction;
1257 replace_child(&mut transaction, None, (&root_directory, "test"))
1258 .await
1259 .expect("replace_child failed");
1260 transaction.commit().await.expect("commit failed");
1261 }
1262 fs.close().await.expect("close failed");
1263 }
1264
1265 #[fuchsia::test]
1266 async fn test_power_fail() {
1267 for _ in 0..10 {
1270 let (store_id, device, test_file_object_id) = {
1271 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1272 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1273 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1274
1275 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1276 .await
1277 .expect("sync failed");
1278
1279 let store = root_volume
1280 .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
1281 .await
1282 .expect("new_volume failed");
1283 let root_directory = Directory::open(&store, store.root_directory_object_id())
1284 .await
1285 .expect("open failed");
1286
1287 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1289 let fs = store.filesystem();
1290 let root_directory = Directory::open(store, store.root_directory_object_id())
1291 .await
1292 .expect("open failed");
1293 for i in 0..100 {
1294 let mut transaction = fs
1295 .clone()
1296 .new_transaction(
1297 lock_keys![LockKey::object(
1298 store.store_object_id(),
1299 store.root_directory_object_id()
1300 )],
1301 Options::default(),
1302 )
1303 .await
1304 .expect("new_transaction failed");
1305 root_directory
1306 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1307 .await
1308 .expect("create_child_file failed");
1309 transaction.commit().await.expect("commit failed");
1310 }
1311 }
1312
1313 create_files(&store, "A").await;
1315
1316 let mut transaction = fs
1319 .clone()
1320 .new_transaction(
1321 lock_keys![LockKey::object(
1322 store.store_object_id(),
1323 store.root_directory_object_id()
1324 )],
1325 Options::default(),
1326 )
1327 .await
1328 .expect("new_transaction failed");
1329 let object = root_directory
1330 .create_child_file(&mut transaction, "test")
1331 .await
1332 .expect("create_child_file failed");
1333 transaction.commit().await.expect("commit failed");
1334
1335 let mut transaction =
1336 object.new_transaction().await.expect("new_transaction failed");
1337 let mut buffer = object.allocate_buffer(4096).await;
1338 buffer.as_mut_slice().fill(0xed);
1339 object
1340 .txn_write(&mut transaction, 0, buffer.as_ref())
1341 .await
1342 .expect("txn_write failed");
1343 transaction.commit().await.expect("commit failed");
1344
1345 create_files(&store, "B").await;
1347
1348 fs.sync(SyncOptions::default()).await.expect("sync failed");
1351
1352 fasync::Timer::new(Duration::from_millis(10)).await;
1358
1359 (
1360 store.store_object_id(),
1361 fs.device().snapshot().expect("snapshot failed"),
1362 object.object_id(),
1363 )
1364 };
1365
1366 device
1369 .discard_random_since_last_flush()
1370 .expect("discard_random_since_last_flush failed");
1371
1372 let fs = FxFilesystem::open(device).await.expect("open failed");
1373 fsck(fs.clone()).await.expect("fsck failed");
1374
1375 let mut check_test_file = false;
1376
1377 let object_id = if fs.object_manager().store(store_id).is_some() {
1380 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1381 .await
1382 .expect("fsck_volume failed");
1383
1384 let store = root_volume(fs.clone())
1388 .await
1389 .expect("root_volume failed")
1390 .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
1391 .await
1392 .expect("volume failed");
1393
1394 let root_directory = Directory::open(&store, store.root_directory_object_id())
1395 .await
1396 .expect("open failed");
1397
1398 let mut transaction = fs
1399 .clone()
1400 .new_transaction(
1401 lock_keys![LockKey::object(
1402 store.store_object_id(),
1403 store.root_directory_object_id()
1404 )],
1405 Options::default(),
1406 )
1407 .await
1408 .expect("new_transaction failed");
1409 let object = root_directory
1410 .create_child_file(&mut transaction, &format!("C"))
1411 .await
1412 .expect("create_child_file failed");
1413 transaction.commit().await.expect("commit failed");
1414
1415 if let Ok(test_file) = ObjectStore::open_object(
1417 &store,
1418 test_file_object_id,
1419 HandleOptions::default(),
1420 None,
1421 )
1422 .await
1423 {
1424 let mut buffer = test_file.allocate_buffer(4096).await;
1426 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1427 if bytes == 4096 {
1428 let expected = [0xed; 4096];
1429 assert_eq!(buffer.as_slice(), &expected);
1430 } else {
1431 assert_eq!(bytes, 0);
1433 }
1434
1435 let mut transaction =
1437 test_file.new_transaction().await.expect("new_transaction failed");
1438 buffer.as_mut_slice().fill(0x37);
1439 test_file
1440 .txn_write(&mut transaction, 0, buffer.as_ref())
1441 .await
1442 .expect("txn_write failed");
1443 transaction.commit().await.expect("commit failed");
1444 check_test_file = true;
1445 }
1446
1447 object.object_id()
1448 } else {
1449 INVALID_OBJECT_ID
1450 };
1451
1452 fs.close().await.expect("close failed");
1454 let device = fs.take_device().await;
1455 device.reopen(false);
1456
1457 let fs = FxFilesystem::open(device).await.expect("open failed");
1458 fsck(fs.clone()).await.expect("fsck failed");
1459
1460 if object_id != INVALID_OBJECT_ID {
1463 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1464 .await
1465 .expect("fsck_volume failed");
1466
1467 let store = root_volume(fs.clone())
1468 .await
1469 .expect("root_volume failed")
1470 .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
1471 .await
1472 .expect("volume failed");
1473 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1475 .await
1476 .expect("open_object failed");
1477
1478 if check_test_file {
1480 info!("Checking test file for modification");
1481 let test_file = ObjectStore::open_object(
1482 &store,
1483 test_file_object_id,
1484 HandleOptions::default(),
1485 None,
1486 )
1487 .await
1488 .expect("open_object failed");
1489 let mut buffer = test_file.allocate_buffer(4096).await;
1490 assert_eq!(
1491 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1492 4096
1493 );
1494 let expected = [0x37; 4096];
1495 assert_eq!(buffer.as_slice(), &expected);
1496 }
1497 }
1498
1499 fs.close().await.expect("close failed");
1500 }
1501 }
1502
1503 #[fuchsia::test]
1504 async fn test_image_builder_mode_no_early_writes() {
1505 const BLOCK_SIZE: u32 = 4096;
1506 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1507 device.reopen(true);
1508 let fs = FxFilesystemBuilder::new()
1509 .format(true)
1510 .image_builder_mode(Some(SuperBlockInstance::A))
1511 .open(device)
1512 .await
1513 .expect("open failed");
1514 fs.close().await.expect("closed");
1517 }
1518
1519 #[fuchsia::test]
1520 async fn test_image_builder_mode() {
1521 const BLOCK_SIZE: u32 = 4096;
1522 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
1523 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1524
1525 {
1527 let mut write_buf =
1528 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1529 write_buf.as_mut_slice().fill(0xf0);
1530 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
1531 }
1532
1533 device.reopen(true);
1534
1535 let device = {
1536 let fs = FxFilesystemBuilder::new()
1537 .format(true)
1538 .image_builder_mode(Some(SuperBlockInstance::B))
1539 .open(device)
1540 .await
1541 .expect("open failed");
1542 {
1543 let root_store = fs.root_store();
1544 let root_directory =
1545 Directory::open(&root_store, root_store.root_directory_object_id())
1546 .await
1547 .expect("open failed");
1548 let handle;
1550 {
1551 let mut transaction = fs
1552 .clone()
1553 .new_transaction(
1554 lock_keys![LockKey::object(
1555 root_directory.store().store_object_id(),
1556 root_directory.object_id()
1557 )],
1558 Options::default(),
1559 )
1560 .await
1561 .expect("new transaction");
1562 handle = root_directory
1563 .create_child_file(&mut transaction, "test")
1564 .await
1565 .expect("create file");
1566 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
1567 transaction.commit().await.expect("commit");
1568 }
1569 }
1570 fs.device().reopen(false);
1571 fs.finalize().await.expect("finalize");
1572 fs.close().await.expect("close");
1573 fs.take_device().await
1574 };
1575 device.reopen(false);
1576 let fs = FxFilesystem::open(device).await.expect("open failed");
1577 fsck(fs.clone()).await.expect("fsck failed");
1578
1579 let root_store = fs.root_store();
1581 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1582 .await
1583 .expect("open failed");
1584 let (object_id, descriptor, _) =
1585 root_directory.lookup("test").await.expect("lookup failed").unwrap();
1586 assert_eq!(descriptor, ObjectDescriptor::File);
1587 let test_file =
1588 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
1589 .await
1590 .expect("open failed");
1591 let mut read_buf =
1592 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1593 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
1594 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
1595 fs.close().await.expect("closed");
1596 }
1597}