1use crate::checksum::Checksum;
6use crate::debug_assert_not_too_long;
7use crate::filesystem::{FxFilesystem, TxnGuard};
8use crate::log::*;
9use crate::lsm_tree::types::Item;
10use crate::object_handle::INVALID_OBJECT_ID;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{reserved_space_from_journal_usage, ObjectManager};
13use crate::object_store::object_record::{
14 ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43, ObjectItemV46, ObjectKey,
15 ObjectKeyData, ObjectValue, ProjectProperty,
16};
17use crate::serialized_types::{migrate_nodefault, migrate_to_version, Migrate, Versioned};
18use anyhow::Error;
19use either::{Either, Left, Right};
20use fprint::TypeFingerprint;
21use fuchsia_sync::Mutex;
22use futures::future::poll_fn;
23use futures::pin_mut;
24use fxfs_crypto::{WrappedKey, WrappedKeyV32, WrappedKeyV40};
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::hash_map::Entry;
31use std::collections::BTreeSet;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Options that adjust how a [`Transaction`] is created.
///
/// The whole struct is handed to `reservation_for_transaction` by `Transaction::new`; how the
/// individual fields are interpreted there is not visible in this file.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// Forwarded to `add_transaction` by `Transaction::new`.
    /// NOTE(review): presumably disables journal space checks -- confirm against the filesystem.
    pub skip_journal_checks: bool,

    /// NOTE(review): presumably allows the transaction to borrow metadata space (compare
    /// `MetadataReservation::Borrowed`) -- confirm against `reservation_for_transaction`.
    pub borrow_metadata_space: bool,

    /// Optional caller-supplied allocator reservation.
    /// NOTE(review): presumably used for allocations made within the transaction -- confirm.
    pub allocator_reservation: Option<&'a Reservation>,

    /// Optional pre-existing transaction guard.
    /// NOTE(review): not read anywhere in this file; confirm its use with callers.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
/// Maximum journal usage permitted for a single transaction.
/// NOTE(review): presumably measured in bytes -- confirm against the journal implementation.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
/// Metadata space corresponding to [`TRANSACTION_MAX_JOURNAL_USAGE`], as computed by
/// `reserved_space_from_journal_usage`.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// The locks returned by `LockManager::txn_lock`, wrapping the underlying [`WriteGuard`].
///
/// Marked `#[must_use]` because discarding the value discards the guard it wraps.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// Alias for the latest versioned mutation type.
pub type Mutation = MutationV46;

/// A single change recorded in a transaction.
///
/// The declaration order of the variants matters: the derived `Ord` compares by variant order
/// first, and `TxnMutation` relies on that ordering.  Do not reorder variants.
#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV46 {
    /// A change to an item in an object store.
    ObjectStore(ObjectStoreMutationV46),
    /// Opaque bytes.  NOTE(review): presumably an encrypted object-store mutation -- confirm.
    EncryptedObjectStore(Box<[u8]>),
    /// A change to allocator state.
    Allocator(AllocatorMutationV32),
    /// NOTE(review): presumably marks the start of a flush -- confirm with the journal code.
    BeginFlush,
    /// NOTE(review): presumably marks the end of a flush -- confirm with the journal code.
    EndFlush,
    /// NOTE(review): presumably records deletion of a volume -- confirm.
    DeleteVolume,
    /// NOTE(review): presumably updates the amount of borrowed metadata space -- confirm.
    UpdateBorrowed(u64),
    /// Carries a wrapped key; see [`Mutation::update_mutations_key`].
    UpdateMutationsKey(UpdateMutationsKeyV40),
    /// NOTE(review): presumably creates an internal directory with the given object id --
    /// confirm.
    CreateInternalDir(u64),
}
101
/// Version 43 of [`Mutation`]; the `migrate_to_version` attribute names [`MutationV46`] as the
/// migration target.  Variants mirror the newer enum apart from the object-store payload version.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
115
/// Version 41 of [`Mutation`]; the `migrate_to_version` attribute names [`MutationV43`] as the
/// migration target.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
129
/// Version 40 of [`Mutation`]; the `migrate_to_version` attribute names [`MutationV41`] as the
/// migration target.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
147
148impl Mutation {
149 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
150 Mutation::ObjectStore(ObjectStoreMutation {
151 item: Item::new(key, value),
152 op: Operation::Insert,
153 })
154 }
155
156 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
157 Mutation::ObjectStore(ObjectStoreMutation {
158 item: Item::new(key, value),
159 op: Operation::ReplaceOrInsert,
160 })
161 }
162
163 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
164 Mutation::ObjectStore(ObjectStoreMutation {
165 item: Item::new(key, value),
166 op: Operation::Merge,
167 })
168 }
169
170 pub fn update_mutations_key(key: WrappedKey) -> Self {
171 Mutation::UpdateMutationsKey(key.into())
172 }
173}
174
/// Alias for the latest versioned object-store mutation type.
pub type ObjectStoreMutation = ObjectStoreMutationV46;

/// A change to one item of an object store: the item itself plus the operation used to apply it.
///
/// Note that equality and ordering for this type are implemented by hand in this file and look
/// only at `item.key`.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_nodefault]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV46 {
    /// The item (key and value) being written.
    pub item: ObjectItemV46,
    /// How the item is applied (insert, replace-or-insert or merge).
    pub op: OperationV32,
}
187
/// Version 43 of [`ObjectStoreMutation`].
///
/// NOTE(review): unlike the V41 and V40 structs, no `migrate_to_version` attribute is present
/// here; confirm how the `Migrate` derive selects its target for this type.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}
194
/// Version 41 of [`ObjectStoreMutation`]; the `migrate_to_version` attribute names
/// [`ObjectStoreMutationV43`] as the migration target.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}
202
/// Version 40 of [`ObjectStoreMutation`]; the `migrate_to_version` attribute names
/// [`ObjectStoreMutationV41`] as the migration target.
///
/// NOTE(review): this struct does not derive `Versioned`, unlike its newer counterparts --
/// confirm that is intentional.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
210
/// Alias for the latest versioned operation type.
pub type Operation = OperationV32;

/// The way an object-store item is applied.
///
/// NOTE(review): the precise semantics of each operation live in the tree code that consumes
/// them, which is not shown in this file.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    /// Insert the item.  `Transaction::verify_locks` treats an `Insert` of an `Object` key as the
    /// creation of a new object.
    Insert,
    /// Replace an existing item or insert a new one.
    ReplaceOrInsert,
    /// Merge the item.  Project usage records must only use this operation (see
    /// `Transaction::verify_locks`).
    Merge,
}
221
222impl Ord for ObjectStoreMutation {
223 fn cmp(&self, other: &Self) -> Ordering {
224 self.item.key.cmp(&other.item.key)
225 }
226}
227
228impl PartialOrd for ObjectStoreMutation {
229 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
230 Some(self.cmp(other))
231 }
232}
233
234impl PartialEq for ObjectStoreMutation {
235 fn eq(&self, other: &Self) -> bool {
236 self.item.key.eq(&other.item.key)
237 }
238}
239
240impl Eq for ObjectStoreMutation {}
241
242impl Ord for AllocatorItem {
243 fn cmp(&self, other: &Self) -> Ordering {
244 self.key.cmp(&other.key)
245 }
246}
247
248impl PartialOrd for AllocatorItem {
249 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
250 Some(self.cmp(other))
251 }
252}
253
/// Alias for the latest versioned device range type.
pub type DeviceRange = DeviceRangeV32;

