1use crate::errors::FxfsError;
6use crate::lsm_tree::types::{ItemRef, LayerIterator};
7use crate::lsm_tree::Query;
8use crate::object_store::transaction::{lock_keys, LockKey, Mutation, Options};
9use crate::object_store::{
10 ObjectKey, ObjectKeyData, ObjectKind, ObjectStore, ObjectValue, ProjectProperty,
11};
12use anyhow::{ensure, Error};
13
14impl ObjectStore {
15 pub async fn set_project_limit(
18 &self,
19 project_id: u64,
20 bytes: u64,
21 nodes: u64,
22 ) -> Result<(), Error> {
23 ensure!(project_id != 0, FxfsError::OutOfRange);
24 let root_id = self.root_directory_object_id();
25 let mut transaction = self
26 .filesystem()
27 .new_transaction(
28 lock_keys![LockKey::ProjectId {
29 store_object_id: self.store_object_id,
30 project_id
31 }],
32 Options::default(),
33 )
34 .await?;
35 transaction.add(
36 self.store_object_id,
37 Mutation::replace_or_insert_object(
38 ObjectKey::project_limit(root_id, project_id),
39 ObjectValue::BytesAndNodes {
40 bytes: bytes.try_into().map_err(|_| FxfsError::TooBig)?,
41 nodes: nodes.try_into().map_err(|_| FxfsError::TooBig)?,
42 },
43 ),
44 );
45 transaction.commit().await?;
46 Ok(())
47 }
48
49 pub async fn clear_project_limit(&self, project_id: u64) -> Result<(), Error> {
52 let root_id = self.root_directory_object_id();
53 let mut transaction = self
54 .filesystem()
55 .new_transaction(
56 lock_keys![LockKey::ProjectId {
57 store_object_id: self.store_object_id,
58 project_id
59 }],
60 Options::default(),
61 )
62 .await?;
63 transaction.add(
64 self.store_object_id,
65 Mutation::replace_or_insert_object(
66 ObjectKey::project_limit(root_id, project_id),
67 ObjectValue::None,
68 ),
69 );
70 transaction.commit().await?;
71 Ok(())
72 }
73
74 pub async fn set_project_for_node(&self, node_id: u64, project_id: u64) -> Result<(), Error> {
76 ensure!(project_id != 0, FxfsError::OutOfRange);
77 let root_id = self.root_directory_object_id();
78 let mut transaction = self
79 .filesystem()
80 .new_transaction(
81 lock_keys![LockKey::object(self.store_object_id, node_id)],
82 Options::default(),
83 )
84 .await?;
85
86 let object_key = ObjectKey::object(node_id);
87 let (kind, mut attributes) =
88 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
89 ObjectValue::Object { kind, attributes } => (kind, attributes),
90 _ => return Err(FxfsError::Inconsistent.into()),
91 };
92 match kind {
94 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
95 ObjectKind::Symlink { .. } | ObjectKind::EncryptedSymlink { .. } => {
98 return Err(FxfsError::NotSupported.into())
99 }
100 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
101 }
102 let storage_size = attributes.allocated_size.try_into().map_err(|_| FxfsError::TooBig)?;
103 let old_project_id = attributes.project_id;
104 if old_project_id == project_id {
105 return Ok(());
106 }
107 attributes.project_id = project_id;
108
109 transaction.add(
110 self.store_object_id,
111 Mutation::replace_or_insert_object(
112 object_key,
113 ObjectValue::Object { kind, attributes },
114 ),
115 );
116 transaction.add(
117 self.store_object_id,
118 Mutation::merge_object(
119 ObjectKey::project_usage(root_id, project_id),
120 ObjectValue::BytesAndNodes { bytes: storage_size, nodes: 1 },
121 ),
122 );
123 if old_project_id != 0 {
124 transaction.add(
125 self.store_object_id,
126 Mutation::merge_object(
127 ObjectKey::project_usage(root_id, old_project_id),
128 ObjectValue::BytesAndNodes { bytes: -storage_size, nodes: -1 },
129 ),
130 );
131 }
132 transaction.commit().await?;
133 Ok(())
134 }
135
136 pub async fn get_project_for_node(&self, node_id: u64) -> Result<u64, Error> {
138 match self.tree().find(&ObjectKey::object(node_id)).await?.ok_or(FxfsError::NotFound)?.value
139 {
140 ObjectValue::Object { attributes, .. } => match attributes.project_id {
141 id => Ok(id),
142 },
143 _ => return Err(FxfsError::Inconsistent.into()),
144 }
145 }
146
147 pub async fn clear_project_for_node(&self, node_id: u64) -> Result<(), Error> {
150 let root_id = self.root_directory_object_id();
151 let mut transaction = self
152 .filesystem()
153 .new_transaction(
154 lock_keys![LockKey::object(self.store_object_id, node_id)],
155 Options::default(),
156 )
157 .await?;
158
159 let object_key = ObjectKey::object(node_id);
160 let (kind, mut attributes) =
161 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
162 ObjectValue::Object { kind, attributes } => (kind, attributes),
163 _ => return Err(FxfsError::Inconsistent.into()),
164 };
165 if attributes.project_id == 0 {
166 return Ok(());
167 }
168 match kind {
170 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
171 ObjectKind::Symlink { .. } | ObjectKind::EncryptedSymlink { .. } => {
174 return Err(FxfsError::NotSupported.into())
175 }
176 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
177 }
178 let old_project_id = attributes.project_id;
179 attributes.project_id = 0;
180 let storage_size = attributes.allocated_size;
181 transaction.add(
182 self.store_object_id,
183 Mutation::replace_or_insert_object(
184 object_key,
185 ObjectValue::Object { kind, attributes },
186 ),
187 );
188 transaction.add(
191 self.store_object_id,
192 Mutation::merge_object(
193 ObjectKey::project_usage(root_id, old_project_id),
194 ObjectValue::BytesAndNodes {
195 bytes: -(storage_size.try_into().map_err(|_| FxfsError::TooBig)?),
196 nodes: -1,
197 },
198 ),
199 );
200 transaction.commit().await?;
201 Ok(())
202 }
203
204 pub async fn list_projects(
209 &self,
210 start_id: u64,
211 max_entries: usize,
212 ) -> Result<(Vec<u64>, Option<u64>), Error> {
213 let root_dir_id = self.root_directory_object_id();
214 let layer_set = self.tree().layer_set();
215 let mut merger = layer_set.merger();
216 let mut iter = merger
217 .query(Query::FullRange(&ObjectKey::project_limit(root_dir_id, start_id)))
218 .await?;
219 let mut entries = Vec::new();
220 let mut prev_entry = 0;
221 let mut next_entry = None;
222 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
223 iter.get()
224 {
225 if *object_id != root_dir_id {
227 break;
228 }
229 match key_data {
230 ObjectKeyData::Project { project_id, .. } => {
231 if *value != ObjectValue::None && prev_entry < *project_id {
233 if entries.len() == max_entries {
234 next_entry = Some(*project_id);
235 break;
236 }
237 prev_entry = *project_id;
238 entries.push(*project_id);
239 }
240 }
241 _ => {
243 break;
244 }
245 }
246 iter.advance().await?;
247 }
248 Ok((entries, next_entry))
250 }
251
252 pub async fn project_info(
255 &self,
256 project_id: u64,
257 ) -> Result<(Option<(u64, u64)>, Option<(u64, u64)>), Error> {
258 let root_id = self.root_directory_object_id();
259 let layer_set = self.tree().layer_set();
260 let mut merger = layer_set.merger();
261 let mut iter =
262 merger.query(Query::FullRange(&ObjectKey::project_limit(root_id, project_id))).await?;
263 let mut limit = None;
264 let mut usage = None;
265 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
267 iter.get()
268 {
269 if *object_id != root_id {
271 break;
272 }
273 if let (
274 ObjectKeyData::Project { project_id: found_project_id, property },
275 ObjectValue::BytesAndNodes { bytes, nodes },
276 ) = (key_data, value)
277 {
278 if *found_project_id != project_id {
280 break;
281 }
282 let raw_value: (u64, u64) = (
283 (*bytes).try_into().map_err(|_| FxfsError::Inconsistent)?,
285 (*nodes).try_into().map_err(|_| FxfsError::Inconsistent)?,
286 );
287 match property {
288 ProjectProperty::Limit => limit = Some(raw_value),
289 ProjectProperty::Usage => usage = Some(raw_value),
290 }
291 } else {
292 break;
293 }
294 iter.advance().await?;
295 }
296 Ok((limit, usage))
297 }
298}
299
300