1use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::merge::{Merger, MergerIterator};
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::lsm_tree::Query;
10use crate::object_handle::INVALID_OBJECT_ID;
11use crate::object_store::object_manager::ObjectManager;
12use crate::object_store::object_record::{
13 ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
14};
15use crate::object_store::transaction::{Mutation, Options, Transaction};
16use crate::object_store::ObjectStore;
17use anyhow::{anyhow, bail, Context, Error};
18use fuchsia_async::{self as fasync};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
21use futures::channel::oneshot;
22use futures::StreamExt;
23use std::sync::Arc;
24
/// Lifecycle of the background reaper task owned by a `Graveyard`.
enum ReaperTask {
    /// Neither a receiver nor a task is held. `wait_for_reap` leaves the state here.
    None,
    /// The task has not been spawned yet; holds the receiving end of the message channel until
    /// `reap_async` hands it to the task.
    Pending(UnboundedReceiver<Message>),
    /// The task has been spawned and owns the receiving end of the message channel.
    Running(fasync::Task<()>),
}
30
/// Queues deferred clean-up work (tombstoning objects or attributes, trimming objects) and
/// performs it on a background task.
pub struct Graveyard {
    /// Used to look up stores by ID and to obtain the metadata reservation.
    object_manager: Arc<ObjectManager>,
    /// Current state of the background reaper task.
    reaper_task: Mutex<ReaperTask>,
    /// Sending half of the channel from which the reaper task reads its work.
    channel: UnboundedSender<Message>,
}
41
/// Work items sent to the reaper task.
enum Message {
    /// Tombstone the object identified by (store ID, object ID). If the third field is `Some`,
    /// only that attribute of the object is tombstoned.
    Tombstone(u64, u64, Option<u64>),

    /// Trim the object identified by (store ID, object ID).
    Trim(u64, u64),