/// A newtype around `Range<u64>`.
///
/// The wrapper exists so that traits such as `Ord` can be implemented for it in this file; it
/// dereferences to the inner range.
/// NOTE(review): presumably a byte range on the device -- confirm units with the allocator.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
261
262impl Deref for DeviceRange {
263 type Target = Range<u64>;
264
265 fn deref(&self) -> &Self::Target {
266 &self.0
267 }
268}
269
270impl DerefMut for DeviceRange {
271 fn deref_mut(&mut self) -> &mut Self::Target {
272 &mut self.0
273 }
274}
275
276impl From<Range<u64>> for DeviceRange {
277 fn from(range: Range<u64>) -> Self {
278 Self(range)
279 }
280}
281
282impl Into<Range<u64>> for DeviceRange {
283 fn into(self) -> Range<u64> {
284 self.0
285 }
286}
287
288impl Ord for DeviceRange {
289 fn cmp(&self, other: &Self) -> Ordering {
290 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
291 }
292}
293
294impl PartialOrd for DeviceRange {
295 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
296 Some(self.cmp(other))
297 }
298}
299
/// Alias for the latest versioned allocator mutation type.
pub type AllocatorMutation = AllocatorMutationV32;

/// A change to allocator state.
///
/// The derived `Ord` depends on the declaration order of variants and fields; do not reorder.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Allocates `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Deallocates `device_range` previously owned by `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a limit of `bytes` for `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// NOTE(review): presumably marks everything owned by the given object id for deletion --
    /// confirm against the allocator.
    MarkForDeletion(u64),
}
324
/// Alias for the latest versioned `UpdateMutationsKey` type.
pub type UpdateMutationsKey = UpdateMutationsKeyV40;

/// Newtype around a wrapped key, used as the payload of `Mutation::UpdateMutationsKey`.
///
/// Comparison traits for this type are implemented by hand in this file and are based on the
/// address of the value rather than its contents.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV40(pub WrappedKeyV40);

/// Older (version 32) form of [`UpdateMutationsKey`]; convertible to the V40 form via `From`.
#[derive(Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV32(pub WrappedKeyV32);
332
333impl From<UpdateMutationsKeyV32> for UpdateMutationsKeyV40 {
334 fn from(value: UpdateMutationsKeyV32) -> Self {
335 Self(value.0.into())
336 }
337}
338
339impl From<UpdateMutationsKey> for WrappedKey {
340 fn from(outer: UpdateMutationsKey) -> Self {
341 outer.0
342 }
343}
344
345impl From<WrappedKey> for UpdateMutationsKey {
346 fn from(inner: WrappedKey) -> Self {
347 Self(inner)
348 }
349}
350
/// Fuzzing support: only the wrapping key id is derived from the fuzzer input; the key bytes are
/// always the default value.
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let wrapping_key_id = <u128>::arbitrary(u)?;
        let key = fxfs_crypto::WrappedKeyBytes::default();
        Ok(UpdateMutationsKey::from(WrappedKey { wrapping_key_id, key }))
    }
}
363
364impl Ord for UpdateMutationsKey {
365 fn cmp(&self, other: &Self) -> Ordering {
366 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
367 }
368}
369
370impl PartialOrd for UpdateMutationsKey {
371 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
372 Some(self.cmp(other))
373 }
374}
375
376impl Eq for UpdateMutationsKey {}
377
378impl PartialEq for UpdateMutationsKey {
379 fn eq(&self, other: &Self) -> bool {
380 std::ptr::eq(self, other)
381 }
382}
383
/// Identifies something that can be locked through the `LockManager`.
///
/// The derived `Ord` follows the declaration order of variants and fields; `LockManager::lock`
/// sorts keys with it before acquiring them, so variants must not be reordered casually.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// A single key with no parameters.
    /// NOTE(review): presumably a filesystem-wide lock -- confirm with callers.
    Filesystem,

    /// Keyed by `object_id`.
    /// NOTE(review): presumably serialises flushes of that object -- confirm with callers.
    Flush {
        object_id: u64,
    },

    /// Keyed by a single attribute of an object within a store.
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Keyed by an object within a store.  `Transaction::verify_locks` expects this key to be
    /// held when certain records of an existing object are mutated.
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Keyed by a project id within a store.  `Transaction::verify_locks` expects this key to be
    /// held when a project limit is mutated.
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Keyed by an object within a store.
    /// NOTE(review): presumably guards truncation of that object -- confirm with callers.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}

