1#![deny(missing_docs)]
6
7use fidl::endpoints::ClientEnd;
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::execution_scope::ExecutionScope;
16use vfs::file::StreamIoConnection;
17use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt};
18use zx::{self as zx, Status};
19use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
20
21pub mod mock;
22pub use mock::Mock;
23
24#[derive(Debug, Error)]
25#[allow(missing_docs)]
26pub enum BlobStatusError {
27 #[error("this client was not created with a blob creator so it cannot write blobs")]
28 WritingNotConfigured,
29
30 #[error("the fidl call returned an unexpected error")]
31 NeedsOverwrite(#[source] Status),
32}
33
34#[derive(Debug, Error)]
36#[allow(missing_docs)]
37pub enum BlobfsError {
38 #[error("while opening blobfs dir")]
39 OpenDir(#[from] fuchsia_fs::node::OpenError),
40
41 #[error("while cloning the blobfs dir")]
42 CloneDir(#[from] fuchsia_fs::node::CloneError),
43
44 #[error("while listing blobfs dir")]
45 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
46
47 #[error("while deleting blob")]
48 Unlink(#[source] Status),
49
50 #[error("while sync'ing")]
51 Sync(#[source] Status),
52
53 #[error("while parsing blob merkle hash")]
54 ParseHash(#[from] ParseHashError),
55
56 #[error("FIDL error")]
57 Fidl(#[from] fidl::Error),
58
59 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
60 ConnectToBlobCreator(#[source] anyhow::Error),
61
62 #[error("while connecting to fuchsia.fxfs/BlobReader")]
63 ConnectToBlobReader(#[source] anyhow::Error),
64
65 #[error("while setting the VmexResource")]
66 InitVmexResource(#[source] anyhow::Error),
67
68 #[error("directory operation requested but blobfs directory was not configured")]
69 DirectoryNotConfigured,
70
71 #[error("while checking NeedsOverwrite for blob status")]
72 BlobStatus(BlobStatusError),
73}
74
75#[derive(Debug, Error)]
77#[allow(missing_docs)]
78pub enum CreateError {
79 #[error("the blob already exists or is being concurrently written")]
80 AlreadyExists,
81
82 #[error("while creating the blob")]
83 Io(#[source] fuchsia_fs::node::OpenError),
84
85 #[error("while converting the proxy into a client end")]
86 ConvertToClientEnd,
87
88 #[error("FIDL error")]
89 Fidl(#[from] fidl::Error),
90
91 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
92 BlobCreator(ffxfs::CreateBlobError),
93
94 #[error("this client was not created with a blob creator so it cannot write blobs")]
95 WritingNotConfigured,
96}
97
98pub enum BlobStatus {
100 UpToDate,
102
103 NeedsOverwrite,
105
106 Absent,
108}
109
110impl From<ffxfs::CreateBlobError> for CreateError {
111 fn from(e: ffxfs::CreateBlobError) -> Self {
112 match e {
113 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
114 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
115 }
116 }
117}
118
119#[derive(Default)]
121pub struct ClientBuilder {
122 readable: bool,
123 writable: bool,
124 executable: bool,
125 creator: bool,
126}
127
128impl ClientBuilder {
129 pub async fn build(self) -> Result<Client, BlobfsError> {
133 let mut flags = fio::Flags::empty();
134 if self.readable {
135 flags |= fio::PERM_READABLE
136 }
137 if self.writable {
138 flags |= fio::PERM_WRITABLE
139 }
140 if self.executable {
141 flags |= fio::PERM_EXECUTABLE
142 }
143
144 let dir = if !flags.is_empty() {
145 Some(fuchsia_fs::directory::open_in_namespace("/blob", flags)?)
146 } else {
147 None
148 };
149
150 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
151 fidl_fuchsia_kernel::VmexResourceMarker,
152 >() && let Ok(vmex) = client.get().await
153 {
154 info!("Got vmex resource");
155 vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
156 }
157 let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
158 .map_err(BlobfsError::ConnectToBlobReader)?;
159 let creator = if self.writable || self.creator {
160 Some(
161 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
162 .map_err(BlobfsError::ConnectToBlobCreator)?,
163 )
164 } else {
165 None
166 };
167
168 Ok(Client { dir, creator, reader })
169 }
170
171 pub fn readable(self) -> Self {
174 Self { readable: true, ..self }
175 }
176
177 pub fn writable(self) -> Self {
181 Self { writable: true, ..self }
182 }
183
184 pub fn executable(self) -> Self {
187 Self { executable: true, ..self }
188 }
189
190 pub fn creator(self) -> Self {
193 Self { creator: true, ..self }
194 }
195}
196
197impl Client {
198 pub fn builder() -> ClientBuilder {
200 Default::default()
201 }
202}
203#[derive(Debug, Clone)]
205pub struct Client {
206 dir: Option<fio::DirectoryProxy>,
207 creator: Option<ffxfs::BlobCreatorProxy>,
208 reader: ffxfs::BlobReaderProxy,
209}
210
211impl Client {
212 pub fn new(
216 dir: fio::DirectoryProxy,
217 creator: Option<ffxfs::BlobCreatorProxy>,
218 reader: ffxfs::BlobReaderProxy,
219 vmex: Option<zx::Resource>,
220 ) -> Result<Self, anyhow::Error> {
221 if let Some(vmex) = vmex {
222 vmo_blob::init_vmex_resource(vmex)?;
223 }
224 Ok(Self { dir: Some(dir), creator, reader })
225 }
226
227 pub fn new_test() -> (
234 Self,
235 fio::DirectoryRequestStream,
236 ffxfs::BlobReaderRequestStream,
237 ffxfs::BlobCreatorRequestStream,
238 ) {
239 let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
240 let (reader, reader_stream) =
241 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
242 let (creator, creator_stream) =
243 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
244
245 (
246 Self { dir: Some(dir), creator: Some(creator), reader },
247 dir_stream,
248 reader_stream,
249 creator_stream,
250 )
251 }
252
253 pub fn new_mock() -> (Self, mock::Mock) {
260 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
261 let (reader, reader_stream) =
262 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
263 let (creator, creator_stream) =
264 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
265
266 (
267 Self { dir: Some(dir), creator: Some(creator), reader },
268 mock::Mock { stream, reader_stream, creator_stream },
269 )
270 }
271
272 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
274 self.reader
275 .get_vmo(hash)
276 .await
277 .map_err(GetBlobVmoError::Fidl)?
278 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
279 }
280
281 pub fn open_blob_for_read(
284 &self,
285 blob: &Hash,
286 flags: fio::Flags,
287 scope: ExecutionScope,
288 object_request: ObjectRequestRef<'_>,
289 ) -> Result<(), zx::Status> {
290 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
291 return Err(zx::Status::ACCESS_DENIED);
292 }
293 if flags.creation_mode() != vfs::CreationMode::Never {
294 return Err(zx::Status::NOT_SUPPORTED);
295 }
296 let object_request = object_request.take();
298 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
299 Ok(())
300 }
301
302 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
304 let dir = self.dir.as_ref().ok_or(BlobfsError::DirectoryNotConfigured)?;
310 let private_connection = fuchsia_fs::directory::clone(dir)?;
311 fuchsia_fs::directory::readdir(&private_connection)
312 .await
313 .map_err(BlobfsError::ReadDir)?
314 .into_iter()
315 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
316 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
317 .collect()
318 }
319
320 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
322 let dir = self.dir.as_ref().ok_or(BlobfsError::DirectoryNotConfigured)?;
323 dir.unlink(&blob.to_string(), &fio::UnlinkOptions::default())
324 .await?
325 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
326 }
327
328 pub async fn open_blob_for_write(
330 &self,
331 blob: &Hash,
332 allow_existing: bool,
333 ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
334 let Some(creator) = &self.creator else {
335 return Err(CreateError::WritingNotConfigured);
336 };
337 Ok(creator.create(blob, allow_existing).await??)
338 }
339
340 pub async fn blob_present_and_up_to_date(&self, blob: &Hash) -> bool {
342 matches!(
345 self.creator.as_ref().expect("Missing BlobCreator access").needs_overwrite(blob).await,
346 Ok(Ok(false))
347 )
348 }
349
350 pub async fn blob_status(&self, blob: &Hash) -> Result<BlobStatus, BlobfsError> {
352 let Some(creator) = &self.creator else {
353 return Err(BlobfsError::BlobStatus(BlobStatusError::WritingNotConfigured));
354 };
355 match creator.needs_overwrite(blob).await? {
356 Ok(true) => Ok(BlobStatus::NeedsOverwrite),
357 Ok(false) => Ok(BlobStatus::UpToDate),
358 Err(status) if status == Status::NOT_FOUND.into_raw() => Ok(BlobStatus::Absent),
359 Err(s) => {
360 Err(BlobfsError::BlobStatus(BlobStatusError::NeedsOverwrite(Status::from_raw(s))))
361 }
362 }
363 }
364
365 pub async fn filter_to_missing_blobs(
376 &self,
377 candidates: impl IntoIterator<Item = Hash>,
378 ) -> HashSet<Hash> {
379 stream::iter(candidates)
387 .map(move |blob| async move {
388 if self.blob_present_and_up_to_date(&blob).await { None } else { Some(blob) }
389 })
390 .buffer_unordered(10)
393 .filter_map(|blob| async move { blob })
394 .collect()
395 .await
396 }
397
398 pub async fn sync(&self) -> Result<(), BlobfsError> {
400 let dir = self.dir.as_ref().ok_or(BlobfsError::DirectoryNotConfigured)?;
401 dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
402 }
403}
404
405fn open_blob_with_reader<P: ProtocolsExt + Send>(
408 reader: ffxfs::BlobReaderProxy,
409 blob_hash: Hash,
410 scope: ExecutionScope,
411 protocols: P,
412 object_request: ObjectRequest,
413) {
414 scope.clone().spawn(object_request.handle_async(async move |object_request| {
415 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
416 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
417 error!("Blob reader channel closed: {:?}", status);
418 status
419 } else {
420 error!("Transport error on get_vmo: {:?}", fidl_error);
421 zx::Status::INTERNAL
422 }
423 })?;
424 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
425 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
426 object_request
427 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
428 .await
429 }));
430}
431
432#[derive(thiserror::Error, Debug)]
433#[allow(missing_docs)]
434pub enum GetBlobVmoError {
435 #[error("getting the vmo")]
436 GetVmo(#[source] Status),
437
438 #[error("opening the blob")]
439 OpenBlob(#[source] fuchsia_fs::node::OpenError),
440
441 #[error("making a fidl request")]
442 Fidl(#[source] fidl::Error),
443}
444
445#[cfg(test)]
446impl Client {
447 pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
458 Self::new(
459 blobfs.root_dir_proxy().unwrap(),
460 Some(blobfs.blob_creator_proxy().unwrap()),
461 blobfs.blob_reader_proxy().unwrap(),
462 None,
463 )
464 .unwrap()
465 }
466}
467
468#[cfg(test)]
469#[allow(clippy::bool_assert_comparison)]
470mod tests {
471 use super::*;
472 use assert_matches::assert_matches;
473 use blobfs_ramdisk::BlobfsRamdisk;
474 use fuchsia_async as fasync;
475 use futures::stream::TryStreamExt as _;
476 use std::sync::Arc;
477 use test_case::test_case;
478
479 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
480 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
481 #[fuchsia::test]
482 async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
483 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
484 let client = Client::for_ramdisk(&blobfs);
485
486 assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
487 blobfs.stop().await.unwrap();
488 }
489
490 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
491 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
492 #[fuchsia::test]
493 async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
494 let blobfs = BlobfsRamdisk::builder()
495 .implementation(blob_impl)
496 .with_blob(&b"blob 1"[..])
497 .with_blob(&b"blob 2"[..])
498 .start()
499 .await
500 .unwrap();
501 let client = Client::for_ramdisk(&blobfs);
502
503 let expected = blobfs.list_blobs().unwrap().into_iter().collect();
504 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
505 blobfs.stop().await.unwrap();
506 }
507
508 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
509 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
510 #[fuchsia::test]
511 async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
512 let blobfs = BlobfsRamdisk::builder()
513 .implementation(blob_impl)
514 .with_blob(&b"blob 1"[..])
515 .with_blob(&b"blob 2"[..])
516 .start()
517 .await
518 .unwrap();
519 let client = Client::for_ramdisk(&blobfs);
520
521 let merkle = fuchsia_merkle::root_from_slice(b"blob 1");
522 assert_matches!(client.delete_blob(&merkle).await, Ok(()));
523
524 let expected = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
525 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
526 blobfs.stop().await.unwrap();
527 }
528
529 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
530 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
531 #[fuchsia::test]
532 async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
533 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
534 let client = Client::for_ramdisk(&blobfs);
535 let blob_merkle = Hash::from([1; 32]);
536
537 assert_matches!(
538 client.delete_blob(&blob_merkle).await,
539 Err(BlobfsError::Unlink(Status::NOT_FOUND))
540 );
541 blobfs.stop().await.unwrap();
542 }
543
544 #[fuchsia::test]
545 async fn delete_blob_mock() {
546 let (client, mut stream, _, _) = Client::new_test();
547 let blob_merkle = Hash::from([1; 32]);
548 fasync::Task::spawn(async move {
549 match stream.try_next().await.unwrap().unwrap() {
550 fio::DirectoryRequest::Unlink { name, responder, .. } => {
551 assert_eq!(name, blob_merkle.to_string());
552 responder.send(Ok(())).unwrap();
553 }
554 other => panic!("unexpected request: {other:?}"),
555 }
556 })
557 .detach();
558
559 assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
560 }
561
562 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
563 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
564 #[fuchsia::test]
565 async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
566 let blobfs = BlobfsRamdisk::builder()
567 .implementation(blob_impl)
568 .with_blob(&b"blob 1"[..])
569 .start()
570 .await
571 .unwrap();
572 let client = Client::for_ramdisk(&blobfs);
573
574 assert!(
575 client.blob_present_and_up_to_date(&fuchsia_merkle::root_from_slice(b"blob 1")).await
576 );
577 assert!(!client.blob_present_and_up_to_date(&Hash::from([1; 32])).await);
578
579 blobfs.stop().await.unwrap();
580 }
581
582 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
583 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
584 #[fuchsia::test]
585 async fn has_blob_return_false_if_blob_is_partially_written(
586 blob_impl: blobfs_ramdisk::Implementation,
587 ) {
588 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
589 let client = Client::for_ramdisk(&blobfs);
590
591 let content = &[3; 1024];
592 let hash = fuchsia_merkle::root_from_slice(content);
593 let delivery_content =
594 delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
595
596 let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
597 assert!(!client.blob_present_and_up_to_date(&hash).await);
598
599 let n = delivery_content.len();
600 let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
601 assert!(!client.blob_present_and_up_to_date(&hash).await);
602
603 let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
604 let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
605 assert!(!client.blob_present_and_up_to_date(&hash).await);
606
607 let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
608 let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
609 assert!(client.blob_present_and_up_to_date(&hash).await);
610
611 blobfs.stop().await.unwrap();
612 }
613
614 async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
615 let hash = fuchsia_merkle::root_from_slice(content);
616 let delivery_content =
617 delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
618 let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
619 let vmo = writer
620 .get_vmo(delivery_content.len().try_into().unwrap())
621 .await
622 .expect("a")
623 .map_err(zx::Status::from_raw)
624 .expect("b");
625 let () = vmo.write(&delivery_content, 0).unwrap();
626 let () =
627 writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
628 hash
629 }
630
631 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
632 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
633 #[fuchsia::test]
634 async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
635 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
636 let client = Client::for_ramdisk(&blobfs);
637
638 let missing_hash0 = Hash::from([0; 32]);
639 let missing_hash1 = Hash::from([1; 32]);
640
641 let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
642 let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
643
644 assert_eq!(
645 client
646 .filter_to_missing_blobs([
647 missing_hash0,
648 missing_hash1,
649 present_blob0,
650 present_blob1
651 ])
652 .await,
653 HashSet::from([missing_hash0, missing_hash1])
654 );
655
656 blobfs.stop().await.unwrap();
657 }
658
659 #[fuchsia::test]
660 async fn sync() {
661 let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
662 let counter_clone = Arc::clone(&counter);
663 let (client, mut stream, _, _) = Client::new_test();
664 fasync::Task::spawn(async move {
665 match stream.try_next().await.unwrap().unwrap() {
666 fio::DirectoryRequest::Sync { responder } => {
667 counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
668 responder.send(Ok(())).unwrap();
669 }
670 other => panic!("unexpected request: {other:?}"),
671 }
672 })
673 .detach();
674
675 assert_matches!(client.sync().await, Ok(()));
676 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
677 }
678
679 #[fuchsia::test]
680 async fn open_blob_for_write_maps_already_exists() {
681 let (blob_creator, mut blob_creator_stream) =
682 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
683 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
684
685 let client = Client::new(
686 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
687 Some(blob_creator),
688 blob_reader,
689 None,
690 )
691 .unwrap();
692
693 fuchsia_async::Task::spawn(async move {
694 match blob_creator_stream.next().await.unwrap().unwrap() {
695 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
696 assert_eq!(hash, [0; 32]);
697 assert!(!allow_existing);
698 let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
699 }
700 ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
701 unreachable!("This code path is not yet exercised.");
702 }
703 }
704 })
705 .detach();
706
707 assert_matches!(
708 client.open_blob_for_write(&[0; 32].into(), false).await,
709 Err(CreateError::AlreadyExists)
710 );
711 }
712
713 #[fuchsia::test]
714 async fn concurrent_list_known_blobs_all_return_full_contents() {
715 use futures::StreamExt;
716 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
717 let client = Client::for_ramdisk(&blobfs);
718
719 for i in 0..256u16 {
726 let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
727 }
728
729 let () = futures::stream::iter(0..100)
730 .for_each_concurrent(None, |_| async {
731 assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
732 })
733 .await;
734 }
735}