#![deny(missing_docs)]
// `let () = ...` is used on purpose in this crate to assert that an awaited call yields unit.
#![allow(clippy::let_unit_value)]

//! Utilities for starting a blob filesystem (blobfs or Fxfs with a blob volume) that is backed
//! by a ramdisk.
7
8use anyhow::{Context as _, Error, anyhow};
11use delivery_blob::{CompressionMode, Type1Blob};
12use fidl::endpoints::ClientEnd;
13use fidl_fuchsia_fs_startup::{CreateOptions, MountOptions};
14use fuchsia_merkle::Hash;
15use std::borrow::Cow;
16use std::collections::BTreeSet;
17use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
18
/// Block size passed to `RamdiskClient::builder` for every ramdisk created by this crate.
const RAMDISK_BLOCK_SIZE: u64 = 512;

/// Name of the Fxfs volume that is created (or re-opened) as the blob volume.
static FXFS_BLOB_VOLUME_NAME: &str = "blob";
21
22#[cfg(test)]
23mod test;
24
25#[derive(Debug, Clone)]
27pub struct BlobInfo {
28 merkle: Hash,
29 contents: Cow<'static, [u8]>,
30}
31
32impl<B> From<B> for BlobInfo
33where
34 B: Into<Cow<'static, [u8]>>,
35{
36 fn from(bytes: B) -> Self {
37 let bytes = bytes.into();
38 Self { merkle: fuchsia_merkle::root_from_slice(&bytes), contents: bytes }
39 }
40}
41
42pub struct BlobfsRamdiskBuilder {
44 ramdisk: Option<SuppliedRamdisk>,
45 blobs: Vec<BlobInfo>,
46 implementation: Implementation,
47}
48
49enum SuppliedRamdisk {
50 Formatted(FormattedRamdisk),
51 Unformatted(Ramdisk),
52}
53
/// The blob filesystem implementation to start.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Implementation {
    /// Blobfs, started as a single-volume `fs_management::Blobfs` filesystem.
    CppBlobfs,
    /// Fxfs, started as a multi-volume filesystem with a dedicated blob volume.
    Fxblob,
}
62
63impl Implementation {
64 pub fn from_env() -> Self {
70 match env!("FXFS_BLOB") {
71 "true" => Self::Fxblob,
72 "false" => Self::CppBlobfs,
73 other => panic!("unexpected value for env var 'FXFS_BLOB': {other}"),
74 }
75 }
76}
77
78impl BlobfsRamdiskBuilder {
79 fn new() -> Self {
80 Self { ramdisk: None, blobs: vec![], implementation: Implementation::CppBlobfs }
81 }
82
83 pub fn formatted_ramdisk(self, ramdisk: FormattedRamdisk) -> Self {
85 Self { ramdisk: Some(SuppliedRamdisk::Formatted(ramdisk)), ..self }
86 }
87
88 pub fn ramdisk(self, ramdisk: Ramdisk) -> Self {
90 Self { ramdisk: Some(SuppliedRamdisk::Unformatted(ramdisk)), ..self }
91 }
92
93 pub fn with_blob(mut self, blob: impl Into<BlobInfo>) -> Self {
95 self.blobs.push(blob.into());
96 self
97 }
98
99 pub fn cpp_blobfs(self) -> Self {
102 Self { implementation: Implementation::CppBlobfs, ..self }
103 }
104
105 pub fn fxblob(self) -> Self {
108 Self { implementation: Implementation::Fxblob, ..self }
109 }
110
111 pub fn implementation(self, implementation: Implementation) -> Self {
113 Self { implementation, ..self }
114 }
115
116 pub fn impl_from_env(self) -> Self {
118 self.implementation(Implementation::from_env())
119 }
120
121 pub async fn start(self) -> Result<BlobfsRamdisk, Error> {
123 let Self { ramdisk, blobs, implementation } = self;
124 let (ramdisk, needs_format) = match ramdisk {
125 Some(SuppliedRamdisk::Formatted(FormattedRamdisk(ramdisk))) => (ramdisk, false),
126 Some(SuppliedRamdisk::Unformatted(ramdisk)) => (ramdisk, true),
127 None => (Ramdisk::start().await.context("creating backing ramdisk for blobfs")?, true),
128 };
129
130 let ramdisk_controller = ramdisk.client.open_controller()?;
131
132 let mut fs = match implementation {
134 Implementation::CppBlobfs => fs_management::filesystem::Filesystem::new(
135 ramdisk_controller,
136 fs_management::Blobfs { ..fs_management::Blobfs::dynamic_child() },
137 ),
138 Implementation::Fxblob => fs_management::filesystem::Filesystem::new(
139 ramdisk_controller,
140 fs_management::Fxfs::default(),
141 ),
142 };
143 if needs_format {
144 let () = fs.format().await.context("formatting ramdisk")?;
145 }
146
147 let fs = match implementation {
148 Implementation::CppBlobfs => ServingFilesystem::SingleVolume(
149 fs.serve().await.context("serving single volume filesystem")?,
150 ),
151 Implementation::Fxblob => {
152 let fs =
153 fs.serve_multi_volume().await.context("serving multi volume filesystem")?;
154 let volume = if needs_format {
155 fs.create_volume(
156 FXFS_BLOB_VOLUME_NAME,
157 CreateOptions::default(),
158 MountOptions { as_blob: Some(true), ..MountOptions::default() },
159 )
160 .await
161 .context("creating blob volume")?
162 } else {
163 fs.open_volume(
164 FXFS_BLOB_VOLUME_NAME,
165 MountOptions { as_blob: Some(true), ..MountOptions::default() },
166 )
167 .await
168 .context("opening blob volume")?
169 };
170 ServingFilesystem::MultiVolume(fs, volume)
171 }
172 };
173
174 let blobfs = BlobfsRamdisk { backing_ramdisk: FormattedRamdisk(ramdisk), fs };
175
176 if !blobs.is_empty() {
178 let mut present_blobs = blobfs.list_blobs()?;
179
180 for blob in blobs {
181 if present_blobs.contains(&blob.merkle) {
182 continue;
183 }
184 blobfs
185 .write_blob(blob.merkle, &blob.contents)
186 .await
187 .context(format!("writing {}", blob.merkle))?;
188 present_blobs.insert(blob.merkle);
189 }
190 }
191
192 Ok(blobfs)
193 }
194}
195
196pub struct BlobfsRamdisk {
198 backing_ramdisk: FormattedRamdisk,
199 fs: ServingFilesystem,
200}
201
202enum ServingFilesystem {
207 SingleVolume(fs_management::filesystem::ServingSingleVolumeFilesystem),
208 MultiVolume(
209 fs_management::filesystem::ServingMultiVolumeFilesystem,
210 fs_management::filesystem::ServingVolume,
211 ),
212}
213
214impl ServingFilesystem {
215 async fn shutdown(self) -> Result<(), Error> {
216 match self {
217 Self::SingleVolume(fs) => fs.shutdown().await.context("shutting down single volume"),
218 Self::MultiVolume(fs, _volume) => {
219 fs.shutdown().await.context("shutting down multi volume")
220 }
221 }
222 }
223
224 fn exposed_dir(&self) -> Result<&fio::DirectoryProxy, Error> {
225 match self {
226 Self::SingleVolume(fs) => Ok(fs.exposed_dir()),
227 Self::MultiVolume(_fs, volume) => Ok(volume.exposed_dir()),
228 }
229 }
230
231 fn blob_dir_name(&self) -> &'static str {
233 match self {
234 Self::SingleVolume(_) => "blob-exec",
235 Self::MultiVolume(_, _) => "root",
236 }
237 }
238
239 fn svc_dir(&self) -> Result<fio::DirectoryProxy, Error> {
240 match self {
241 Self::SingleVolume(_) => Ok(fuchsia_fs::directory::open_directory_async(
242 self.exposed_dir()?,
243 ".",
244 fio::PERM_READABLE,
245 )
246 .context("opening svc dir")?),
247 Self::MultiVolume(_, _) => Ok(fuchsia_fs::directory::open_directory_async(
248 self.exposed_dir()?,
249 "svc",
250 fio::PERM_READABLE,
251 )
252 .context("opening svc dir")?),
253 }
254 }
255
256 fn blob_creator_proxy(&self) -> Result<ffxfs::BlobCreatorProxy, Error> {
257 fuchsia_component::client::connect_to_protocol_at_dir_root::<ffxfs::BlobCreatorMarker>(
258 &self.svc_dir()?,
259 )
260 .context("connecting to fuchsia.fxfs.BlobCreator")
261 }
262
263 fn blob_reader_proxy(&self) -> Result<ffxfs::BlobReaderProxy, Error> {
264 fuchsia_component::client::connect_to_protocol_at_dir_root::<ffxfs::BlobReaderMarker>(
265 &self.svc_dir()?,
266 )
267 .context("connecting to fuchsia.fxfs.BlobReader")
268 }
269
270 fn overwrite_configuration_proxy(
271 &self,
272 ) -> Result<fidl_fuchsia_storage_blobfs::OverwriteConfigurationProxy, Error> {
273 fuchsia_component::client::connect_to_protocol_at_dir_root::<
274 fidl_fuchsia_storage_blobfs::OverwriteConfigurationMarker,
275 >(&self.svc_dir()?)
276 .context("connecting to fuchsia.storage.blobfs.OverwriteConfiguration")
277 }
278
279 fn implementation(&self) -> Implementation {
280 match self {
281 Self::SingleVolume(_) => Implementation::CppBlobfs,
282 Self::MultiVolume(_, _) => Implementation::Fxblob,
283 }
284 }
285}
286
287impl BlobfsRamdisk {
288 pub fn builder() -> BlobfsRamdiskBuilder {
290 BlobfsRamdiskBuilder::new()
291 }
292
293 pub async fn start() -> Result<Self, Error> {
295 Self::builder().start().await
296 }
297
298 pub fn client(&self) -> blobfs::Client {
304 blobfs::Client::new(
305 self.root_dir_proxy().unwrap(),
306 Some(self.blob_creator_proxy().unwrap()),
307 self.blob_reader_proxy().unwrap(),
308 None,
309 )
310 .unwrap()
311 }
312
313 pub fn root_dir_handle(&self) -> Result<ClientEnd<fio::DirectoryMarker>, Error> {
315 let (root_clone, server_end) = zx::Channel::create();
316 self.fs.exposed_dir()?.open(
317 self.fs.blob_dir_name(),
318 fio::PERM_READABLE | fio::Flags::PERM_INHERIT_WRITE | fio::Flags::PERM_EXECUTE,
319 &Default::default(),
320 server_end,
321 )?;
322 Ok(root_clone.into())
323 }
324
325 pub fn root_dir_proxy(&self) -> Result<fio::DirectoryProxy, Error> {
327 Ok(self.root_dir_handle()?.into_proxy())
328 }
329
330 pub fn root_dir(&self) -> Result<openat::Dir, Error> {
332 use std::os::fd::{FromRawFd as _, IntoRawFd as _, OwnedFd};
333
334 let fd: OwnedFd =
335 fdio::create_fd(self.root_dir_handle()?.into()).context("failed to create fd")?;
336
337 unsafe { Ok(openat::Dir::from_raw_fd(fd.into_raw_fd())) }
342 }
343
344 pub async fn into_builder(self) -> Result<BlobfsRamdiskBuilder, Error> {
347 let implementation = self.fs.implementation();
348 let ramdisk = self.unmount().await?;
349 Ok(Self::builder().formatted_ramdisk(ramdisk).implementation(implementation))
350 }
351
352 pub async fn unmount(self) -> Result<FormattedRamdisk, Error> {
354 self.fs.shutdown().await?;
355 Ok(self.backing_ramdisk)
356 }
357
358 pub async fn stop(self) -> Result<(), Error> {
360 self.unmount().await?.stop().await
361 }
362
363 pub fn list_blobs(&self) -> Result<BTreeSet<Hash>, Error> {
365 self.root_dir()?
366 .list_dir(".")?
367 .map(|entry| {
368 Ok(entry?
369 .file_name()
370 .to_str()
371 .ok_or_else(|| anyhow!("expected valid utf-8"))?
372 .parse()?)
373 })
374 .collect()
375 }
376
377 pub async fn add_blob_from(
379 &self,
380 merkle: Hash,
381 mut source: impl std::io::Read,
382 ) -> Result<(), Error> {
383 let mut bytes = vec![];
384 source.read_to_end(&mut bytes)?;
385 self.write_blob(merkle, &bytes).await
386 }
387
388 pub async fn write_blob_with_overwrite(
392 &self,
393 merkle: Hash,
394 bytes: &[u8],
395 overwrite: bool,
396 ) -> Result<(), Error> {
397 let compressed_data = Type1Blob::generate(bytes, CompressionMode::Attempt);
398 let blob_creator = self.blob_creator_proxy()?;
399 let writer_client_end = match blob_creator.create(&merkle.into(), overwrite).await? {
400 Ok(writer_client_end) => writer_client_end,
401 Err(ffxfs::CreateBlobError::AlreadyExists) => {
402 return Ok(());
403 }
404 Err(e) => {
405 return Err(anyhow!("create blob error {:?}", e));
406 }
407 };
408 let writer = writer_client_end.into_proxy();
409 let mut blob_writer = blob_writer::BlobWriter::create(writer, compressed_data.len() as u64)
410 .await
411 .context("failed to create BlobWriter")?;
412 blob_writer.write(&compressed_data).await?;
413 Ok(())
414 }
415
416 pub async fn write_blob(&self, merkle: Hash, bytes: &[u8]) -> Result<(), Error> {
419 self.write_blob_with_overwrite(merkle, bytes, false).await
420 }
421
422 pub fn svc_dir(&self) -> Result<fio::DirectoryProxy, Error> {
426 self.fs.svc_dir()
427 }
428
429 pub fn blob_creator_proxy(&self) -> Result<ffxfs::BlobCreatorProxy, Error> {
431 self.fs.blob_creator_proxy()
432 }
433
434 pub fn blob_reader_proxy(&self) -> Result<ffxfs::BlobReaderProxy, Error> {
436 self.fs.blob_reader_proxy()
437 }
438
439 pub fn overwrite_configuration_proxy(
441 &self,
442 ) -> Result<fidl_fuchsia_storage_blobfs::OverwriteConfigurationProxy, Error> {
443 self.fs.overwrite_configuration_proxy()
444 }
445}
446
/// A builder that configures and then starts a [`Ramdisk`].
pub struct RamdiskBuilder {
    /// Number of blocks the ramdisk will be created with.
    block_count: u64,
}
451
452impl RamdiskBuilder {
453 fn new() -> Self {
454 Self { block_count: 1 << 20 }
455 }
456
457 pub fn block_count(mut self, block_count: u64) -> Self {
459 self.block_count = block_count;
460 self
461 }
462
463 pub async fn start(self) -> Result<Ramdisk, Error> {
465 let client = ramdevice_client::RamdiskClient::builder(RAMDISK_BLOCK_SIZE, self.block_count);
466 let client = client.build().await?;
467 Ok(Ramdisk { client })
468 }
469
470 pub async fn into_blobfs_builder(self) -> Result<BlobfsRamdiskBuilder, Error> {
472 Ok(BlobfsRamdiskBuilder::new().ramdisk(self.start().await?))
473 }
474}
475
476pub struct Ramdisk {
478 client: ramdevice_client::RamdiskClient,
479}
480
481impl Ramdisk {
484 pub fn builder() -> RamdiskBuilder {
486 RamdiskBuilder::new()
487 }
488
489 pub async fn start() -> Result<Self, Error> {
492 Self::builder().start().await
493 }
494
495 pub async fn stop(self) -> Result<(), Error> {
497 self.client.destroy().await
498 }
499}
500
501pub struct FormattedRamdisk(Ramdisk);
503
504impl std::ops::Deref for FormattedRamdisk {
506 type Target = Ramdisk;
507 fn deref(&self) -> &Self::Target {
508 &self.0
509 }
510}
511
512impl FormattedRamdisk {
513 pub async fn stop(self) -> Result<(), Error> {
515 self.0.stop().await
516 }
517}
518
#[cfg(test)]
mod tests {
    use super::*;
    use test_case::test_case;