impl LockKey {
    /// Shorthand for [`LockKey::ObjectAttribute`].
    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
        Self::ObjectAttribute { store_object_id, object_id, attribute_id }
    }

    /// Shorthand for [`LockKey::Object`].
    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
        Self::Object { store_object_id, object_id }
    }

    /// Shorthand for [`LockKey::Flush`].
    pub const fn flush(object_id: u64) -> Self {
        Self::Flush { object_id }
    }

    /// Shorthand for [`LockKey::Truncate`].
    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
        Self::Truncate { store_object_id, object_id }
    }
}
445
446#[derive(Clone, Debug)]
448pub enum LockKeys {
449 None,
450 Inline(LockKey),
451 Vec(Vec<LockKey>),
452}
453
454impl LockKeys {
455 pub fn with_capacity(capacity: usize) -> Self {
456 if capacity > 1 {
457 LockKeys::Vec(Vec::with_capacity(capacity))
458 } else {
459 LockKeys::None
460 }
461 }
462
463 pub fn push(&mut self, key: LockKey) {
464 match self {
465 Self::None => *self = LockKeys::Inline(key),
466 Self::Inline(inline) => {
467 *self = LockKeys::Vec(vec![*inline, key]);
468 }
469 Self::Vec(vec) => vec.push(key),
470 }
471 }
472
473 pub fn truncate(&mut self, len: usize) {
474 match self {
475 Self::None => {}
476 Self::Inline(_) => {
477 if len == 0 {
478 *self = Self::None;
479 }
480 }
481 Self::Vec(vec) => vec.truncate(len),
482 }
483 }
484
485 fn len(&self) -> usize {
486 match self {
487 Self::None => 0,
488 Self::Inline(_) => 1,
489 Self::Vec(vec) => vec.len(),
490 }
491 }
492
493 fn contains(&self, key: &LockKey) -> bool {
494 match self {
495 Self::None => false,
496 Self::Inline(single) => single == key,
497 Self::Vec(vec) => vec.contains(key),
498 }
499 }
500
501 fn sort_unstable(&mut self) {
502 match self {
503 Self::Vec(vec) => vec.sort_unstable(),
504 _ => {}
505 }
506 }
507
508 fn dedup(&mut self) {
509 match self {
510 Self::Vec(vec) => vec.dedup(),
511 _ => {}
512 }
513 }
514
515 fn iter(&self) -> LockKeysIter<'_> {
516 match self {
517 LockKeys::None => LockKeysIter::None,
518 LockKeys::Inline(key) => LockKeysIter::Inline(key),
519 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
520 }
521 }
522}
523
524enum LockKeysIter<'a> {
525 None,
526 Inline(&'a LockKey),
527 Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
528}
529
530impl<'a> Iterator for LockKeysIter<'a> {
531 type Item = &'a LockKey;
532 fn next(&mut self) -> Option<Self::Item> {
533 match self {
534 Self::None => None,
535 Self::Inline(inline) => {
536 let next = *inline;
537 *self = Self::None;
538 Some(next)
539 }
540 Self::Vec(vec) => vec.next(),
541 }
542 }
543}
544
545impl Default for LockKeys {
546 fn default() -> Self {
547 LockKeys::None
548 }
549}
550
/// Builds a `LockKeys` from a comma-separated list of `LockKey` expressions (a trailing comma is
/// accepted), choosing the cheapest representation: `None` for no keys, `Inline` for exactly one
/// and `Vec` for two or more.
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
// Re-export so the macro can be reached by path from this module as well as from the crate root.
pub use lock_keys;
564
/// An object that can be attached to a mutation in a transaction (see [`AssocObj`]).
pub trait AssociatedObject: Send + Sync {
    /// Hook with a default no-op implementation.
    /// NOTE(review): presumably invoked just before `_mutation` is applied to `_object_id` --
    /// the call site is not in this file; confirm with the object manager.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
572
573pub enum AssocObj<'a> {
574 None,
575 Borrowed(&'a (dyn AssociatedObject)),
576 Owned(Box<dyn AssociatedObject>),
577}
578
579impl AssocObj<'_> {
580 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
581 match self {
582 AssocObj::None => None,
583 AssocObj::Borrowed(ref b) => Some(f(*b)),
584 AssocObj::Owned(ref o) => Some(f(o.as_ref())),
585 }
586 }
587}
588
/// A mutation together with the object it targets, as stored in a [`Transaction`].
///
/// Ordering and equality (implemented by hand in this file) use `object_id` and `mutation` only;
/// `associated_object` is ignored.
pub struct TxnMutation<'a> {
    /// Id of the object the mutation applies to.  For object-store mutations,
    /// `Transaction::verify_locks` treats this as the store's object id.
    pub object_id: u64,

    /// The mutation itself.
    pub mutation: Mutation,

    /// Optional object associated with the mutation.
    pub associated_object: AssocObj<'a>,
}
602
603impl Ord for TxnMutation<'_> {
606 fn cmp(&self, other: &Self) -> Ordering {
607 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
608 }
609}
610
611impl PartialOrd for TxnMutation<'_> {
612 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
613 Some(self.cmp(other))
614 }
615}
616
617impl PartialEq for TxnMutation<'_> {
618 fn eq(&self, other: &Self) -> bool {
619 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
620 }
621}
622
623impl Eq for TxnMutation<'_> {}
624
625impl std::fmt::Debug for TxnMutation<'_> {
626 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
627 f.debug_struct("TxnMutation")
628 .field("object_id", &self.object_id)
629 .field("mutation", &self.mutation)
630 .finish()
631 }
632}
633
/// How metadata space is accounted for a transaction.
///
/// The value is produced by `reservation_for_transaction` (see `Transaction::new`); the exact
/// meaning of each variant is defined by code outside this file.
pub enum MetadataReservation {
    /// NOTE(review): presumably no reservation has been made -- confirm.
    None,

    /// NOTE(review): presumably metadata space is borrowed rather than reserved (compare
    /// `Options::borrow_metadata_space`) -- confirm.
    Borrowed,

    /// An owned reservation.
    Reservation(Reservation),

