#![deny(missing_docs)]
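//! Typesafe wrappers around blobfs, the Fuchsia blob filesystem.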

use fidl::endpoints::{Proxy as _, ServerEnd};
use fuchsia_hash::{Hash, ParseHashError};
use futures::{stream, StreamExt as _};
use log::{error, info, warn};
use std::collections::HashSet;
use thiserror::Error;
use vfs::common::send_on_open_with_error;
use vfs::execution_scope::ExecutionScope;
use vfs::file::StreamIoConnection;
use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
use zx::{self as zx, AsHandleRef as _, Status};
use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};

pub mod mock;
pub use mock::Mock;

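/// Blobfs client errors.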
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
    #[error("while opening blobfs dir")]
    OpenDir(#[from] fuchsia_fs::node::OpenError),

    #[error("while cloning the blobfs dir")]
    CloneDir(#[from] fuchsia_fs::node::CloneError),

    #[error("while listing blobfs dir")]
    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),

    #[error("while deleting blob")]
    Unlink(#[source] Status),

    #[error("while sync'ing")]
    Sync(#[source] Status),

    #[error("while parsing blob merkle hash")]
    ParseHash(#[from] ParseHashError),

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
    ConnectToBlobCreator(#[source] anyhow::Error),

    #[error("while connecting to fuchsia.fxfs/BlobReader")]
    ConnectToBlobReader(#[source] anyhow::Error),

    #[error("while setting the VmexResource")]
    InitVmexResource(#[source] anyhow::Error),
}

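/// Errors encountered while creating a blob.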
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,

    #[error("while creating the blob")]
    Io(#[source] fuchsia_fs::node::OpenError),

    #[error("while converting the proxy into a client end")]
    ConvertToClientEnd,

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
    BlobCreator(ffxfs::CreateBlobError),
}

impl From<ffxfs::CreateBlobError> for CreateError {
    fn from(e: ffxfs::CreateBlobError) -> Self {
        match e {
            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
        }
    }
}

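/// Builder for [`Client`], selecting which access rights to request on "/blob".
///
/// The example below is an illustrative sketch (it is not taken from this crate's tests); it
/// assumes the component has "/blob" and `fuchsia.fxfs.BlobReader` in its namespace:
///
/// ```ignore
/// let blobfs = blobfs::Client::builder().readable().build().await?;
/// let blobs = blobfs.list_known_blobs().await?;
/// ```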
#[derive(Default)]
pub struct ClientBuilder {
    readable: bool,
    writable: bool,
    executable: bool,
}

impl ClientBuilder {
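    /// Opens the "/blob" directory in the component's namespace with the requested rights,
    /// registers the VmexResource if it is available, and connects to `fuchsia.fxfs.BlobReader`
    /// (plus `fuchsia.fxfs.BlobCreator` if writable), returning a [`Client`].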
    pub async fn build(self) -> Result<Client, BlobfsError> {
        let mut flags = fio::Flags::empty();
        if self.readable {
            flags |= fio::PERM_READABLE
        }
        if self.writable {
            flags |= fio::PERM_WRITABLE
        }
        if self.executable {
            flags |= fio::PERM_EXECUTABLE
        }
        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
            fidl_fuchsia_kernel::VmexResourceMarker,
        >() {
            if let Ok(vmex) = client.get().await {
                info!("Got vmex resource");
                vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
            }
        }
        let reader = Some(
            fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
                .map_err(BlobfsError::ConnectToBlobReader)?,
        );

        let creator = if self.writable {
            Some(
                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
                    .map_err(BlobfsError::ConnectToBlobCreator)?,
            )
        } else {
            None
        };

        Ok(Client { dir, creator, reader })
    }

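    /// Request read access to "/blob" when building the [`Client`].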
    pub fn readable(self) -> Self {
        Self { readable: true, ..self }
    }

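    /// Request write access to "/blob" (and a connection to `fuchsia.fxfs.BlobCreator`) when
    /// building the [`Client`].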
    pub fn writable(self) -> Self {
        Self { writable: true, ..self }
    }

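    /// Request execute access to "/blob" when building the [`Client`].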
    pub fn executable(self) -> Self {
        Self { executable: true, ..self }
    }
}

impl Client {
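    /// Returns a [`ClientBuilder`] with no access rights requested.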
    pub fn builder() -> ClientBuilder {
        Default::default()
    }
}
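
/// A client for reading, writing, listing, and deleting blobs.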
#[derive(Debug, Clone)]
pub struct Client {
    dir: fio::DirectoryProxy,
    creator: Option<ffxfs::BlobCreatorProxy>,
    reader: Option<ffxfs::BlobReaderProxy>,
}

impl Client {
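    /// Returns a [`Client`] backed by the given proxies. If `vmex` is provided, it is registered
    /// so that blob VMOs can be served as executable.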
    pub fn new(
        dir: fio::DirectoryProxy,
        creator: Option<ffxfs::BlobCreatorProxy>,
        reader: Option<ffxfs::BlobReaderProxy>,
        vmex: Option<zx::Resource>,
    ) -> Result<Self, anyhow::Error> {
        if let Some(vmex) = vmex {
            vmo_blob::init_vmex_resource(vmex)?;
        }
        Ok(Self { dir, creator, reader })
    }

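    /// Returns a [`Client`] that sends all fuchsia.io requests to the returned request stream.
    /// Intended for use in tests.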
    pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, stream)
    }

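    /// Returns a [`Client`] and a [`mock::Mock`] that can be used to expect and answer the
    /// client's fuchsia.io requests in tests.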
    pub fn new_mock() -> (Self, mock::Mock) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, mock::Mock { stream })
    }

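    /// Returns a VMO containing the contents of the blob identified by `hash`.
    ///
    /// The snippet below is an illustrative sketch (not taken from this crate's tests) of reading
    /// the blob bytes out of the returned VMO:
    ///
    /// ```ignore
    /// let vmo = client.get_blob_vmo(&hash).await?;
    /// let size = vmo.get_content_size()?;
    /// let mut bytes = vec![0u8; size as usize];
    /// vmo.read(&mut bytes, 0)?;
    /// ```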
    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
        if let Some(reader) = &self.reader {
            reader
                .get_vmo(hash)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        } else {
            let blob =
                fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
                    .await
                    .map_err(GetBlobVmoError::OpenBlob)?;
            blob.get_backing_memory(fio::VmoFlags::READ)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        }
    }

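    /// Opens the blob for reading over `server_end` using fuchsia.io DeprecatedOpen semantics
    /// (`fio::OpenFlags` plus an OnOpen event). Write and create flags are rejected.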
    pub fn deprecated_open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::OpenFlags,
        scope: ExecutionScope,
        server_end: ServerEnd<fio::NodeMarker>,
    ) -> Result<(), fidl::Error> {
        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
            return Ok(());
        }
        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
            return Ok(());
        }
        if let Some(reader) = &self.reader {
            let object_request = flags.to_object_request(server_end);
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
            Ok(())
        } else {
            self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
        }
    }

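    /// Opens the blob for reading on `object_request` using fuchsia.io Open semantics
    /// (`fio::Flags`). Write rights and creation modes are rejected.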
    pub fn open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::Flags,
        scope: ExecutionScope,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), zx::Status> {
        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
            return Err(zx::Status::ACCESS_DENIED);
        }
        if flags.creation_mode() != vfs::CreationMode::Never {
            return Err(zx::Status::NOT_SUPPORTED);
        }
        let object_request = object_request.take();
        if let Some(reader) = &self.reader {
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
        } else {
            let _: Result<(), ()> = self
                .dir
                .open(
                    &blob.to_string(),
                    flags,
                    &object_request.options(),
                    object_request.into_channel(),
                )
                .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
        }
        Ok(())
    }

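    /// Returns the set of blobs currently listed in the blobfs directory.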
    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
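        // Use a fresh connection for each call so that concurrent calls do not interfere with
        // each other's directory seek offsets.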
        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
        fuchsia_fs::directory::readdir(&private_connection)
            .await
            .map_err(BlobfsError::ReadDir)?
            .into_iter()
            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
            .collect()
    }

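    /// Deletes the blob with the given merkle root from blobfs.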
    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
        self.dir
            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
            .await?
            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
    }

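    /// Opens a new blob for writing, returning a `fuchsia.pkg` BlobWriter. Uses
    /// `fuchsia.fxfs.BlobCreator` when available, otherwise falls back to creating a delivery
    /// blob file in the blobfs directory.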
    pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
        Ok(if let Some(blob_creator) = &self.creator {
            fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
        } else {
            fpkg::BlobWriter::File(
                self.open_blob_proxy_from_dir_for_write(blob)
                    .await?
                    .into_channel()
                    .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
                    .into_zx_channel()
                    .into(),
            )
        })
    }

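    /// Opens the delivery blob path for `blob` in the blobfs directory for writing, mapping
    /// ACCESS_DENIED (the blob already exists or is being written) to
    /// [`CreateError::AlreadyExists`].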
    async fn open_blob_proxy_from_dir_for_write(
        &self,
        blob: &Hash,
    ) -> Result<fio::FileProxy, CreateError> {
        let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;

        let path = delivery_blob::delivery_blob_path(blob);
        fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
            fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
                CreateError::AlreadyExists
            }
            other => CreateError::Io(other),
        })
    }

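    /// Returns whether blobfs has a blob with the given hash that is readable, i.e. fully
    /// written. Partially written blobs are reported as absent.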
    pub async fn has_blob(&self, blob: &Hash) -> bool {
        if let Some(reader) = &self.reader {
            matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
        } else {
            let file = match fuchsia_fs::directory::open_file_async(
                &self.dir,
                &blob.to_string(),
                fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
            ) {
                Ok(file) => file,
                Err(_) => return false,
            };

            let mut events = file.take_event_stream();

            let event = match events.next().await {
                None => return false,
                Some(event) => match event {
                    Err(_) => return false,
                    Ok(event) => match event {
                        fio::FileEvent::OnOpen_ { s, info } => {
                            if Status::from_raw(s) != Status::OK {
                                return false;
                            }

                            match info {
                                Some(info) => match *info {
                                    fio::NodeInfoDeprecated::File(fio::FileObject {
                                        event: Some(event),
                                        stream: _,
                                    }) => event,
                                    _ => return false,
                                },
                                _ => return false,
                            }
                        }
                        fio::FileEvent::OnRepresentation { payload } => match payload {
                            fio::Representation::File(fio::FileInfo {
                                observer: Some(event),
                                stream: _,
                                ..
                            }) => event,
                            _ => return false,
                        },
                        fio::FileEvent::_UnknownEvent { .. } => return false,
                    },
                },
            };

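            // A blob's event has USER_0 asserted only once the blob is fully written, so an open
            // that succeeds on a partially written blob still reports the blob as absent here.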
            match event
                .wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST)
                .to_result()
            {
                Ok(_) => true,
                Err(status) => {
                    if status != Status::TIMED_OUT {
                        warn!("blobfs: unknown error asserting blob existence: {}", status);
                    }
                    false
                }
            }
        }
    }

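    /// Returns the subset of `candidates` that are not readable in blobfs, i.e. blobs that are
    /// missing or only partially written.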
    pub async fn filter_to_missing_blobs(
        &self,
        candidates: impl IntoIterator<Item = Hash>,
    ) -> HashSet<Hash> {
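        // Check each candidate with has_blob() so that partially written blobs count as missing.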
        stream::iter(candidates)
            .map(move |blob| async move {
                if self.has_blob(&blob).await {
                    None
                } else {
                    Some(blob)
                }
            })
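            // Bound the number of concurrent has_blob() checks.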
            .buffer_unordered(10)
            .filter_map(|blob| async move { blob })
            .collect()
            .await
    }

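    /// Calls fuchsia.io Sync on the blobfs directory.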
    pub async fn sync(&self) -> Result<(), BlobfsError> {
        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
    }
}

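/// Spawns a task on `scope` that fetches the blob's VMO from `reader` and serves a read-only
/// connection to it on `object_request`.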
fn open_blob_with_reader<P: ProtocolsExt + Send>(
    reader: ffxfs::BlobReaderProxy,
    blob_hash: Hash,
    scope: ExecutionScope,
    protocols: P,
    object_request: ObjectRequest,
) {
    scope.clone().spawn(object_request.handle_async(async move |object_request| {
        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
                error!("Blob reader channel closed: {:?}", status);
                status
            } else {
                error!("Transport error on get_vmo: {:?}", fidl_error);
                zx::Status::INTERNAL
            }
        })?;
        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
        object_request
            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
            .await
    }));
}

#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetBlobVmoError {
    #[error("getting the vmo")]
    GetVmo(#[source] Status),

    #[error("opening the blob")]
    OpenBlob(#[source] fuchsia_fs::node::OpenError),

    #[error("making a fidl request")]
    Fidl(#[source] fidl::Error),
}

#[cfg(test)]
impl Client {
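    /// Returns a [`Client`] connected to the given `BlobfsRamdisk`, for use in tests.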
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        Self::new(
            blobfs.root_dir_proxy().unwrap(),
            blobfs.blob_creator_proxy().unwrap(),
            blobfs.blob_reader_proxy().unwrap(),
            None,
        )
        .unwrap()
    }
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use blobfs_ramdisk::BlobfsRamdisk;
    use fuchsia_async as fasync;
    use futures::stream::TryStreamExt;
    use maplit::hashset;
    use std::io::Write as _;
    use std::sync::Arc;

    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs_empty() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs() {
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_and_then_list() {
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
        assert_matches!(client.delete_blob(&merkle).await, Ok(()));

        let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn delete_non_existing_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let blob_merkle = Hash::from([1; 32]);

        assert_matches!(
            client.delete_blob(&blob_merkle).await,
            Err(BlobfsError::Unlink(Status::NOT_FOUND))
        );
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_mock() {
        let (client, mut stream) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }

    #[fasync::run_singlethreaded(test)]
    async fn has_blob() {
        let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
        assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn has_blob_fxblob() {
        let blobfs =
            BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
        assert!(!client.has_blob(&Hash::from([1; 32])).await);

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn has_blob_return_false_if_blob_is_partially_written() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let blob = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&blob).root();

        let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.set_len(blob.len() as u64).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[..512]).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[512..]).unwrap();
        assert_eq!(client.has_blob(&hash).await, true);

        blobfs.stop().await.unwrap();
    }

    async fn resize(blob: &fio::FileProxy, size: usize) {
        let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
    }

    async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
        assert_eq!(
            blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
            bytes.len() as u64
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn write_delivery_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&content).root();
        let delivery_content =
            delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);

        let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();

        let () = resize(&proxy, delivery_content.len()).await;
        let () = write(&proxy, &delivery_content).await;

        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }

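    /// A blob in some stage of being written. The `_blob` handle is kept open so the blob stays
    /// in its current (possibly partial) state.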
    struct TestBlob {
        _blob: fio::FileProxy,
        hash: Hash,
    }

    async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        TestBlob { _blob, hash }
    }

    async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let () = resize(&_blob, content.len()).await;
        TestBlob { _blob, hash }
    }

    async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
        let () = resize(&_blob, content.len()).await;
        let () = write(&_blob, &content[..content.len() / 2]).await;
        TestBlob { _blob, hash }
    }

    async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
        let () = resize(&_blob, content.len()).await;
        let () = write(&_blob, &content).await;
        TestBlob { _blob, hash }
    }

    #[fasync::run_singlethreaded(test)]
    async fn filter_to_missing_blobs() {
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let missing_hash0 = Hash::from([0; 32]);
        let missing_hash1 = Hash::from([1; 32]);

        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;

        assert_eq!(
            client
                .filter_to_missing_blobs(
                    [missing_hash0, missing_hash1, present_blob0.hash, present_blob1.hash]
                )
                .await,
            hashset! { missing_hash0, missing_hash1 }
        );

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn filter_to_missing_blobs_with_partially_written_blobs() {
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

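        // Created, but not truncated.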
        let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;

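        // Created and truncated, but not written.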
        let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;

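        // Partially written.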
        let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;

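        // Fully written.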
        let present_blob = fully_write_blob(&client, &[3; 1024]).await;

        assert_eq!(
            client
                .filter_to_missing_blobs([
                    missing_blob0.hash,
                    missing_blob1.hash,
                    missing_blob2.hash,
                    present_blob.hash
                ])
                .await,
            hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
        );

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn sync() {
        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter_clone = Arc::clone(&counter);
        let (client, mut stream) = Client::new_test();
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Sync { responder } => {
                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.sync().await, Ok(()));
        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_blob_for_write_uses_fxblob_if_configured() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
        let client = Client::new(
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
            Some(blob_creator),
            Some(blob_reader),
            None,
        )
        .unwrap();

        fuchsia_async::Task::spawn(async move {
            match blob_creator_stream.next().await.unwrap().unwrap() {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
                }
            }
        })
        .detach();

        assert_matches!(
            client.open_blob_for_write(&[0; 32].into()).await,
            Ok(fpkg::BlobWriter::Writer(_))
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_blob_for_write_fxblob_maps_already_exists() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();

        let client = Client::new(
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
            Some(blob_creator),
            Some(blob_reader),
            None,
        )
        .unwrap();

        fuchsia_async::Task::spawn(async move {
            match blob_creator_stream.next().await.unwrap().unwrap() {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
                }
            }
        })
        .detach();

        assert_matches!(
            client.open_blob_for_write(&[0; 32].into()).await,
            Err(CreateError::AlreadyExists)
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn concurrent_list_known_blobs_all_return_full_contents() {
        use futures::StreamExt;
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

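        // Write enough blobs that listing them takes more than one
        // fuchsia.io/Directory.ReadDirents call.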
        for i in 0..256u16 {
            let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
        }

        let () = futures::stream::iter(0..100)
            .for_each_concurrent(None, |_| async {
                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
            })
            .await;
    }
}