    /// The reaper task sends `()` on the enclosed sender when it reaches this message.
    Flush(oneshot::Sender<()>),
}
54
55#[fxfs_trace::trace]
56impl Graveyard {
57 pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
59 let (sender, receiver) = unbounded();
60 Arc::new(Graveyard {
61 object_manager,
62 reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
63 channel: sender,
64 })
65 }
66
67 pub fn create(transaction: &mut Transaction<'_>, store: &ObjectStore) -> u64 {
69 let object_id = store.maybe_get_next_object_id();
70 assert_ne!(object_id, INVALID_OBJECT_ID);
73 let now = Timestamp::now();
74 transaction.add(
75 store.store_object_id,
76 Mutation::insert_object(
77 ObjectKey::object(object_id),
78 ObjectValue::Object {
79 kind: ObjectKind::Graveyard,
80 attributes: ObjectAttributes {
81 creation_time: now.clone(),
82 modification_time: now,
83 project_id: 0,
84 ..Default::default()
85 },
86 },
87 ),
88 );
89 object_id
90 }
91
92 pub fn reap_async(self: Arc<Self>) {
97 let mut reaper_task = self.reaper_task.lock();
98 if let ReaperTask::Pending(_) = &*reaper_task {
99 if let ReaperTask::Pending(receiver) =
100 std::mem::replace(&mut *reaper_task, ReaperTask::None)
101 {
102 *reaper_task =
103 ReaperTask::Running(fasync::Task::spawn(self.clone().reap_task(receiver)));
104 } else {
105 unreachable!();
106 }
107 }
108 }
109
110 pub async fn wait_for_reap(&self) {
112 self.channel.close_channel();
113 let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
114 if let ReaperTask::Running(task) = task {
115 task.await;
116 }
117 }
118
119 async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
120 while let Some(message) = receiver.next().await {
122 match message {
123 Message::Tombstone(store_id, object_id, attribute_id) => {
124 let res = if let Some(attribute_id) = attribute_id {
125 self.tombstone_attribute(store_id, object_id, attribute_id).await
126 } else {
127 self.tombstone_object(store_id, object_id).await
128 };
129 if let Err(e) = res {
130 debug_assert!(
131 false,
132 "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}, \
133 attribute_id: {attribute_id:?}"
134 );
135 error!(
136 error:? = e,
137 store_id,
138 oid = object_id,
139 attribute_id;
140 "Tombstone error"
141 );
142 }
143 }
144 Message::Trim(store_id, object_id) => {
145 if let Err(e) = self.trim(store_id, object_id).await {
146 debug_assert!(
147 false,
148 "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}"
149 );
150 error!(error:? = e, store_id, oid = object_id; "Tombstone error");
151 }
152 }
153 Message::Flush(sender) => {
154 let _ = sender.send(());
155 }
156 }
157 }
158 }
159
160 #[trace]
166 pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
167 if store.filesystem().options().skip_initial_reap {
168 return Ok(0);
169 }
170 let mut count = 0;
171 let layer_set = store.tree().layer_set();
172 let mut merger = layer_set.merger();
173 let graveyard_object_id = store.graveyard_directory_object_id();
174 let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
175 let store_id = store.store_object_id();
176 while let Some(GraveyardEntryInfo { object_id, attribute_id, sequence: _, value }) =
179 iter.get()
180 {
181 match value {
182 ObjectValue::Some => {
183 if let Some(attribute_id) = attribute_id {
184 self.queue_tombstone_attribute(store_id, object_id, attribute_id)
185 } else {
186 self.queue_tombstone_object(store_id, object_id)
187 }
188 }
189 ObjectValue::Trim => {
190 if attribute_id.is_some() {
191 return Err(anyhow!(
192 "Trim is not currently supported for a single attribute"
193 ));
194 }
195 self.queue_trim(store_id, object_id)
196 }
197 _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
198 }
199 count += 1;
200 iter.advance().await?;
201 }
202 Ok(count)
203 }
204
205 pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
207 let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
208 }
209
210 pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
212 let _ = self.channel.unbounded_send(Message::Tombstone(
213 store_id,
214 object_id,
215 Some(attribute_id),
216 ));
217 }
218
219 fn queue_trim(&self, store_id: u64, object_id: u64) {
220 let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
221 }
222
223 pub async fn flush(&self) {
225 let (sender, receiver) = oneshot::channel::<()>();
226 self.channel.unbounded_send(Message::Flush(sender)).unwrap();
227 receiver.await.unwrap();
228 }
229
230 pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
233 let store = self
234 .object_manager
235 .store(store_id)
236 .with_context(|| format!("Failed to get store {}", store_id))?;
237 let options = if store_id == self.object_manager.root_parent_store_object_id()
241 || store_id == self.object_manager.root_store_object_id()
242 {
243 Options {
244 skip_journal_checks: true,
245 borrow_metadata_space: true,
246 allocator_reservation: Some(self.object_manager.metadata_reservation()),
247 ..Default::default()
248 }
249 } else {
250 Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
251 };
252 store.tombstone_object(object_id, options).await
253 }
254
255 pub async fn tombstone_attribute(
258 &self,
259 store_id: u64,
260 object_id: u64,
261 attribute_id: u64,
262 ) -> Result<(), Error> {
263 let store = self
264 .object_manager
265 .store(store_id)
266 .with_context(|| format!("Failed to get store {}", store_id))?;
267 let options = if store_id == self.object_manager.root_parent_store_object_id()
271 || store_id == self.object_manager.root_store_object_id()
272 {
273 Options {
274 skip_journal_checks: true,
275 borrow_metadata_space: true,
276 allocator_reservation: Some(self.object_manager.metadata_reservation()),
277 ..Default::default()
278 }
279 } else {
280 Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
281 };
282 store.tombstone_attribute(object_id, attribute_id, options).await
283 }
284
285 async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
286 let store = self
287 .object_manager
288 .store(store_id)
289 .with_context(|| format!("Failed to get store {}", store_id))?;
290 let fs = store.filesystem();
291 let truncate_guard = fs.truncate_guard(store_id, object_id).await;
292 store.trim(object_id, &truncate_guard).await.context("Failed to trim object")
293 }
294
295 pub async fn iter<'a, 'b>(
303 graveyard_object_id: u64,
304 merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
305 ) -> Result<GraveyardIterator<'a, 'b>, Error> {
306 Self::iter_from(merger, graveyard_object_id, 0).await
307 }
308
309 async fn iter_from<'a, 'b>(
316 merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
317 graveyard_object_id: u64,
318 from: u64,
319 ) -> Result<GraveyardIterator<'a, 'b>, Error> {
320 GraveyardIterator::new(
321 graveyard_object_id,
322 merger
323 .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
324 .await?,
325 )
326 .await
327 }
328}
329
/// Iterates over the entries of one graveyard, skipping entries whose value is
/// `ObjectValue::None`.
pub struct GraveyardIterator<'a, 'b> {
    /// Object ID of the graveyard being iterated; `get` returns `None` once the underlying
    /// iterator is positioned on a key with a different object ID.
    object_id: u64,
    /// Underlying merged iterator over the store's records.
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}
334
/// An owned snapshot of a single graveyard entry, as returned by `GraveyardIterator::get`.
#[derive(Debug, PartialEq)]
pub struct GraveyardEntryInfo {
    /// ID of the object the entry refers to.
    object_id: u64,
    /// `Some` when the entry refers to a single attribute of the object; `None` when it refers
    /// to the object itself.
    attribute_id: Option<u64>,
    /// Sequence number copied from the underlying item.
    sequence: u64,
    /// Value stored for the entry.
    value: ObjectValue,
}
344
impl GraveyardEntryInfo {
    /// Returns the ID of the object the entry refers to.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns the attribute ID if the entry refers to a single attribute, otherwise `None`.
    pub fn attribute_id(&self) -> Option<u64> {
        self.attribute_id
    }