    /// A filesystem can be started, connected to, and stopped.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn clean_start_and_stop(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();

        let proxy = blobfs.root_dir_proxy().unwrap();
        drop(proxy);

        blobfs.stop().await.unwrap();
    }

    /// A freshly started filesystem lists no blobs.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn clean_start_contains_no_blobs(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::new());

        blobfs.stop().await.unwrap();
    }

    /// `BlobInfo` can be built from a static slice, an owned vec, and a `Cow`, and the builder
    /// accepts each of those directly.
    #[test]
    fn blob_info_conversions() {
        let a = BlobInfo::from(&b"static slice"[..]);
        let b = BlobInfo::from(b"owned vec".to_vec());
        let c = BlobInfo::from(Cow::from(&b"cow"[..]));
        assert_ne!(a.merkle, b.merkle);
        assert_ne!(b.merkle, c.merkle);
        assert_eq!(a.merkle, fuchsia_merkle::root_from_slice(b"static slice"));

        // Only needs to type-check; the builder is never started.
        let _ = BlobfsRamdisk::builder()
            .with_blob(&b"static slice"[..])
            .with_blob(b"owned vec".to_vec())
            .with_blob(Cow::from(&b"cow"[..]));
    }

    /// Queuing the same blob more than once, including across a remount, stores it once.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn with_blob_ignores_duplicates(implementation: Implementation) {
        let blob = BlobInfo::from(&b"duplicate"[..]);

        let blobfs = BlobfsRamdisk::builder()
            .implementation(implementation)
            .with_blob(blob.clone())
            .with_blob(blob.clone())
            .start()
            .await
            .unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([blob.merkle]));

        let blobfs =
            blobfs.into_builder().await.unwrap().with_blob(blob.clone()).start().await.unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([blob.merkle]));
    }

    /// Two distinct queued blobs are both present after start.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn build_with_two_blobs(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(implementation)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();

        let expected = BTreeSet::from([
            fuchsia_merkle::root_from_slice(b"blob 1"),
            fuchsia_merkle::root_from_slice(b"blob 2"),
        ]);
        assert_eq!(expected.len(), 2);
        assert_eq!(blobfs.list_blobs().unwrap(), expected);

        blobfs.stop().await.unwrap();
    }

    /// Blobs written before a remount are still listed afterwards.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn remount(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(implementation)
            .with_blob(&b"test"[..])
            .start()
            .await
            .unwrap();
        let blobs = blobfs.list_blobs().unwrap();

        let blobfs = blobfs.into_builder().await.unwrap().start().await.unwrap();

        assert_eq!(blobs, blobfs.list_blobs().unwrap());

        blobfs.stop().await.unwrap();
    }

    /// A blob written with `write_blob` shows up in the directory listing.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn blob_appears_in_readdir(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();

        let data = b"Hello blobfs!";
        let hello_merkle = fuchsia_merkle::root_from_slice(data);
        blobfs.write_blob(hello_merkle, data).await.unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hello_merkle]));

        blobfs.stop().await.unwrap();
    }

    /// The block count configured on the builder is the one the device reports.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn ramdisk_builder_sets_block_count() {
        for block_count in [1, 2, 3, 16] {
            let ramdisk = Ramdisk::builder().block_count(block_count).start().await.unwrap();
            let client_end = ramdisk.client.open().unwrap();
            let proxy = client_end.into_proxy();
            let info = proxy.get_info().await.unwrap().unwrap();
            assert_eq!(info.block_count, block_count);
        }
    }

    /// Going from a `RamdiskBuilder` to a started filesystem succeeds.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn ramdisk_into_blobfs_formats_ramdisk(implementation: Implementation) {
        let _: BlobfsRamdisk = Ramdisk::builder()
            .into_blobfs_builder()
            .await
            .unwrap()
            .implementation(implementation)
            .start()
            .await
            .unwrap();
    }

    /// The listing is empty before a write and contains the blob afterwards.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn read_and_write(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([]));
        let data = "Hello blobfs!".as_bytes();
        let merkle = fuchsia_merkle::root_from_slice(data);
        blobfs.write_blob(merkle, data).await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([merkle]));

        blobfs.stop().await.unwrap();
    }

    /// A blob can be written by driving `BlobCreator` and `BlobWriter` directly.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn blob_creator_api(implementation: Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([]));

        let bytes = &[1u8; 40];
        let hash = fuchsia_merkle::root_from_slice(bytes);
        let compressed_data = Type1Blob::generate(bytes, CompressionMode::Always);

        let blob_creator = blobfs.blob_creator_proxy().unwrap();
        // Convert the hash the same way `write_blob_with_overwrite` and the reader test do.
        let blob_writer = blob_creator.create(&hash.into(), false).await.unwrap().unwrap();
        let mut blob_writer =
            blob_writer::BlobWriter::create(blob_writer.into_proxy(), compressed_data.len() as u64)
                .await
                .unwrap();
        let () = blob_writer.write(&compressed_data).await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hash]));

        blobfs.stop().await.unwrap();
    }

    /// A blob written at start can be read back through `BlobReader`.
    #[test_case(Implementation::CppBlobfs; "blobfs")]
    #[test_case(Implementation::Fxblob; "fxblob")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn blob_reader_api(implementation: Implementation) {
        let data = "Hello blobfs!".as_bytes();
        let hash = fuchsia_merkle::root_from_slice(data);
        let blobfs = BlobfsRamdisk::builder()
            .implementation(implementation)
            .with_blob(data)
            .start()
            .await
            .unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hash]));

        let blob_reader = blobfs.blob_reader_proxy().unwrap();
        let vmo = blob_reader.get_vmo(&hash.into()).await.unwrap().unwrap();
        let mut buf = vec![0; vmo.get_content_size().unwrap() as usize];
        let () = vmo.read(&mut buf, 0).unwrap();
        assert_eq!(buf, data);

        blobfs.stop().await.unwrap();
    }
}