1#![deny(missing_docs)]
6
7use fidl::endpoints::{Proxy as _, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{stream, StreamExt as _};
12use log::{error, info, warn};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::{self as zx, AsHandleRef as _, Status};
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
21
22pub mod mock;
23pub use mock::Mock;
24
25#[derive(Debug, Error)]
27#[allow(missing_docs)]
28pub enum BlobfsError {
29 #[error("while opening blobfs dir")]
30 OpenDir(#[from] fuchsia_fs::node::OpenError),
31
32 #[error("while cloning the blobfs dir")]
33 CloneDir(#[from] fuchsia_fs::node::CloneError),
34
35 #[error("while listing blobfs dir")]
36 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
37
38 #[error("while deleting blob")]
39 Unlink(#[source] Status),
40
41 #[error("while sync'ing")]
42 Sync(#[source] Status),
43
44 #[error("while parsing blob merkle hash")]
45 ParseHash(#[from] ParseHashError),
46
47 #[error("FIDL error")]
48 Fidl(#[from] fidl::Error),
49
50 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
51 ConnectToBlobCreator(#[source] anyhow::Error),
52
53 #[error("while connecting to fuchsia.fxfs/BlobReader")]
54 ConnectToBlobReader(#[source] anyhow::Error),
55
56 #[error("while setting the VmexResource")]
57 InitVmexResource(#[source] anyhow::Error),
58}
59
60#[derive(Debug, Error)]
62#[allow(missing_docs)]
63pub enum CreateError {
64 #[error("the blob already exists or is being concurrently written")]
65 AlreadyExists,
66
67 #[error("while creating the blob")]
68 Io(#[source] fuchsia_fs::node::OpenError),
69
70 #[error("while converting the proxy into a client end")]
71 ConvertToClientEnd,
72
73 #[error("FIDL error")]
74 Fidl(#[from] fidl::Error),
75
76 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
77 BlobCreator(ffxfs::CreateBlobError),
78}
79
80impl From<ffxfs::CreateBlobError> for CreateError {
81 fn from(e: ffxfs::CreateBlobError) -> Self {
82 match e {
83 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
84 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
85 }
86 }
87}
88
89#[derive(Default)]
91pub struct ClientBuilder {
92 readable: bool,
93 writable: bool,
94 executable: bool,
95}
96
97impl ClientBuilder {
98 pub async fn build(self) -> Result<Client, BlobfsError> {
102 let mut flags = fio::Flags::empty();
103 if self.readable {
104 flags |= fio::PERM_READABLE
105 }
106 if self.writable {
107 flags |= fio::PERM_WRITABLE
108 }
109 if self.executable {
110 flags |= fio::PERM_EXECUTABLE
111 }
112 let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
113 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
114 fidl_fuchsia_kernel::VmexResourceMarker,
115 >() {
116 if let Ok(vmex) = client.get().await {
117 info!("Got vmex resource");
118 vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
119 }
120 }
121 let reader = Some(
122 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
123 .map_err(BlobfsError::ConnectToBlobReader)?,
124 );
125
126 let creator = if self.writable {
127 Some(
128 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
129 .map_err(BlobfsError::ConnectToBlobCreator)?,
130 )
131 } else {
132 None
133 };
134
135 Ok(Client { dir, creator, reader })
136 }
137
138 pub fn readable(self) -> Self {
141 Self { readable: true, ..self }
142 }
143
144 pub fn writable(self) -> Self {
148 Self { writable: true, ..self }
149 }
150
151 pub fn executable(self) -> Self {
154 Self { executable: true, ..self }
155 }
156}
157
158impl Client {
159 pub fn builder() -> ClientBuilder {
161 Default::default()
162 }
163}
164#[derive(Debug, Clone)]
166pub struct Client {
167 dir: fio::DirectoryProxy,
168 creator: Option<ffxfs::BlobCreatorProxy>,
169 reader: Option<ffxfs::BlobReaderProxy>,
170}
171
172impl Client {
173 pub fn new(
178 dir: fio::DirectoryProxy,
179 creator: Option<ffxfs::BlobCreatorProxy>,
180 reader: Option<ffxfs::BlobReaderProxy>,
181 vmex: Option<zx::Resource>,
182 ) -> Result<Self, anyhow::Error> {
183 if let Some(vmex) = vmex {
184 vmo_blob::init_vmex_resource(vmex)?;
185 }
186 Ok(Self { dir, creator, reader })
187 }
188
189 pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
196 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
197
198 (Self { dir, creator: None, reader: None }, stream)
199 }
200
201 pub fn new_mock() -> (Self, mock::Mock) {
208 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
209
210 (Self { dir, creator: None, reader: None }, mock::Mock { stream })
211 }
212
213 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
215 if let Some(reader) = &self.reader {
216 reader
217 .get_vmo(hash)
218 .await
219 .map_err(GetBlobVmoError::Fidl)?
220 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
221 } else {
222 let blob =
223 fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
224 .await
225 .map_err(GetBlobVmoError::OpenBlob)?;
226 blob.get_backing_memory(fio::VmoFlags::READ)
227 .await
228 .map_err(GetBlobVmoError::Fidl)?
229 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
230 }
231 }
232
233 pub fn deprecated_open_blob_for_read(
236 &self,
237 blob: &Hash,
238 flags: fio::OpenFlags,
239 scope: ExecutionScope,
240 server_end: ServerEnd<fio::NodeMarker>,
241 ) -> Result<(), fidl::Error> {
242 let describe = flags.contains(fio::OpenFlags::DESCRIBE);
243 if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
245 send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
246 return Ok(());
247 }
248 if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
250 send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
251 return Ok(());
252 }
253 if let Some(reader) = &self.reader {
255 let object_request = flags.to_object_request(server_end);
256 let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
257 Ok(())
258 } else {
259 self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
260 }
261 }
262
263 pub fn open_blob_for_read(
266 &self,
267 blob: &Hash,
268 flags: fio::Flags,
269 scope: ExecutionScope,
270 object_request: ObjectRequestRef<'_>,
271 ) -> Result<(), zx::Status> {
272 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
274 return Err(zx::Status::ACCESS_DENIED);
275 }
276 if flags.creation_mode() != vfs::CreationMode::Never {
278 return Err(zx::Status::NOT_SUPPORTED);
279 }
280 let object_request = object_request.take();
282 if let Some(reader) = &self.reader {
284 let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
285 } else {
286 let _: Result<(), ()> = self
287 .dir
288 .open(
289 &blob.to_string(),
290 flags,
291 &object_request.options(),
292 object_request.into_channel(),
293 )
294 .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
295 }
296 Ok(())
297 }
298
299 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
301 let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
307 fuchsia_fs::directory::readdir(&private_connection)
308 .await
309 .map_err(BlobfsError::ReadDir)?
310 .into_iter()
311 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
312 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
313 .collect()
314 }
315
316 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
318 self.dir
319 .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
320 .await?
321 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
322 }
323
324 pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
326 Ok(if let Some(blob_creator) = &self.creator {
327 fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
328 } else {
329 fpkg::BlobWriter::File(
330 self.open_blob_proxy_from_dir_for_write(blob)
331 .await?
332 .into_channel()
333 .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
334 .into_zx_channel()
335 .into(),
336 )
337 })
338 }
339
340 async fn open_blob_proxy_from_dir_for_write(
342 &self,
343 blob: &Hash,
344 ) -> Result<fio::FileProxy, CreateError> {
345 let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;
346
347 let path = delivery_blob::delivery_blob_path(blob);
348 fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
349 fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
350 CreateError::AlreadyExists
351 }
352 other => CreateError::Io(other),
353 })
354 }
355
356 pub async fn has_blob(&self, blob: &Hash) -> bool {
364 if let Some(reader) = &self.reader {
365 matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
367 } else {
368 let file = match fuchsia_fs::directory::open_file_async(
369 &self.dir,
370 &blob.to_string(),
371 fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
372 ) {
373 Ok(file) => file,
374 Err(_) => return false,
375 };
376
377 let mut events = file.take_event_stream();
378
379 let event = match events.next().await {
380 None => return false,
381 Some(event) => match event {
382 Err(_) => return false,
383 Ok(event) => match event {
384 fio::FileEvent::OnOpen_ { s, info } => {
385 if Status::from_raw(s) != Status::OK {
386 return false;
387 }
388
389 match info {
390 Some(info) => match *info {
391 fio::NodeInfoDeprecated::File(fio::FileObject {
392 event: Some(event),
393 stream: _, }) => event,
395 _ => return false,
396 },
397 _ => return false,
398 }
399 }
400 fio::FileEvent::OnRepresentation { payload } => match payload {
401 fio::Representation::File(fio::FileInfo {
402 observer: Some(event),
403 stream: _, ..
405 }) => event,
406 _ => return false,
407 },
408 fio::FileEvent::_UnknownEvent { .. } => return false,
409 },
410 },
411 };
412
413 match event
416 .wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST)
417 .to_result()
418 {
419 Ok(_) => true,
420 Err(status) => {
421 if status != Status::TIMED_OUT {
422 warn!("blobfs: unknown error asserting blob existence: {}", status);
423 }
424 false
425 }
426 }
427 }
428 }
429
430 pub async fn filter_to_missing_blobs(&self, candidates: &HashSet<Hash>) -> HashSet<Hash> {
441 stream::iter(candidates.clone())
449 .map(move |blob| async move {
450 if self.has_blob(&blob).await {
451 None
452 } else {
453 Some(blob)
454 }
455 })
456 .buffer_unordered(10)
459 .filter_map(|blob| async move { blob })
460 .collect()
461 .await
462 }
463
464 pub async fn sync(&self) -> Result<(), BlobfsError> {
466 self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
467 }
468}
469
470fn open_blob_with_reader<P: ProtocolsExt + Send>(
473 reader: ffxfs::BlobReaderProxy,
474 blob_hash: Hash,
475 scope: ExecutionScope,
476 protocols: P,
477 object_request: ObjectRequest,
478) {
479 scope.clone().spawn(object_request.handle_async(async move |object_request| {
480 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
481 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
482 error!("Blob reader channel closed: {:?}", status);
483 status
484 } else {
485 error!("Transport error on get_vmo: {:?}", fidl_error);
486 zx::Status::INTERNAL
487 }
488 })?;
489 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
490 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
491 object_request
492 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
493 .await
494 }));
495}
496
497#[derive(thiserror::Error, Debug)]
498#[allow(missing_docs)]
499pub enum GetBlobVmoError {
500 #[error("getting the vmo")]
501 GetVmo(#[source] Status),
502
503 #[error("opening the blob")]
504 OpenBlob(#[source] fuchsia_fs::node::OpenError),
505
506 #[error("making a fidl request")]
507 Fidl(#[source] fidl::Error),
508}
509
510#[cfg(test)]
511impl Client {
512 pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
523 Self::new(
524 blobfs.root_dir_proxy().unwrap(),
525 blobfs.blob_creator_proxy().unwrap(),
526 blobfs.blob_reader_proxy().unwrap(),
527 None,
528 )
529 .unwrap()
530 }
531}
532
533#[cfg(test)]
534#[allow(clippy::bool_assert_comparison)]
535mod tests {
536 use super::*;
537 use assert_matches::assert_matches;
538 use blobfs_ramdisk::BlobfsRamdisk;
539 use fuchsia_async as fasync;
540 use futures::stream::TryStreamExt;
541 use maplit::hashset;
542 use std::io::Write as _;
543 use std::sync::Arc;
544
545 #[fasync::run_singlethreaded(test)]
546 async fn list_known_blobs_empty() {
547 let blobfs = BlobfsRamdisk::start().await.unwrap();
548 let client = Client::for_ramdisk(&blobfs);
549
550 assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
551 blobfs.stop().await.unwrap();
552 }
553
554 #[fasync::run_singlethreaded(test)]
555 async fn list_known_blobs() {
556 let blobfs = BlobfsRamdisk::builder()
557 .with_blob(&b"blob 1"[..])
558 .with_blob(&b"blob 2"[..])
559 .start()
560 .await
561 .unwrap();
562 let client = Client::for_ramdisk(&blobfs);
563
564 let expected = blobfs.list_blobs().unwrap().into_iter().collect();
565 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
566 blobfs.stop().await.unwrap();
567 }
568
569 #[fasync::run_singlethreaded(test)]
570 async fn delete_blob_and_then_list() {
571 let blobfs = BlobfsRamdisk::builder()
572 .with_blob(&b"blob 1"[..])
573 .with_blob(&b"blob 2"[..])
574 .start()
575 .await
576 .unwrap();
577 let client = Client::for_ramdisk(&blobfs);
578
579 let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
580 assert_matches!(client.delete_blob(&merkle).await, Ok(()));
581
582 let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
583 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
584 blobfs.stop().await.unwrap();
585 }
586
587 #[fasync::run_singlethreaded(test)]
588 async fn delete_non_existing_blob() {
589 let blobfs = BlobfsRamdisk::start().await.unwrap();
590 let client = Client::for_ramdisk(&blobfs);
591 let blob_merkle = Hash::from([1; 32]);
592
593 assert_matches!(
594 client.delete_blob(&blob_merkle).await,
595 Err(BlobfsError::Unlink(Status::NOT_FOUND))
596 );
597 blobfs.stop().await.unwrap();
598 }
599
600 #[fasync::run_singlethreaded(test)]
601 async fn delete_blob_mock() {
602 let (client, mut stream) = Client::new_test();
603 let blob_merkle = Hash::from([1; 32]);
604 fasync::Task::spawn(async move {
605 match stream.try_next().await.unwrap().unwrap() {
606 fio::DirectoryRequest::Unlink { name, responder, .. } => {
607 assert_eq!(name, blob_merkle.to_string());
608 responder.send(Ok(())).unwrap();
609 }
610 other => panic!("unexpected request: {other:?}"),
611 }
612 })
613 .detach();
614
615 assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
616 }
617
618 #[fasync::run_singlethreaded(test)]
619 async fn has_blob() {
620 let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
621 let client = Client::for_ramdisk(&blobfs);
622
623 assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
624 assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);
625
626 blobfs.stop().await.unwrap();
627 }
628
629 #[fasync::run_singlethreaded(test)]
630 async fn has_blob_fxblob() {
631 let blobfs =
632 BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
633 let client = Client::for_ramdisk(&blobfs);
634
635 assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
636 assert!(!client.has_blob(&Hash::from([1; 32])).await);
637
638 blobfs.stop().await.unwrap();
639 }
640
641 #[fasync::run_singlethreaded(test)]
642 async fn has_blob_return_false_if_blob_is_partially_written() {
643 let blobfs = BlobfsRamdisk::start().await.unwrap();
644 let client = Client::for_ramdisk(&blobfs);
645
646 let blob = [3; 1024];
647 let hash = fuchsia_merkle::from_slice(&blob).root();
648
649 let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
650 assert_eq!(client.has_blob(&hash).await, false);
651 file.set_len(blob.len() as u64).unwrap();
652 assert_eq!(client.has_blob(&hash).await, false);
653 file.write_all(&blob[..512]).unwrap();
654 assert_eq!(client.has_blob(&hash).await, false);
655 file.write_all(&blob[512..]).unwrap();
656 assert_eq!(client.has_blob(&hash).await, true);
657
658 blobfs.stop().await.unwrap();
659 }
660
661 async fn resize(blob: &fio::FileProxy, size: usize) {
662 let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
663 }
664
665 async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
666 assert_eq!(
667 blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
668 bytes.len() as u64
669 );
670 }
671
672 #[fasync::run_singlethreaded(test)]
673 async fn write_delivery_blob() {
674 let blobfs = BlobfsRamdisk::start().await.unwrap();
675 let client = Client::for_ramdisk(&blobfs);
676
677 let content = [3; 1024];
678 let hash = fuchsia_merkle::from_slice(&content).root();
679 let delivery_content =
680 delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);
681
682 let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
683
684 let () = resize(&proxy, delivery_content.len()).await;
685 let () = write(&proxy, &delivery_content).await;
686
687 assert!(client.has_blob(&hash).await);
688
689 blobfs.stop().await.unwrap();
690 }
691
692 struct TestBlob {
696 _blob: fio::FileProxy,
697 hash: Hash,
698 }
699
700 async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
701 let hash = fuchsia_merkle::from_slice(content).root();
702 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
703 TestBlob { _blob, hash }
704 }
705
706 async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
707 let hash = fuchsia_merkle::from_slice(content).root();
708 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
709 let () = resize(&_blob, content.len()).await;
710 TestBlob { _blob, hash }
711 }
712
713 async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
714 let hash = fuchsia_merkle::from_slice(content).root();
715 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
716 let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
717 let () = resize(&_blob, content.len()).await;
718 let () = write(&_blob, &content[..content.len() / 2]).await;
719 TestBlob { _blob, hash }
720 }
721
722 async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
723 let hash = fuchsia_merkle::from_slice(content).root();
724 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
725 let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
726 let () = resize(&_blob, content.len()).await;
727 let () = write(&_blob, &content).await;
728 TestBlob { _blob, hash }
729 }
730
731 #[fasync::run_singlethreaded(test)]
732 async fn filter_to_missing_blobs() {
733 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
734 let client = Client::for_ramdisk(&blobfs);
735
736 let missing_hash0 = Hash::from([0; 32]);
737 let missing_hash1 = Hash::from([1; 32]);
738
739 let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
740 let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
741
742 assert_eq!(
743 client
744 .filter_to_missing_blobs(
745 &hashset! { missing_hash0, missing_hash1,
747 present_blob0.hash, present_blob1.hash
748 },
749 )
750 .await,
751 hashset! { missing_hash0, missing_hash1 }
752 );
753
754 blobfs.stop().await.unwrap();
755 }
756
757 #[fasync::run_singlethreaded(test)]
759 async fn filter_to_missing_blobs_with_partially_written_blobs() {
760 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
761 let client = Client::for_ramdisk(&blobfs);
762
763 let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
765
766 let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;
768
769 let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;
771
772 let present_blob = fully_write_blob(&client, &[3; 1024]).await;
774
775 assert_eq!(
776 client
777 .filter_to_missing_blobs(&hashset! {
778 missing_blob0.hash,
779 missing_blob1.hash,
780 missing_blob2.hash,
781 present_blob.hash
782 },)
783 .await,
784 hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
786 );
787
788 blobfs.stop().await.unwrap();
789 }
790
791 #[fasync::run_singlethreaded(test)]
792 async fn sync() {
793 let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
794 let counter_clone = Arc::clone(&counter);
795 let (client, mut stream) = Client::new_test();
796 fasync::Task::spawn(async move {
797 match stream.try_next().await.unwrap().unwrap() {
798 fio::DirectoryRequest::Sync { responder } => {
799 counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
800 responder.send(Ok(())).unwrap();
801 }
802 other => panic!("unexpected request: {other:?}"),
803 }
804 })
805 .detach();
806
807 assert_matches!(client.sync().await, Ok(()));
808 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
809 }
810
811 #[fasync::run_singlethreaded(test)]
812 async fn open_blob_for_write_uses_fxblob_if_configured() {
813 let (blob_creator, mut blob_creator_stream) =
814 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
815 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
816 let client = Client::new(
817 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
818 Some(blob_creator),
819 Some(blob_reader),
820 None,
821 )
822 .unwrap();
823
824 fuchsia_async::Task::spawn(async move {
825 match blob_creator_stream.next().await.unwrap().unwrap() {
826 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
827 assert_eq!(hash, [0; 32]);
828 assert!(!allow_existing);
829 let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
830 }
831 }
832 })
833 .detach();
834
835 assert_matches!(
836 client.open_blob_for_write(&[0; 32].into()).await,
837 Ok(fpkg::BlobWriter::Writer(_))
838 );
839 }
840
841 #[fasync::run_singlethreaded(test)]
842 async fn open_blob_for_write_fxblob_maps_already_exists() {
843 let (blob_creator, mut blob_creator_stream) =
844 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
845 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
846
847 let client = Client::new(
848 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
849 Some(blob_creator),
850 Some(blob_reader),
851 None,
852 )
853 .unwrap();
854
855 fuchsia_async::Task::spawn(async move {
856 match blob_creator_stream.next().await.unwrap().unwrap() {
857 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
858 assert_eq!(hash, [0; 32]);
859 assert!(!allow_existing);
860 let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
861 }
862 }
863 })
864 .detach();
865
866 assert_matches!(
867 client.open_blob_for_write(&[0; 32].into()).await,
868 Err(CreateError::AlreadyExists)
869 );
870 }
871
872 #[fasync::run_singlethreaded(test)]
873 async fn concurrent_list_known_blobs_all_return_full_contents() {
874 use futures::StreamExt;
875 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
876 let client = Client::for_ramdisk(&blobfs);
877
878 for i in 0..256u16 {
885 let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
886 }
887
888 let () = futures::stream::iter(0..100)
889 .for_each_concurrent(None, |_| async {
890 assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
891 })
892 .await;
893 }
894}