    /// An amount held.
    /// NOTE(review): presumably an amount taken from a caller-supplied reservation -- confirm.
    Hold(u64),
}
648
/// A set of mutations that is committed (or dropped) as a unit.
///
/// Dropping a transaction calls `drop_transaction` on the filesystem (see the `Drop` impl).
pub struct Transaction<'a> {
    /// Guard supplied at construction; it also provides access to the filesystem via `fs()`.
    txn_guard: TxnGuard<'a>,

    /// The mutations added so far, ordered by `(object_id, mutation)`.
    mutations: BTreeSet<TxnMutation<'a>>,

    /// Keys of the locks held by this transaction; released by `LockManager::drop_transaction`.
    txn_locks: LockKeys,

    /// Optional allocator reservation, as returned by `reservation_for_transaction`.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The metadata reservation, as returned by `reservation_for_transaction`.
    pub metadata_reservation: MetadataReservation,

    /// `(store_object_id, object_id)` pairs for objects inserted by this transaction.  Recorded
    /// by `verify_locks`, which does not require an object lock for records of such objects.
    new_objects: BTreeSet<(u64, u64)>,

    /// `(range, checksums, first_write)` triples recorded through `add_checksum`.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
}
672
673impl<'a> Transaction<'a> {
674 pub async fn new(
677 txn_guard: TxnGuard<'a>,
678 options: Options<'a>,
679 txn_locks: LockKeys,
680 ) -> Result<Transaction<'a>, Error> {
681 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
682 let fs = txn_guard.fs().clone();
683 let guard = scopeguard::guard((), |_| fs.sub_transaction());
684 let (metadata_reservation, allocator_reservation, hold) =
685 txn_guard.fs().reservation_for_transaction(options).await?;
686
687 let txn_locks = {
688 let lock_manager = txn_guard.fs().lock_manager();
689 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
690 std::mem::take(&mut write_guard.0.lock_keys)
691 };
692 let mut transaction = Transaction {
693 txn_guard,
694 mutations: BTreeSet::new(),
695 txn_locks,
696 allocator_reservation: None,
697 metadata_reservation,
698 new_objects: BTreeSet::new(),
699 checksums: Vec::new(),
700 };
701
702 ScopeGuard::into_inner(guard);
703 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
705 Ok(transaction)
706 }
707
708 pub fn txn_guard(&self) -> &TxnGuard<'_> {
709 &self.txn_guard
710 }
711
712 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
713 &self.mutations
714 }
715
716 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
717 self.new_objects.clear();
718 mem::take(&mut self.mutations)
719 }
720
721 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
724 self.add_with_object(object_id, mutation, AssocObj::None)
725 }
726
727 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
729 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
730 if self.mutations.remove(&txn_mutation) {
731 if let Mutation::ObjectStore(ObjectStoreMutation {
732 item:
733 ObjectItem {
734 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
735 ..
736 },
737 op: Operation::Insert,
738 }) = txn_mutation.mutation
739 {
740 self.new_objects.remove(&(object_id, new_object_id));
741 }
742 }
743 }
744
745 pub fn add_with_object(
748 &mut self,
749 object_id: u64,
750 mutation: Mutation,
751 associated_object: AssocObj<'a>,
752 ) -> Option<Mutation> {
753 assert!(object_id != INVALID_OBJECT_ID);
754 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
755 self.verify_locks(&txn_mutation);
756 self.mutations.replace(txn_mutation).map(|m| m.mutation)
757 }
758
759 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
760 self.checksums.push((range, checksums, first_write));
761 }
762
763 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
764 &self.checksums
765 }
766
767 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
768 std::mem::replace(&mut self.checksums, Vec::new())
769 }
770
771 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
772 if let TxnMutation {
776 mutation:
777 Mutation::ObjectStore { 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op } },
778 object_id: store_object_id,
779 ..
780 } = mutation
781 {
782 match &key.data {
783 ObjectKeyData::Attribute(..) => {
784 }
786 ObjectKeyData::Child { .. }
787 | ObjectKeyData::EncryptedChild { .. }
788 | ObjectKeyData::CasefoldChild { .. } => {
789 let id = key.object_id;
790 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
791 && !self.new_objects.contains(&(*store_object_id, id))
792 {
793 debug_assert!(
794 false,
795 "Not holding required lock for object {id} \
796 in store {store_object_id}"
797 );
798 error!(
799 "Not holding required lock for object {id} in store \
800 {store_object_id}"
801 )
802 }
803 }
804 ObjectKeyData::GraveyardEntry { .. } => {
805 }
807 ObjectKeyData::GraveyardAttributeEntry { .. } => {
808 }
810 ObjectKeyData::Keys => {
811 let id = key.object_id;
812 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
813 && !self.new_objects.contains(&(*store_object_id, id))
814 {
815 debug_assert!(
816 false,
817 "Not holding required lock for object {id} \
818 in store {store_object_id}"
819 );
820 error!(
821 "Not holding required lock for object {id} in store \
822 {store_object_id}"
823 )
824 }
825 }
826 ObjectKeyData::Object => match op {
827 Operation::Insert => {
829 self.new_objects.insert((*store_object_id, key.object_id));
830 }
831 Operation::Merge | Operation::ReplaceOrInsert => {
832 let id = key.object_id;
833 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
834 && !self.new_objects.contains(&(*store_object_id, id))
835 {
836 debug_assert!(
837 false,
838 "Not holding required lock for object {id} \
839 in store {store_object_id}"
840 );
841 error!(
842 "Not holding required lock for object {id} in store \
843 {store_object_id}"
844 )
845 }
846 }
847 },
848 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
849 if !self.txn_locks.contains(&LockKey::ProjectId {
850 store_object_id: *store_object_id,
851 project_id: *project_id,
852 }) {
853 debug_assert!(
854 false,
855 "Not holding required lock for project limit id {project_id} \
856 in store {store_object_id}"
857 );
858 error!(
859 "Not holding required lock for project limit id {project_id} in \
860 store {store_object_id}"
861 )
862 }
863 }
864 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
865 Operation::Insert | Operation::ReplaceOrInsert => {
866 panic!(
867 "Project usage is all handled by merging deltas, no inserts or \
868 replacements should be used"
869 );
870 }
871 Operation::Merge => {}
873 },
874 ObjectKeyData::ExtendedAttribute { .. } => {
875 let id = key.object_id;
876 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
877 && !self.new_objects.contains(&(*store_object_id, id))
878 {
879 debug_assert!(
880 false,
881 "Not holding required lock for object {id} \
882 in store {store_object_id} while mutating extended attribute"
883 );
884 error!(
885 "Not holding required lock for object {id} in store \
886 {store_object_id} while mutating extended attribute"
887 )
888 }
889 }
890 }
891 }
892 }
893
894 pub fn is_empty(&self) -> bool {
896 self.mutations.is_empty()
897 }
898
899 pub fn get_object_mutation(
902 &self,
903 store_object_id: u64,
904 key: ObjectKey,
905 ) -> Option<&ObjectStoreMutation> {
906 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
907 self.mutations.get(&TxnMutation {
908 object_id: store_object_id,
909 mutation: Mutation::insert_object(key, ObjectValue::None),
910 associated_object: AssocObj::None,
911 })
912 {
913 Some(mutation)
914 } else {
915 None
916 }
917 }
918
919 pub async fn commit(mut self) -> Result<u64, Error> {
921 debug!(txn:? = &self; "Commit");
922 self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
923 }
924
925 pub async fn commit_with_callback<R: Send>(
928 mut self,
929 f: impl FnOnce(u64) -> R + Send,
930 ) -> Result<R, Error> {
931 debug!(txn:? = &self; "Commit");
932 let mut f = Some(f);
935 let mut result = None;
936 self.txn_guard
937 .fs()
938 .clone()
939 .commit_transaction(&mut self, &mut |offset| {
940 result = Some(f.take().unwrap()(offset));
941 })
942 .await?;
943 Ok(result.unwrap())
944 }
945
946 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
949 debug!(txn:? = self; "Commit");
950 self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
951 assert!(self.mutations.is_empty());
952 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
953 Ok(())
954 }
955}
956
957impl Drop for Transaction<'_> {
958 fn drop(&mut self) {
959 debug!(txn:? = &self; "Drop");
962 self.txn_guard.fs().clone().drop_transaction(self);
963 }
964}
965
966impl std::fmt::Debug for Transaction<'_> {
967 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
968 f.debug_struct("Transaction")
969 .field("mutations", &self.mutations)
970 .field("txn_locks", &self.txn_locks)
971 .field("reservation", &self.allocator_reservation)
972 .finish()
973 }
974}
975
/// Holds a `T` either by reference or by value, and dereferences to `T` in both cases.
pub enum BorrowedOrOwned<'a, T> {
    /// A borrowed value.
    Borrowed(&'a T),
    /// An owned value.
    Owned(T),
}

impl<T> Deref for BorrowedOrOwned<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        match *self {
            Self::Borrowed(borrowed) => borrowed,
            Self::Owned(ref owned) => owned,
        }
    }
}

/// A reference converts into the borrowed form.
impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
    fn from(value: &'a T) -> Self {
        Self::Borrowed(value)
    }
}

/// A value converts into the owned form.
impl<T> From<T> for BorrowedOrOwned<'_, T> {
    fn from(value: T) -> Self {
        Self::Owned(value)
    }
}
1003
/// Hands out read, transaction ("locked") and write locks identified by `LockKey`.
///
/// All bookkeeping, including the intrusive lists of waiters hanging off each entry, lives behind
/// the single `locks` mutex.
pub struct LockManager {
    /// The lock table.  Every access to a `LockEntry` or a queued `LockWaker` in this file
    /// happens with this mutex held.
    locks: Mutex<Locks>,
}

/// The lock table: one entry per key that is currently held or being waited for.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1039
1040impl Locks {
1041 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1042 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1043 let entry = occupied.get_mut();
1044 let wake = match state {
1045 LockState::ReadLock => {
1046 entry.read_count -= 1;
1047 entry.read_count == 0
1048 }
1049 LockState::Locked | LockState::WriteLock => {
1051 entry.state = LockState::ReadLock;
1052 true
1053 }
1054 };
1055 if wake {
1056 unsafe {
1058 entry.wake();
1059 }
1060 if entry.can_remove() {
1061 occupied.remove_entry();
1062 }
1063 }
1064 } else {
1065 unreachable!();
1066 }
1067 }
1068
1069 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1070 for lock in lock_keys.iter() {
1071 self.drop_lock(*lock, LockState::ReadLock);
1072 }
1073 }
1074
1075 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1076 for lock in lock_keys.iter() {
1077 self.drop_lock(*lock, LockState::WriteLock);
1080 }
1081 }
1082
1083 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1085 for lock in lock_keys.iter() {
1086 unsafe {
1088 self.keys.get_mut(lock).unwrap().downgrade_lock();
1089 }
1090 }
1091 }
1092}
1093
/// Bookkeeping for a single `LockKey`.
#[derive(Debug)]
struct LockEntry {
    /// Number of read locks currently held on the key.
    read_count: u64,