    /// Returns the value stored for the entry.
    pub fn value(&self) -> &ObjectValue {
        &self.value
    }
}
358
359impl<'a, 'b> GraveyardIterator<'a, 'b> {
360 async fn new(
361 object_id: u64,
362 iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
363 ) -> Result<GraveyardIterator<'a, 'b>, Error> {
364 let mut iter = GraveyardIterator { object_id, iter };
365 iter.skip_deleted_entries().await?;
366 Ok(iter)
367 }
368
369 async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
370 loop {
371 match self.iter.get() {
372 Some(ItemRef {
373 key: ObjectKey { object_id, .. },
374 value: ObjectValue::None,
375 ..
376 }) if *object_id == self.object_id => {}
377 _ => return Ok(()),
378 }
379 self.iter.advance().await?;
380 }
381 }
382
383 pub fn get(&self) -> Option<GraveyardEntryInfo> {
384 match self.iter.get() {
385 Some(ItemRef {
386 key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
387 value,
388 sequence,
389 ..
390 }) if *oid == self.object_id => Some(GraveyardEntryInfo {
391 object_id: *object_id,
392 attribute_id: None,
393 sequence,
394 value: value.clone(),
395 }),
396 Some(ItemRef {
397 key:
398 ObjectKey {
399 object_id: oid,
400 data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
401 },
402 value,
403 sequence,
404 }) if *oid == self.object_id => Some(GraveyardEntryInfo {
405 object_id: *object_id,
406 attribute_id: Some(*attribute_id),
407 sequence,
408 value: value.clone(),
409 }),
410 _ => None,
411 }
412 }
413
414 pub async fn advance(&mut self) -> Result<(), Error> {
415 self.iter.advance().await?;
416 self.skip_deleted_entries().await
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
423 use crate::errors::FxfsError;
424 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
425 use crate::fsck::fsck;
426 use crate::object_handle::ObjectHandle;
427 use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
428 use crate::object_store::object_record::ObjectValue;
429 use crate::object_store::transaction::{lock_keys, Options};
430 use crate::object_store::{HandleOptions, Mutation, ObjectKey, FSVERITY_MERKLE_ATTRIBUTE_ID};
431 use assert_matches::assert_matches;
432 use storage_device::fake_device::FakeDevice;
433 use storage_device::DeviceHolder;
434
    /// Block size passed to `FakeDevice::new` by the tests in this module.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
436
    /// Adds entries to and removes an entry from the root store's graveyard, checking what
    /// `Graveyard::iter` reports after each step.
    #[fuchsia::test]
    async fn test_graveyard() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        // Add two entries (object IDs 3 and 4) to the graveyard in one transaction.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        root_store.add_to_graveyard(&mut transaction, 3);
        root_store.add_to_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Both entries should be reported, in object ID order, and nothing else.
        {
            let layer_set = root_store.tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
                .await
                .expect("iter failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 3,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 4,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);
        }

        // Remove the entry for object 4.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        root_store.remove_from_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Only the entry for object 3 should remain visible.
        let layer_set = root_store.tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
            .await
            .expect("iter failed");
        assert_matches!(
            iter.get().expect("missing entry"),
            GraveyardEntryInfo { object_id: 3, attribute_id: None, value: ObjectValue::Some, .. }
        );
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), None);
    }
506
    /// Writes an attribute, records a graveyard attribute entry for it, and checks that after
    /// reopening the filesystem and waiting for the reaper the attribute can no longer be read
    /// while the object itself can still be opened.
    #[fuchsia::test]
    async fn test_tombstone_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        // Record a graveyard entry that targets just the attribute written above.
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );

        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Run fsck on a read-only mount while the graveyard entry is still present.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // NOTE(review): the entry is presumably queued for reaping as part of this open;
        // `wait_for_reap` then waits for the reaper task to finish before we check the result.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fs.graveyard().wait_for_reap().await;
        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
576
    /// Records both a graveyard attribute entry and a graveyard entry for the same object, and
    /// checks that after reopening the filesystem and waiting for the reaper the object can no
    /// longer be opened.
    #[fuchsia::test]
    async fn test_tombstone_attribute_and_object() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        // First record a graveyard entry for the attribute...
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");
        // ...then, in a separate transaction, one for the whole object.
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Run fsck on a read-only mount while both graveyard entries are still present.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // NOTE(review): the entries are presumably queued for reaping as part of this open;
        // `wait_for_reap` then waits for the reaper task to finish before we check the result.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();
        // Opening the object must now fail with `NotFound`.
        if let Err(e) =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        };
        fsck(fs.clone()).await.expect("fsck failed");
    }
652
    /// Records a graveyard attribute entry and writes an attribute three batches long through
    /// the same transaction, which this test never commits itself. Checks that after reopening
    /// the filesystem and waiting for the reaper the attribute cannot be read.
    #[fuchsia::test]
    async fn test_tombstone_large_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // The data is three batches long.
            // NOTE(review): presumably `write_new_attr_in_batches` commits the earlier batches
            // itself and leaves the last one pending in `transaction` - confirm against its
            // implementation.
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            // `transaction` is dropped at the end of this block without this test calling
            // `commit` on it.
            handle.object_id()
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Run fsck on a read-only mount before the reaper gets a chance to run.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // NOTE(review): the entry is presumably queued for reaping as part of this open;
        // `wait_for_reap` then waits for the reaper task to finish before we check the result.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
734}