fxfs/object_store/graveyard.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::FxfsError;
use crate::log::*;
use crate::lsm_tree::merge::{Merger, MergerIterator};
use crate::lsm_tree::types::{ItemRef, LayerIterator};
use crate::lsm_tree::Query;
use crate::object_handle::INVALID_OBJECT_ID;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{
    ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
};
use crate::object_store::transaction::{Mutation, Options, Transaction};
use crate::object_store::ObjectStore;
use anyhow::{anyhow, bail, Context, Error};
use fuchsia_async::{self as fasync};
use fuchsia_sync::Mutex;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::StreamExt;
use std::sync::Arc;

enum ReaperTask {
    None,
    Pending(UnboundedReceiver<Message>),
    Running(fasync::Task<()>),
}

/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
/// that at mount time, any objects in the graveyard will get removed.  Each object store has a
/// directory-like object that contains a list of the objects within that store that are part of
/// the graveyard.  A single instance of this Graveyard struct manages *all* stores.
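///
/// A minimal usage sketch (hypothetical caller code, not part of this module; assumes
/// `object_manager: Arc<ObjectManager>` plus valid `store_id`/`object_id` values are in scope):
///
/// ```ignore
/// // One Graveyard instance reaps entries for every store.
/// let graveyard = Graveyard::new(object_manager.clone());
/// // Start the background reaper; queued requests are processed asynchronously.
/// graveyard.clone().reap_async();
/// // Ask for an object to be tombstoned once the reaper gets to it.
/// graveyard.queue_tombstone_object(store_id, object_id);
/// // At shutdown, wait for the reaper to drain and stop.
/// graveyard.wait_for_reap().await;
/// ```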
pub struct Graveyard {
    object_manager: Arc<ObjectManager>,
    reaper_task: Mutex<ReaperTask>,
    channel: UnboundedSender<Message>,
}

enum Message {
    // Tombstone the object identified by <store-id>, <object-id>, Option<attribute-id>. If
    // <attribute-id> is Some, tombstone just the attribute instead of the entire object.
    Tombstone(u64, u64, Option<u64>),

    // Trims the identified object.
    Trim(u64, u64),

    // When the flush message is processed, notifies sender.  This allows the receiver to know
    // that all preceding tombstone messages have been processed.
    Flush(oneshot::Sender<()>),
}

#[fxfs_trace::trace]
impl Graveyard {
    /// Creates a new instance of the graveyard manager.
    pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
        let (sender, receiver) = unbounded();
        Arc::new(Graveyard {
            object_manager,
            reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
            channel: sender,
        })
    }

    /// Creates a graveyard object in `store`.  Returns the object ID for the graveyard object.
    pub fn create(transaction: &mut Transaction<'_>, store: &ObjectStore) -> u64 {
        let object_id = store.maybe_get_next_object_id();
        // This is OK because we only ever create a graveyard as we are creating a new store so
        // maybe_get_next_object_id will never fail here due to a lack of an object ID cipher.
        assert_ne!(object_id, INVALID_OBJECT_ID);
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id,
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::Object {
                    kind: ObjectKind::Graveyard,
                    attributes: ObjectAttributes {
                        creation_time: now.clone(),
                        modification_time: now,
                        project_id: 0,
                        ..Default::default()
                    },
                },
            ),
        );
        object_id
    }

    /// Starts an asynchronous task that reaps graveyard entries as they are queued (via
    /// `queue_tombstone_object`, `queue_tombstone_attribute` and `queue_trim`).
    /// If a task has already been started, this has no effect.
    pub fn reap_async(self: Arc<Self>) {
        let mut reaper_task = self.reaper_task.lock();
        if let ReaperTask::Pending(_) = &*reaper_task {
            if let ReaperTask::Pending(receiver) =
                std::mem::replace(&mut *reaper_task, ReaperTask::None)
            {
                *reaper_task =
                    ReaperTask::Running(fasync::Task::spawn(self.clone().reap_task(receiver)));
            } else {
                unreachable!();
            }
        }
    }

    /// Returns a future which completes when the ongoing reap task (if it exists) completes.
    pub async fn wait_for_reap(&self) {
        self.channel.close_channel();
        let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
        if let ReaperTask::Running(task) = task {
            task.await;
        }
    }

    async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
        // Wait and process reap requests.
        while let Some(message) = receiver.next().await {
            match message {
                Message::Tombstone(store_id, object_id, attribute_id) => {
                    let res = if let Some(attribute_id) = attribute_id {
                        self.tombstone_attribute(store_id, object_id, attribute_id).await
                    } else {
                        self.tombstone_object(store_id, object_id).await
                    };
                    if let Err(e) = res {
                        debug_assert!(
                            false,
                            "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}, \
                             attribute_id: {attribute_id:?}"
                        );
                        error!(
                            error:? = e,
                            store_id,
                            oid = object_id,
                            attribute_id;
                            "Tombstone error"
                        );
                    }
                }
                Message::Trim(store_id, object_id) => {
                    if let Err(e) = self.trim(store_id, object_id).await {
                        debug_assert!(
                            false,
                            "Trim error: {e:?}, store_id: {store_id}, oid: {object_id}"
                        );
                        error!(error:? = e, store_id, oid = object_id; "Trim error");
                    }
                }
                Message::Flush(sender) => {
                    let _ = sender.send(());
                }
            }
        }
    }

    /// Performs the initial mount-time reap for the given store.  This will queue all items in the
    /// graveyard.  Concurrently adding more entries to the graveyard will lead to undefined
    /// behaviour: the entries might or might not be immediately tombstoned, so callers should wait
    /// for this to return before changing to a state where more entries can be added.  Once this
    /// has returned, entries will be tombstoned in the background.
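    ///
    /// A rough sketch of the expected mount-time sequence (hypothetical caller code; the real
    /// mount path lives elsewhere in the filesystem):
    ///
    /// ```ignore
    /// // Start the background reaper first so queued entries get processed.
    /// graveyard.clone().reap_async();
    /// // Queue everything already sitting in this store's graveyard.
    /// let queued = graveyard.initial_reap(&store).await?;
    /// // Optionally wait until everything queued so far has been processed.
    /// graveyard.flush().await;
    /// ```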
    #[trace]
    pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
        if store.filesystem().options().skip_initial_reap {
            return Ok(0);
        }
        let mut count = 0;
        let layer_set = store.tree().layer_set();
        let mut merger = layer_set.merger();
        let graveyard_object_id = store.graveyard_directory_object_id();
        let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
        let store_id = store.store_object_id();
        // TODO(https://fxbug.dev/355246066): This iteration can sometimes attempt to reap
        // attributes after they've been reaped as part of the parent object.
        while let Some(GraveyardEntryInfo { object_id, attribute_id, sequence: _, value }) =
            iter.get()
        {
            match value {
                ObjectValue::Some => {
                    if let Some(attribute_id) = attribute_id {
                        self.queue_tombstone_attribute(store_id, object_id, attribute_id)
                    } else {
                        self.queue_tombstone_object(store_id, object_id)
                    }
                }
                ObjectValue::Trim => {
                    if attribute_id.is_some() {
                        return Err(anyhow!(
                            "Trim is not currently supported for a single attribute"
                        ));
                    }
                    self.queue_trim(store_id, object_id)
                }
                _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
            }
            count += 1;
            iter.advance().await?;
        }
        Ok(count)
    }

    /// Queues an object for tombstoning.
    pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
        let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
    }

    /// Queues an object's attribute for tombstoning.
    pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
        let _ = self.channel.unbounded_send(Message::Tombstone(
            store_id,
            object_id,
            Some(attribute_id),
        ));
    }

    fn queue_trim(&self, store_id: u64, object_id: u64) {
        let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
    }

    /// Waits for all preceding queued tombstones to finish.
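    ///
    /// A small sketch of how this pairs with the queueing calls (hypothetical caller code):
    ///
    /// ```ignore
    /// graveyard.queue_tombstone_object(store_id, object_id);
    /// // Returns once the reaper has processed everything queued before this call.
    /// graveyard.flush().await;
    /// ```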
    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel::<()>();
        self.channel.unbounded_send(Message::Flush(sender)).unwrap();
        receiver.await.unwrap();
    }

    /// Immediately tombstones (discards) an object in the graveyard.
    /// NB: Code should generally use |queue_tombstone_object| instead.
    pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
        let store = self
            .object_manager
            .store(store_id)
            .with_context(|| format!("Failed to get store {}", store_id))?;
        // For now, it's safe to assume that all objects in the root parent and root store should
        // return space to the metadata reservation, but we might have to revisit that if we end up
        // with objects that are in other stores.
        let options = if store_id == self.object_manager.root_parent_store_object_id()
            || store_id == self.object_manager.root_store_object_id()
        {
            Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.object_manager.metadata_reservation()),
                ..Default::default()
            }
        } else {
            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
        };
        store.tombstone_object(object_id, options).await
    }

    /// Immediately tombstones (discards) an attribute in the graveyard.
    /// NB: Code should generally use |queue_tombstone_attribute| instead.
    pub async fn tombstone_attribute(
        &self,
        store_id: u64,
        object_id: u64,
        attribute_id: u64,
    ) -> Result<(), Error> {
        let store = self
            .object_manager
            .store(store_id)
            .with_context(|| format!("Failed to get store {}", store_id))?;
        // For now, it's safe to assume that all objects in the root parent and root store should
        // return space to the metadata reservation, but we might have to revisit that if we end up
        // with objects that are in other stores.
        let options = if store_id == self.object_manager.root_parent_store_object_id()
            || store_id == self.object_manager.root_store_object_id()
        {
            Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.object_manager.metadata_reservation()),
                ..Default::default()
            }
        } else {
            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
        };
        store.tombstone_attribute(object_id, attribute_id, options).await
    }

    async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
        let store = self
            .object_manager
            .store(store_id)
            .with_context(|| format!("Failed to get store {}", store_id))?;
        let fs = store.filesystem();
        let truncate_guard = fs.truncate_guard(store_id, object_id).await;
        store.trim(object_id, &truncate_guard).await.context("Failed to trim object")
    }

    /// Returns an iterator that will return graveyard entries, skipping deleted ones.  Example
    /// usage:
    ///
    ///   let layer_set = store.tree().layer_set();
    ///   let mut merger = layer_set.merger();
    ///   let mut iter =
    ///       Graveyard::iter(store.graveyard_directory_object_id(), &mut merger).await?;
    ///
    pub async fn iter<'a, 'b>(
        graveyard_object_id: u64,
        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        Self::iter_from(merger, graveyard_object_id, 0).await
    }

    /// Like "iter", but seeks from a specific object ID within the graveyard.  Example usage:
    ///
    ///   let layer_set = store.tree().layer_set();
    ///   let mut merger = layer_set.merger();
    ///   let mut iter =
    ///       Graveyard::iter_from(&mut merger, store.graveyard_directory_object_id(), 3).await?;
    ///
    async fn iter_from<'a, 'b>(
        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
        graveyard_object_id: u64,
        from: u64,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        GraveyardIterator::new(
            graveyard_object_id,
            merger
                .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
                .await?,
        )
        .await
    }
}

pub struct GraveyardIterator<'a, 'b> {
    object_id: u64,
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}

/// Contains information about a graveyard entry associated with a particular object or
/// attribute.
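///
/// A sketch of reading entries (hypothetical snippet; assumes `iter` was obtained from
/// `Graveyard::iter`):
///
/// ```ignore
/// while let Some(info) = iter.get() {
///     // Each entry describes either a whole object or a single attribute of one.
///     let _oid = info.object_id();
///     let _attribute = info.attribute_id();
///     let _value = info.value();
///     iter.advance().await?;
/// }
/// ```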
#[derive(Debug, PartialEq)]
pub struct GraveyardEntryInfo {
    object_id: u64,
    attribute_id: Option<u64>,
    sequence: u64,
    value: ObjectValue,
}

impl GraveyardEntryInfo {
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    pub fn attribute_id(&self) -> Option<u64> {
        self.attribute_id
    }

    pub fn value(&self) -> &ObjectValue {
        &self.value
    }
}

impl<'a, 'b> GraveyardIterator<'a, 'b> {
    async fn new(
        object_id: u64,
        iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        let mut iter = GraveyardIterator { object_id, iter };
        iter.skip_deleted_entries().await?;
        Ok(iter)
    }

    async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
        loop {
            match self.iter.get() {
                Some(ItemRef {
                    key: ObjectKey { object_id, .. },
                    value: ObjectValue::None,
                    ..
                }) if *object_id == self.object_id => {}
                _ => return Ok(()),
            }
            self.iter.advance().await?;
        }
    }

    pub fn get(&self) -> Option<GraveyardEntryInfo> {
        match self.iter.get() {
            Some(ItemRef {
                key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
                value,
                sequence,
                ..
            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
                object_id: *object_id,
                attribute_id: None,
                sequence,
                value: value.clone(),
            }),
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id: oid,
                        data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
                    },
                value,
                sequence,
            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
                object_id: *object_id,
                attribute_id: Some(*attribute_id),
                sequence,
                value: value.clone(),
            }),
            _ => None,
        }
    }

    pub async fn advance(&mut self) -> Result<(), Error> {
        self.iter.advance().await?;
        self.skip_deleted_entries().await
    }
}

#[cfg(test)]
mod tests {
    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
    use crate::errors::FxfsError;
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
    use crate::fsck::fsck;
    use crate::object_handle::ObjectHandle;
    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
    use crate::object_store::object_record::ObjectValue;
    use crate::object_store::transaction::{lock_keys, Options};
    use crate::object_store::{HandleOptions, Mutation, ObjectKey, FSVERITY_MERKLE_ATTRIBUTE_ID};
    use assert_matches::assert_matches;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    #[fuchsia::test]
    async fn test_graveyard() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        // Create and add two objects to the graveyard.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        root_store.add_to_graveyard(&mut transaction, 3);
        root_store.add_to_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Check that we see the objects we added.
        {
            let layer_set = root_store.tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
                .await
                .expect("iter failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 3,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 4,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);
        }

        // Remove one of the objects.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        root_store.remove_from_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Check that the graveyard has been updated as expected.
        let layer_set = root_store.tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
            .await
            .expect("iter failed");
        assert_matches!(
            iter.get().expect("missing entry"),
            GraveyardEntryInfo { object_id: 3, attribute_id: None, value: ObjectValue::Some, .. }
        );
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), None);
    }

    #[fuchsia::test]
    async fn test_tombstone_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );

        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the Message::Tombstone is actually processed.
        fs.graveyard().wait_for_reap().await;
        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }

    #[fuchsia::test]
    async fn test_tombstone_attribute_and_object() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the two tombstone messages are processed.
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();
        if let Err(e) =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        };
        fsck(fs.clone()).await.expect("fsck failed");
    }

    #[fuchsia::test]
    async fn test_tombstone_large_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the queued tombstone message is actually processed.
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
}