    /// Current state of the key.  `ReadLock` doubles as the idle state: an entry in that state
    /// with no readers can be removed (see `can_remove`).
    state: LockState,

    /// First waiter in the intrusive, doubly linked list of `LockWaker`s (null when empty).
    /// The wakers live in the futures that are waiting; they are linked in by
    /// `LockManager::lock` / `commit_prepare_keys` and unlinked by `pop_and_wake` or
    /// `remove_waker`.
    head: *const LockWaker,
    /// Last waiter in the list (null when empty).
    tail: *const LockWaker,
}

// The raw pointers make `LockEntry` `!Send` by default.  Within this file they are only
// dereferenced while the `LockManager::locks` mutex is held.
unsafe impl Send for LockEntry {}
1112
/// A node in a `LockEntry`'s list of waiters.  It is created in place inside the waiting future
/// (pinned), so its address stays valid while it is linked into the list.
struct LockWaker {
    /// Next waiter in the list (towards the tail), or null.
    next: UnsafeCell<*const LockWaker>,
    /// Previous waiter in the list (towards the head), or null.
    prev: UnsafeCell<*const LockWaker>,

    /// The key being waited for.
    key: LockKey,

    /// Wake-up state; only accessed with the `LockManager::locks` mutex held.
    waker: UnsafeCell<WakerState>,

    /// The state the waiter wants to acquire.
    target_state: LockState,

    /// True when the waiter is upgrading a lock it already holds in the `Locked` state to
    /// `WriteLock` (see `LockManager::commit_prepare_keys`).
    is_upgrade: bool,

    /// The node is referenced by raw pointers, so it must not move once linked.
    _pin: PhantomPinned,
}
1136
/// Wake-up state of a `LockWaker`.
enum WakerState {
    /// Not yet polled; there is no task waker to notify.
    Pending,

    /// Polled at least once; holds the task waker to notify.
    Registered(Waker),

    /// The lock has been granted to this waiter.
    Woken,
}

impl WakerState {
    /// Returns true once the lock has been granted.
    fn is_woken(&self) -> bool {
        match self {
            Self::Woken => true,
            Self::Pending | Self::Registered(_) => false,
        }
    }
}
1153
// `LockWaker` contains `UnsafeCell`s and raw pointers, so it is neither `Send` nor `Sync` by
// default.  Within this file those fields are only accessed while the `LockManager::locks`
// mutex is held.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1156
1157impl LockWaker {
1158 async fn wait(&self, manager: &LockManager) {
1160 let waker_guard = scopeguard::guard((), |_| {
1162 let mut locks = manager.locks.lock();
1163 unsafe {
1165 if (*self.waker.get()).is_woken() {
1166 if self.is_upgrade {
1168 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1169 } else {
1170 locks.drop_lock(self.key, self.target_state);
1171 }
1172 } else {
1173 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1176 }
1177 }
1178 });
1179
1180 poll_fn(|cx| {
1181 let _locks = manager.locks.lock();
1182 unsafe {
1184 if (*self.waker.get()).is_woken() {
1185 Poll::Ready(())
1186 } else {
1187 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1188 Poll::Pending
1189 }
1190 }
1191 })
1192 .await;
1193
1194 ScopeGuard::into_inner(waker_guard);
1195 }
1196}
1197
/// The state of a lock entry, and the kind of lock a caller can ask for.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    /// Held (or requested) for reading.  Also the idle state an entry returns to when a
    /// `Locked`/`WriteLock` holder releases it (see `Locks::drop_lock`).
    ReadLock,

    /// Held by a transaction (see `LockManager::txn_lock`).  New readers are still admitted in
    /// this state unless an upgrade is waiting (see `LockEntry::is_allowed`); it is upgraded to
    /// `WriteLock` by `LockManager::commit_prepare_keys`.
    Locked,

    /// Held exclusively: nothing else is admitted (see `LockEntry::is_allowed`).
    WriteLock,
}
1210
1211impl LockManager {
1212 pub fn new() -> Self {
1213 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1214 }
1215
1216 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1220 TransactionLocks(
1221 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1222 )
1223 }
1224
1225 async fn lock<'a>(
1228 &'a self,
1229 mut lock_keys: LockKeys,
1230 target_state: LockState,
1231 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1232 let mut guard = match &target_state {
1233 LockState::ReadLock => Left(ReadGuard {
1234 manager: self.into(),
1235 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1236 }),
1237 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1238 manager: self.into(),
1239 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1240 }),
1241 };
1242 let guard_keys = match &mut guard {
1243 Left(g) => &mut g.lock_keys,
1244 Right(g) => &mut g.lock_keys,
1245 };
1246 lock_keys.sort_unstable();
1247 lock_keys.dedup();
1248 for lock in lock_keys.iter() {
1249 let lock_waker = None;
1250 pin_mut!(lock_waker);
1251 {
1252 let mut locks = self.locks.lock();
1253 match locks.keys.entry(*lock) {
1254 Entry::Vacant(vacant) => {
1255 vacant.insert(LockEntry {
1256 read_count: if let LockState::ReadLock = target_state {
1257 guard_keys.push(*lock);
1258 1
1259 } else {
1260 guard_keys.push(*lock);
1261 0
1262 },
1263 state: target_state,
1264 head: std::ptr::null(),
1265 tail: std::ptr::null(),
1266 });
1267 }
1268 Entry::Occupied(mut occupied) => {
1269 let entry = occupied.get_mut();
1270 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1272 if let LockState::ReadLock = target_state {
1273 entry.read_count += 1;
1274 guard_keys.push(*lock);
1275 } else {
1276 entry.state = target_state;
1277 guard_keys.push(*lock);
1278 }
1279 } else {
1280 unsafe {
1283 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1284 next: UnsafeCell::new(std::ptr::null()),
1285 prev: UnsafeCell::new(entry.tail),
1286 key: *lock,
1287 waker: UnsafeCell::new(WakerState::Pending),
1288 target_state: target_state,
1289 is_upgrade: false,
1290 _pin: PhantomPinned,
1291 });
1292 }
1293 let waker = (*lock_waker).as_ref().unwrap();
1294 if entry.tail.is_null() {
1295 entry.head = waker;
1296 } else {
1297 unsafe {
1299 *(*entry.tail).next.get() = waker;
1300 }
1301 }
1302 entry.tail = waker;
1303 }
1304 }
1305 }
1306 }
1307 if let Some(waker) = &*lock_waker {
1308 waker.wait(self).await;
1309 guard_keys.push(*lock);
1310 }
1311 }
1312 guard
1313 }
1314
1315 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1317 let mut locks = self.locks.lock();
1318 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1319 }
1320
1321 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1323 self.commit_prepare_keys(&transaction.txn_locks).await;
1324 }
1325
1326 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1327 for lock in lock_keys.iter() {
1328 let lock_waker = None;
1329 pin_mut!(lock_waker);
1330 {
1331 let mut locks = self.locks.lock();
1332 let entry = locks.keys.get_mut(lock).unwrap();
1333 assert_eq!(entry.state, LockState::Locked);
1334
1335 if entry.read_count == 0 {
1336 entry.state = LockState::WriteLock;
1337 } else {
1338 unsafe {
1341 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1342 next: UnsafeCell::new(entry.head),
1343 prev: UnsafeCell::new(std::ptr::null()),
1344 key: *lock,
1345 waker: UnsafeCell::new(WakerState::Pending),
1346 target_state: LockState::WriteLock,
1347 is_upgrade: true,
1348 _pin: PhantomPinned,
1349 });
1350 }
1351 let waker = (*lock_waker).as_ref().unwrap();
1352 if entry.head.is_null() {
1353 entry.tail = (*lock_waker).as_ref().unwrap();
1354 } else {
1355 unsafe {
1357 *(*entry.head).prev.get() = waker;
1358 }
1359 }
1360 entry.head = waker;
1361 }
1362 }
1363
1364 if let Some(waker) = &*lock_waker {
1365 waker.wait(self).await;
1366 }
1367 }
1368 }
1369
1370 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1376 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1377 }
1378
1379 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1382 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1383 }
1384
1385 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1388 self.locks.lock().downgrade_locks(lock_keys);
1389 }
1390}
1391
1392impl LockEntry {
1394 unsafe fn wake(&mut self) {
1395 if self.head.is_null() || self.state == LockState::WriteLock {
1397 return;
1398 }
1399
1400 let waker = &*self.head;
1401
1402 if waker.is_upgrade {
1403 if self.read_count > 0 {
1404 return;
1405 }
1406 } else if !self.is_allowed(waker.target_state, true) {
1407 return;
1408 }
1409
1410 self.pop_and_wake();
1411
1412 if waker.target_state == LockState::WriteLock {
1415 return;
1416 }
1417
1418 while !self.head.is_null() && (*self.head).target_state == LockState::ReadLock {
1419 self.pop_and_wake();
1420 }
1421 }
1422
1423 unsafe fn pop_and_wake(&mut self) {
1424 let waker = &*self.head;
1425
1426 self.head = *waker.next.get();
1428 if self.head.is_null() {
1429 self.tail = std::ptr::null()
1430 } else {
1431 *(*self.head).prev.get() = std::ptr::null();
1432 }
1433
1434 if waker.target_state == LockState::ReadLock {
1436 self.read_count += 1;
1437 } else {
1438 self.state = waker.target_state;
1439 }
1440
1441 if let WakerState::Registered(waker) =
1443 std::mem::replace(&mut *waker.waker.get(), WakerState::Woken)
1444 {
1445 waker.wake();
1446 }
1447 }
1448
1449 fn can_remove(&self) -> bool {
1450 self.state == LockState::ReadLock && self.read_count == 0
1451 }
1452
1453 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1454 let is_first = (*waker.prev.get()).is_null();
1455 if is_first {
1456 self.head = *waker.next.get();
1457 } else {
1458 *(**waker.prev.get()).next.get() = *waker.next.get();
1459 }
1460 if (*waker.next.get()).is_null() {
1461 self.tail = *waker.prev.get();
1462 } else {
1463 *(**waker.next.get()).prev.get() = *waker.prev.get();
1464 }
1465 if is_first {
1466 self.wake();
1468 }
1469 }
1470
1471 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1475 match self.state {
1476 LockState::ReadLock => {
1477 (self.read_count == 0
1479 || target_state == LockState::Locked
1480 || target_state == LockState::ReadLock)
1481 && is_head
1482 }
1483 LockState::Locked => {
1484 target_state == LockState::ReadLock && (is_head || !(*self.head).is_upgrade)
1488 }
1489 LockState::WriteLock => false,
1490 }
1491 }
1492
1493 unsafe fn downgrade_lock(&mut self) {
1494 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1495 self.wake();
1496 }
1497}
1498
1499#[must_use]
1500pub struct ReadGuard<'a> {
1501 manager: LockManagerRef<'a>,
1502 lock_keys: LockKeys,
1503}
1504
1505impl ReadGuard<'_> {
1506 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1507 if let LockManagerRef::Owned(fs) = &self.manager {
1508 Some(fs)
1509 } else {
1510 None
1511 }
1512 }
1513
1514 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1515 ReadGuard {
1516 manager: LockManagerRef::Owned(fs),
1517 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1518 }
1519 }
1520}
1521
1522impl Drop for ReadGuard<'_> {
1523 fn drop(&mut self) {
1524 let mut locks = self.manager.locks.lock();
1525 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1526 }
1527}
1528
1529impl fmt::Debug for ReadGuard<'_> {
1530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1531 f.debug_struct("ReadGuard")
1532 .field("manager", &(&self.manager as *const _))
1533 .field("lock_keys", &self.lock_keys)
1534 .finish()
1535 }
1536}
1537
1538#[must_use]
1539pub struct WriteGuard<'a> {
1540 manager: LockManagerRef<'a>,
1541 lock_keys: LockKeys,
1542}
1543
1544impl Drop for WriteGuard<'_> {
1545 fn drop(&mut self) {
1546 let mut locks = self.manager.locks.lock();
1547 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1548 }
1549}
1550
1551impl fmt::Debug for WriteGuard<'_> {
1552 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1553 f.debug_struct("WriteGuard")
1554 .field("manager", &(&self.manager as *const _))
1555 .field("lock_keys", &self.lock_keys)
1556 .finish()
1557 }
1558}
1559
1560enum LockManagerRef<'a> {
1561 Borrowed(&'a LockManager),
1562 Owned(Arc<FxFilesystem>),
1563}
1564
1565impl Deref for LockManagerRef<'_> {
1566 type Target = LockManager;
1567
1568 fn deref(&self) -> &Self::Target {
1569 match self {
1570 LockManagerRef::Borrowed(m) => m,
1571 LockManagerRef::Owned(f) => f.lock_manager(),
1572 }
1573 }
1574}
1575
1576impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1577 fn from(value: &'a LockManager) -> Self {
1578 LockManagerRef::Borrowed(value)
1579 }
1580}
1581
1582#[cfg(test)]
1583mod tests {
1584 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1585 use crate::filesystem::FxFilesystem;
1586 use fuchsia_async as fasync;
1587 use fuchsia_sync::Mutex;
1588 use futures::channel::oneshot::channel;
1589 use futures::future::FutureExt;
1590 use futures::stream::FuturesUnordered;
1591 use futures::{join, pin_mut, StreamExt};
1592 use std::task::Poll;
1593 use std::time::Duration;
1594 use storage_device::fake_device::FakeDevice;
1595 use storage_device::DeviceHolder;
1596
1597 #[fuchsia::test]
1598 async fn test_simple() {
1599 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1600 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1601 let mut t = fs
1602 .clone()
1603 .new_transaction(lock_keys![], Options::default())
1604 .await
1605 .expect("new_transaction failed");
1606 t.add(1, Mutation::BeginFlush);
1607 assert!(!t.is_empty());
1608 }
1609
1610 #[fuchsia::test]
1611 async fn test_locks() {
1612 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1613 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1614 let (send1, recv1) = channel();
1615 let (send2, recv2) = channel();
1616 let (send3, recv3) = channel();
1617 let done = Mutex::new(false);
1618 let mut futures = FuturesUnordered::new();
1619 futures.push(
1620 async {
1621 let _t = fs
1622 .clone()
1623 .new_transaction(
1624 lock_keys![LockKey::object_attribute(1, 2, 3)],
1625 Options::default(),
1626 )
1627 .await
1628 .expect("new_transaction failed");
1629 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1632 fasync::Timer::new(Duration::from_millis(100)).await;
1634 assert!(!*done.lock());
1635 }
1636 .boxed(),
1637 );
1638 futures.push(
1639 async {
1640 recv1.await.unwrap();
1641 let _t = fs
1643 .clone()
1644 .new_transaction(
1645 lock_keys![LockKey::object_attribute(2, 2, 3)],
1646 Options::default(),
1647 )
1648 .await
1649 .expect("new_transaction failed");
1650 send2.send(()).unwrap();
1652 }
1653 .boxed(),
1654 );
1655 futures.push(
1656 async {
1657 recv3.await.unwrap();
1659 let _t = fs
1660 .clone()
1661 .new_transaction(
1662 lock_keys![LockKey::object_attribute(1, 2, 3)],
1663 Options::default(),
1664 )
1665 .await;
1666 *done.lock() = true;
1667 }
1668 .boxed(),
1669 );
1670 while let Some(()) = futures.next().await {}
1671 }
1672
1673 #[fuchsia::test]
1674 async fn test_read_lock_after_write_lock() {
1675 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1676 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1677 let (send1, recv1) = channel();
1678 let (send2, recv2) = channel();
1679 let done = Mutex::new(false);
1680 join!(
1681 async {
1682 let t = fs
1683 .clone()
1684 .new_transaction(
1685 lock_keys![LockKey::object_attribute(1, 2, 3)],
1686 Options::default(),
1687 )
1688 .await
1689 .expect("new_transaction failed");
1690 send1.send(()).unwrap(); recv2.await.unwrap();
1692 t.commit().await.expect("commit failed");
1693 *done.lock() = true;
1694 },
1695 async {
1696 recv1.await.unwrap();
1697 let _guard = fs
1699 .lock_manager()
1700 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1701 .await;
1702 send2.send(()).unwrap();
1704 fasync::Timer::new(Duration::from_millis(100)).await;
1707 assert!(!*done.lock());
1708 },
1709 );
1710 }
1711
1712 #[fuchsia::test]
1713 async fn test_write_lock_after_read_lock() {
1714 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1715 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1716 let (send1, recv1) = channel();
1717 let (send2, recv2) = channel();
1718 let done = Mutex::new(false);
1719 join!(
1720 async {
1721 let _guard = fs
1723 .lock_manager()
1724 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1725 .await;
1726 send1.send(()).unwrap();
1728 recv2.await.unwrap();
1729 fasync::Timer::new(Duration::from_millis(100)).await;
1732 assert!(!*done.lock());
1733 },
1734 async {
1735 recv1.await.unwrap();
1736 let t = fs
1737 .clone()
1738 .new_transaction(
1739 lock_keys![LockKey::object_attribute(1, 2, 3)],
1740 Options::default(),
1741 )
1742 .await
1743 .expect("new_transaction failed");
1744 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1746 *done.lock() = true;
1747 },
1748 );
1749 }
1750
1751 #[fuchsia::test]
1752 async fn test_drop_uncommitted_transaction() {
1753 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1754 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1755 let key = lock_keys![LockKey::object(1, 1)];
1756
1757 {
1759 let _write_lock = fs
1760 .clone()
1761 .new_transaction(key.clone(), Options::default())
1762 .await
1763 .expect("new_transaction failed");
1764 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1765 }
1766 {
1768 let _write_lock = fs
1769 .clone()
1770 .new_transaction(key.clone(), Options::default())
1771 .await
1772 .expect("new_transaction failed");
1773 }
1774 fs.clone()
1776 .new_transaction(key.clone(), Options::default())
1777 .await
1778 .expect("new_transaction failed");
1779 }
1780
1781 #[fuchsia::test]
1782 async fn test_drop_waiting_write_lock() {
1783 let manager = LockManager::new();
1784 let keys = lock_keys![LockKey::object(1, 1)];
1785 {
1786 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1787 if let Poll::Ready(_) =
1788 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1789 {
1790 assert!(false);
1791 }
1792 }
1793 let _ = manager.lock(keys, LockState::WriteLock).await;
1794 }
1795
1796 #[fuchsia::test]
1797 async fn test_write_lock_blocks_everything() {
1798 let manager = LockManager::new();
1799 let keys = lock_keys![LockKey::object(1, 1)];
1800 {
1801 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1802 if let Poll::Ready(_) =
1803 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1804 {
1805 assert!(false);
1806 }
1807 if let Poll::Ready(_) =
1808 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1809 {
1810 assert!(false);
1811 }
1812 }
1813 {
1814 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1815 }
1816 {
1817 let _guard = manager.lock(keys, LockState::ReadLock).await;
1818 }
1819 }
1820
1821 #[fuchsia::test]
1822 async fn test_downgrade_locks() {
1823 let manager = LockManager::new();
1824 let keys = lock_keys![LockKey::object(1, 1)];
1825 let _guard = manager.txn_lock(keys.clone()).await;
1826 manager.commit_prepare_keys(&keys).await;
1827
1828 let mut read_lock: FuturesUnordered<_> =
1830 std::iter::once(manager.read_lock(keys.clone())).collect();
1831
1832 assert!(futures::poll!(read_lock.next()).is_pending());
1834
1835 manager.downgrade_locks(&keys);
1836
1837 assert!(futures::poll!(read_lock.next()).is_ready());
1839 }
1840
1841 #[fuchsia::test]
1842 async fn test_dropped_write_lock_wakes() {
1843 let manager = LockManager::new();
1844 let keys = lock_keys![LockKey::object(1, 1)];
1845 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1846 let mut read_lock = FuturesUnordered::new();
1847 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1848
1849 {
1850 let write_lock = manager.lock(keys, LockState::WriteLock);
1851 pin_mut!(write_lock);
1852
1853 assert!(futures::poll!(write_lock).is_pending());
1855
1856 assert!(futures::poll!(read_lock.next()).is_pending());
1858 }
1859
1860 assert!(futures::poll!(read_lock.next()).is_ready());
1862 }
1863
1864 #[fuchsia::test]
1865 async fn test_drop_upgrade() {
1866 let manager = LockManager::new();
1867 let keys = lock_keys![LockKey::object(1, 1)];
1868 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
1869
1870 {
1871 let commit_prepare = manager.commit_prepare_keys(&keys);
1872 pin_mut!(commit_prepare);
1873 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1874 assert!(futures::poll!(commit_prepare).is_pending());
1875
1876 }
1879
1880 manager.commit_prepare_keys(&keys).await;
1882 }
1883
1884 #[fasync::run_singlethreaded(test)]
1885 async fn test_woken_upgrade_blocks_reads() {
1886 let manager = LockManager::new();
1887 let keys = lock_keys![LockKey::object(1, 1)];
1888 let guard = manager.lock(keys.clone(), LockState::Locked).await;
1890
1891 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
1893
1894 let commit_prepare = manager.commit_prepare_keys(&keys);
1896 pin_mut!(commit_prepare);
1897 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
1898
1899 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
1901 pin_mut!(read2);
1902 assert!(futures::poll!(read2.as_mut()).is_pending());
1903
1904 std::mem::drop(read1);
1906 assert!(futures::poll!(commit_prepare).is_ready());
1907
1908 assert!(futures::poll!(read2.as_mut()).is_pending());
1910
1911 std::mem::drop(guard);
1913 assert!(futures::poll!(read2).is_ready());
1914 }
1915
// Three distinct flush keys (ids 1, 2 and 3) used as fixtures by the `LockKeys` tests in this
// module.
static LOCK_KEY_1: LockKey = LockKey::flush(1);
static LOCK_KEY_2: LockKey = LockKey::flush(2);
static LOCK_KEY_3: LockKey = LockKey::flush(3);
1919
1920 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
1922 match (value, expected) {
1923 (LockKeys::None, LockKeys::None) => {}
1924 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
1925 if key1 != key2 {
1926 panic!("{key1:?} != {key2:?}");
1927 }
1928 }
1929 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
1930 if vec1 != vec2 {
1931 panic!("{vec1:?} != {vec2:?}");
1932 }
1933 if vec1.capacity() != vec2.capacity() {
1934 panic!(
1935 "LockKeys have different capacity: {} != {}",
1936 vec1.capacity(),
1937 vec2.capacity()
1938 );
1939 }
1940 }
1941 (_, _) => panic!("{value:?} != {expected:?}"),
1942 }
1943 }
1944
1945 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
1947 let value: Vec<_> = value.iter().collect();
1948 let expected: Vec<_> = expected.iter().collect();
1949 assert_eq!(value, expected);
1950 }
1951
// `lock_keys!` picks the `None`, `Inline` or `Vec` variant for zero, one or two keys.
#[test]
fn test_lock_keys_macro() {
    let no_keys = lock_keys![];
    assert_lock_keys_equal(&no_keys, &LockKeys::None);

    let one_key = lock_keys![LOCK_KEY_1];
    assert_lock_keys_equal(&one_key, &LockKeys::Inline(LOCK_KEY_1));

    let two_keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
    assert_lock_keys_equal(&two_keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
}
1961
// `with_capacity` stays at `LockKeys::None` for capacities 0 and 1 and produces a `Vec` for 2.
#[test]
fn test_lock_keys_with_capacity() {
    let zero = LockKeys::with_capacity(0);
    assert_lock_keys_equal(&zero, &LockKeys::None);

    let one = LockKeys::with_capacity(1);
    assert_lock_keys_equal(&one, &LockKeys::None);

    let two = LockKeys::with_capacity(2);
    assert_lock_keys_equal(&two, &LockKeys::Vec(Vec::with_capacity(2)));
}
1968
// `len` reports the number of keys for each of the three storage variants.
#[test]
fn test_lock_keys_len() {
    let cases = [
        (lock_keys![], 0),
        (lock_keys![LOCK_KEY_1], 1),
        (lock_keys![LOCK_KEY_1, LOCK_KEY_2], 2),
    ];
    for (keys, expected_len) in &cases {
        assert_eq!(keys.len(), *expected_len);
    }
}
1975
// `contains` finds exactly the keys that were put in, for each of the storage variants.
#[test]
fn test_lock_keys_contains() {
    let cases = [
        (lock_keys![], &LOCK_KEY_1, false),
        (lock_keys![LOCK_KEY_1], &LOCK_KEY_1, true),
        (lock_keys![LOCK_KEY_1], &LOCK_KEY_2, false),
        (lock_keys![LOCK_KEY_1, LOCK_KEY_2], &LOCK_KEY_1, true),
        (lock_keys![LOCK_KEY_1, LOCK_KEY_2], &LOCK_KEY_2, true),
        (lock_keys![LOCK_KEY_1, LOCK_KEY_2], &LOCK_KEY_3, false),
    ];
    for (keys, probe, expected) in &cases {
        assert_eq!(keys.contains(*probe), *expected);
    }
}
1985
// Pushing keys one at a time moves the storage from `None` to `Inline` to `Vec`.
#[test]
fn test_lock_keys_push() {
    let mut growing = lock_keys![];

    growing.push(LOCK_KEY_1);
    assert_lock_keys_equal(&growing, &LockKeys::Inline(LOCK_KEY_1));

    growing.push(LOCK_KEY_2);
    assert_lock_keys_equal(&growing, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));

    // After the third push only the contents are compared, not the capacity.
    growing.push(LOCK_KEY_3);
    let all_three = LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]);
    assert_lock_keys_equivalent(&growing, &all_three);
}
1999
// `sort_unstable` leaves zero- and one-key sets untouched and orders a two-key set.
#[test]
fn test_lock_keys_sort_unstable() {
    {
        let mut empty = lock_keys![];
        empty.sort_unstable();
        assert_lock_keys_equal(&empty, &lock_keys![]);
    }
    {
        let mut single = lock_keys![LOCK_KEY_1];
        single.sort_unstable();
        assert_lock_keys_equal(&single, &lock_keys![LOCK_KEY_1]);
    }
    {
        let mut reversed = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
        reversed.sort_unstable();
        assert_lock_keys_equal(&reversed, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
    }
}
2014
// `dedup` leaves zero- and one-key sets untouched and collapses a repeated key.
#[test]
fn test_lock_keys_dedup() {
    {
        let mut empty = lock_keys![];
        empty.dedup();
        assert_lock_keys_equal(&empty, &lock_keys![]);
    }
    {
        let mut single = lock_keys![LOCK_KEY_1];
        single.dedup();
        assert_lock_keys_equal(&single, &lock_keys![LOCK_KEY_1]);
    }
    {
        // Only the contents are compared here, not the storage variant.
        let mut repeated = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
        repeated.dedup();
        assert_lock_keys_equivalent(&repeated, &lock_keys![LOCK_KEY_1]);
    }
}
2029
// `truncate` is a no-op when the requested length exceeds the current one, and otherwise
// shortens the set.
#[test]
fn test_lock_keys_truncate() {
    {
        let mut empty = lock_keys![];
        empty.truncate(5);
        assert_lock_keys_equal(&empty, &lock_keys![]);
        empty.truncate(0);
        assert_lock_keys_equal(&empty, &lock_keys![]);
    }
    {
        let mut single = lock_keys![LOCK_KEY_1];
        single.truncate(5);
        assert_lock_keys_equal(&single, &lock_keys![LOCK_KEY_1]);
        single.truncate(0);
        assert_lock_keys_equal(&single, &lock_keys![]);
    }
    {
        let mut pair = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
        pair.truncate(5);
        assert_lock_keys_equal(&pair, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
        pair.truncate(1);
        // Only the contents are compared after shrinking to one key, not the storage variant.
        assert_lock_keys_equivalent(&pair, &lock_keys![LOCK_KEY_1]);
    }
}
2051
// `iter` yields references to the stored keys, in order, for each storage variant.
#[test]
fn test_lock_keys_iter() {
    let no_keys = lock_keys![];
    assert_eq!(no_keys.iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());

    let one_key = lock_keys![LOCK_KEY_1];
    assert_eq!(one_key.iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);

    let two_keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
    assert_eq!(two_keys.iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1, &LOCK_KEY_2]);
}
2063}