1#![deny(missing_docs)]
6
7use fidl::endpoints::{Proxy as _, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{stream, StreamExt as _};
12use log::{error, info, warn};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::{self as zx, AsHandleRef as _, Status};
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
21
22pub mod mock;
23pub use mock::Mock;
24
25#[derive(Debug, Error)]
27#[allow(missing_docs)]
28pub enum BlobfsError {
29 #[error("while opening blobfs dir")]
30 OpenDir(#[from] fuchsia_fs::node::OpenError),
31
32 #[error("while cloning the blobfs dir")]
33 CloneDir(#[from] fuchsia_fs::node::CloneError),
34
35 #[error("while listing blobfs dir")]
36 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
37
38 #[error("while deleting blob")]
39 Unlink(#[source] Status),
40
41 #[error("while sync'ing")]
42 Sync(#[source] Status),
43
44 #[error("while parsing blob merkle hash")]
45 ParseHash(#[from] ParseHashError),
46
47 #[error("FIDL error")]
48 Fidl(#[from] fidl::Error),
49
50 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
51 ConnectToBlobCreator(#[source] anyhow::Error),
52
53 #[error("while connecting to fuchsia.fxfs/BlobReader")]
54 ConnectToBlobReader(#[source] anyhow::Error),
55
56 #[error("while setting the VmexResource")]
57 InitVmexResource(#[source] anyhow::Error),
58}
59
60#[derive(Debug, Error)]
62#[allow(missing_docs)]
63pub enum CreateError {
64 #[error("the blob already exists or is being concurrently written")]
65 AlreadyExists,
66
67 #[error("while creating the blob")]
68 Io(#[source] fuchsia_fs::node::OpenError),
69
70 #[error("while converting the proxy into a client end")]
71 ConvertToClientEnd,
72
73 #[error("FIDL error")]
74 Fidl(#[from] fidl::Error),
75
76 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
77 BlobCreator(ffxfs::CreateBlobError),
78}
79
80impl From<ffxfs::CreateBlobError> for CreateError {
81 fn from(e: ffxfs::CreateBlobError) -> Self {
82 match e {
83 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
84 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
85 }
86 }
87}
88
89#[derive(Default)]
91pub struct ClientBuilder {
92 use_reader: Reader,
93 use_creator: bool,
94 readable: bool,
95 writable: bool,
96 executable: bool,
97}
98
99#[derive(Default)]
100enum Reader {
101 #[default]
102 DontUse,
103 Use {
104 use_vmex: bool,
105 },
106}
107
108impl ClientBuilder {
109 pub async fn build(self) -> Result<Client, BlobfsError> {
113 let mut flags = fio::Flags::empty();
114 if self.readable {
115 flags |= fio::PERM_READABLE
116 }
117 if self.writable {
118 flags |= fio::PERM_WRITABLE
119 }
120 if self.executable {
121 flags |= fio::PERM_EXECUTABLE
122 }
123 let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
124 let reader = match self.use_reader {
125 Reader::DontUse => None,
126 Reader::Use { use_vmex } => {
127 if use_vmex {
128 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
129 fidl_fuchsia_kernel::VmexResourceMarker,
130 >() {
131 if let Ok(vmex) = client.get().await {
132 info!("Got vmex resource");
133 vmo_blob::init_vmex_resource(vmex)
134 .map_err(BlobfsError::InitVmexResource)?;
135 }
136 }
137 }
138 Some(
139 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
140 .map_err(BlobfsError::ConnectToBlobReader)?,
141 )
142 }
143 };
144
145 let creator = if self.use_creator {
146 Some(
147 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
148 .map_err(BlobfsError::ConnectToBlobCreator)?,
149 )
150 } else {
151 None
152 };
153
154 Ok(Client { dir, creator, reader })
155 }
156
157 pub fn use_reader(self) -> Self {
160 Self { use_reader: Reader::Use { use_vmex: true }, ..self }
161 }
162
163 pub fn use_reader_no_vmex(self) -> Self {
166 Self { use_reader: Reader::Use { use_vmex: false }, ..self }
167 }
168
169 pub fn use_creator(self) -> Self {
171 Self { use_creator: true, ..self }
172 }
173
174 pub fn readable(self) -> Self {
177 Self { readable: true, ..self }
178 }
179
180 pub fn writable(self) -> Self {
183 Self { writable: true, ..self }
184 }
185
186 pub fn executable(self) -> Self {
189 Self { executable: true, ..self }
190 }
191}
192
193impl Client {
194 pub fn builder() -> ClientBuilder {
196 Default::default()
197 }
198}
199#[derive(Debug, Clone)]
201pub struct Client {
202 dir: fio::DirectoryProxy,
203 creator: Option<ffxfs::BlobCreatorProxy>,
204 reader: Option<ffxfs::BlobReaderProxy>,
205}
206
207impl Client {
208 pub fn new(
213 dir: fio::DirectoryProxy,
214 creator: Option<ffxfs::BlobCreatorProxy>,
215 reader: Option<ffxfs::BlobReaderProxy>,
216 vmex: Option<zx::Resource>,
217 ) -> Result<Self, anyhow::Error> {
218 if let Some(vmex) = vmex {
219 vmo_blob::init_vmex_resource(vmex)?;
220 }
221 Ok(Self { dir, creator, reader })
222 }
223
224 pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
231 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
232
233 (Self { dir, creator: None, reader: None }, stream)
234 }
235
236 pub fn new_mock() -> (Self, mock::Mock) {
243 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
244
245 (Self { dir, creator: None, reader: None }, mock::Mock { stream })
246 }
247
248 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
250 if let Some(reader) = &self.reader {
251 reader
252 .get_vmo(hash)
253 .await
254 .map_err(GetBlobVmoError::Fidl)?
255 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
256 } else {
257 let blob =
258 fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
259 .await
260 .map_err(GetBlobVmoError::OpenBlob)?;
261 blob.get_backing_memory(fio::VmoFlags::READ)
262 .await
263 .map_err(GetBlobVmoError::Fidl)?
264 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
265 }
266 }
267
268 pub fn deprecated_open_blob_for_read(
271 &self,
272 blob: &Hash,
273 flags: fio::OpenFlags,
274 scope: ExecutionScope,
275 server_end: ServerEnd<fio::NodeMarker>,
276 ) -> Result<(), fidl::Error> {
277 let describe = flags.contains(fio::OpenFlags::DESCRIBE);
278 if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
280 send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
281 return Ok(());
282 }
283 if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
285 send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
286 return Ok(());
287 }
288 if let Some(reader) = &self.reader {
290 let object_request = flags.to_object_request(server_end);
291 let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
292 Ok(())
293 } else {
294 self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
295 }
296 }
297
298 pub fn open_blob_for_read(
301 &self,
302 blob: &Hash,
303 flags: fio::Flags,
304 scope: ExecutionScope,
305 object_request: ObjectRequestRef<'_>,
306 ) -> Result<(), zx::Status> {
307 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
309 return Err(zx::Status::ACCESS_DENIED);
310 }
311 if flags.creation_mode() != vfs::CreationMode::Never {
313 return Err(zx::Status::NOT_SUPPORTED);
314 }
315 let object_request = object_request.take();
317 if let Some(reader) = &self.reader {
319 let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
320 } else {
321 let _: Result<(), ()> = self
322 .dir
323 .open(
324 &blob.to_string(),
325 flags,
326 &object_request.options(),
327 object_request.into_channel(),
328 )
329 .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
330 }
331 Ok(())
332 }
333
334 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
336 let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
342 fuchsia_fs::directory::readdir(&private_connection)
343 .await
344 .map_err(BlobfsError::ReadDir)?
345 .into_iter()
346 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
347 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
348 .collect()
349 }
350
351 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
353 self.dir
354 .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
355 .await?
356 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
357 }
358
359 pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
361 Ok(if let Some(blob_creator) = &self.creator {
362 fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
363 } else {
364 fpkg::BlobWriter::File(
365 self.open_blob_proxy_from_dir_for_write(blob)
366 .await?
367 .into_channel()
368 .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
369 .into_zx_channel()
370 .into(),
371 )
372 })
373 }
374
375 async fn open_blob_proxy_from_dir_for_write(
377 &self,
378 blob: &Hash,
379 ) -> Result<fio::FileProxy, CreateError> {
380 let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;
381
382 let path = delivery_blob::delivery_blob_path(blob);
383 fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
384 fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
385 CreateError::AlreadyExists
386 }
387 other => CreateError::Io(other),
388 })
389 }
390
391 pub async fn has_blob(&self, blob: &Hash) -> bool {
399 if let Some(reader) = &self.reader {
400 matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
402 } else {
403 let file = match fuchsia_fs::directory::open_file_async(
404 &self.dir,
405 &blob.to_string(),
406 fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
407 ) {
408 Ok(file) => file,
409 Err(_) => return false,
410 };
411
412 let mut events = file.take_event_stream();
413
414 let event = match events.next().await {
415 None => return false,
416 Some(event) => match event {
417 Err(_) => return false,
418 Ok(event) => match event {
419 fio::FileEvent::OnOpen_ { s, info } => {
420 if Status::from_raw(s) != Status::OK {
421 return false;
422 }
423
424 match info {
425 Some(info) => match *info {
426 fio::NodeInfoDeprecated::File(fio::FileObject {
427 event: Some(event),
428 stream: _, }) => event,
430 _ => return false,
431 },
432 _ => return false,
433 }
434 }
435 fio::FileEvent::OnRepresentation { payload } => match payload {
436 fio::Representation::File(fio::FileInfo {
437 observer: Some(event),
438 stream: _, ..
440 }) => event,
441 _ => return false,
442 },
443 fio::FileEvent::_UnknownEvent { .. } => return false,
444 },
445 },
446 };
447
448 match event.wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST) {
451 Ok(_) => true,
452 Err(status) => {
453 if status != Status::TIMED_OUT {
454 warn!("blobfs: unknown error asserting blob existence: {}", status);
455 }
456 false
457 }
458 }
459 }
460 }
461
462 pub async fn filter_to_missing_blobs(&self, candidates: &HashSet<Hash>) -> HashSet<Hash> {
473 stream::iter(candidates.clone())
481 .map(move |blob| async move {
482 if self.has_blob(&blob).await {
483 None
484 } else {
485 Some(blob)
486 }
487 })
488 .buffer_unordered(10)
491 .filter_map(|blob| async move { blob })
492 .collect()
493 .await
494 }
495
496 pub async fn sync(&self) -> Result<(), BlobfsError> {
498 self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
499 }
500}
501
502fn open_blob_with_reader<P: ProtocolsExt + Send>(
505 reader: ffxfs::BlobReaderProxy,
506 blob_hash: Hash,
507 scope: ExecutionScope,
508 protocols: P,
509 object_request: ObjectRequest,
510) {
511 scope.clone().spawn(object_request.handle_async(async move |object_request| {
512 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
513 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
514 error!("Blob reader channel closed: {:?}", status);
515 status
516 } else {
517 error!("Transport error on get_vmo: {:?}", fidl_error);
518 zx::Status::INTERNAL
519 }
520 })?;
521 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
522 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
523 object_request
524 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
525 .await
526 }));
527}
528
529#[derive(thiserror::Error, Debug)]
530#[allow(missing_docs)]
531pub enum GetBlobVmoError {
532 #[error("getting the vmo")]
533 GetVmo(#[source] Status),
534
535 #[error("opening the blob")]
536 OpenBlob(#[source] fuchsia_fs::node::OpenError),
537
538 #[error("making a fidl request")]
539 Fidl(#[source] fidl::Error),
540}
541
542#[cfg(test)]
543impl Client {
544 pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
555 Self::new(
556 blobfs.root_dir_proxy().unwrap(),
557 blobfs.blob_creator_proxy().unwrap(),
558 blobfs.blob_reader_proxy().unwrap(),
559 None,
560 )
561 .unwrap()
562 }
563}
564
565#[cfg(test)]
566#[allow(clippy::bool_assert_comparison)]
567mod tests {
568 use super::*;
569 use assert_matches::assert_matches;
570 use blobfs_ramdisk::BlobfsRamdisk;
571 use fuchsia_async as fasync;
572 use futures::stream::TryStreamExt;
573 use maplit::hashset;
574 use std::io::Write as _;
575 use std::sync::Arc;
576
577 #[fasync::run_singlethreaded(test)]
578 async fn list_known_blobs_empty() {
579 let blobfs = BlobfsRamdisk::start().await.unwrap();
580 let client = Client::for_ramdisk(&blobfs);
581
582 assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
583 blobfs.stop().await.unwrap();
584 }
585
586 #[fasync::run_singlethreaded(test)]
587 async fn list_known_blobs() {
588 let blobfs = BlobfsRamdisk::builder()
589 .with_blob(&b"blob 1"[..])
590 .with_blob(&b"blob 2"[..])
591 .start()
592 .await
593 .unwrap();
594 let client = Client::for_ramdisk(&blobfs);
595
596 let expected = blobfs.list_blobs().unwrap().into_iter().collect();
597 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
598 blobfs.stop().await.unwrap();
599 }
600
601 #[fasync::run_singlethreaded(test)]
602 async fn delete_blob_and_then_list() {
603 let blobfs = BlobfsRamdisk::builder()
604 .with_blob(&b"blob 1"[..])
605 .with_blob(&b"blob 2"[..])
606 .start()
607 .await
608 .unwrap();
609 let client = Client::for_ramdisk(&blobfs);
610
611 let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
612 assert_matches!(client.delete_blob(&merkle).await, Ok(()));
613
614 let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
615 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
616 blobfs.stop().await.unwrap();
617 }
618
619 #[fasync::run_singlethreaded(test)]
620 async fn delete_non_existing_blob() {
621 let blobfs = BlobfsRamdisk::start().await.unwrap();
622 let client = Client::for_ramdisk(&blobfs);
623 let blob_merkle = Hash::from([1; 32]);
624
625 assert_matches!(
626 client.delete_blob(&blob_merkle).await,
627 Err(BlobfsError::Unlink(Status::NOT_FOUND))
628 );
629 blobfs.stop().await.unwrap();
630 }
631
632 #[fasync::run_singlethreaded(test)]
633 async fn delete_blob_mock() {
634 let (client, mut stream) = Client::new_test();
635 let blob_merkle = Hash::from([1; 32]);
636 fasync::Task::spawn(async move {
637 match stream.try_next().await.unwrap().unwrap() {
638 fio::DirectoryRequest::Unlink { name, responder, .. } => {
639 assert_eq!(name, blob_merkle.to_string());
640 responder.send(Ok(())).unwrap();
641 }
642 other => panic!("unexpected request: {other:?}"),
643 }
644 })
645 .detach();
646
647 assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
648 }
649
650 #[fasync::run_singlethreaded(test)]
651 async fn has_blob() {
652 let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
653 let client = Client::for_ramdisk(&blobfs);
654
655 assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
656 assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);
657
658 blobfs.stop().await.unwrap();
659 }
660
661 #[fasync::run_singlethreaded(test)]
662 async fn has_blob_fxblob() {
663 let blobfs =
664 BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
665 let client = Client::for_ramdisk(&blobfs);
666
667 assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
668 assert!(!client.has_blob(&Hash::from([1; 32])).await);
669
670 blobfs.stop().await.unwrap();
671 }
672
673 #[fasync::run_singlethreaded(test)]
674 async fn has_blob_return_false_if_blob_is_partially_written() {
675 let blobfs = BlobfsRamdisk::start().await.unwrap();
676 let client = Client::for_ramdisk(&blobfs);
677
678 let blob = [3; 1024];
679 let hash = fuchsia_merkle::from_slice(&blob).root();
680
681 let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
682 assert_eq!(client.has_blob(&hash).await, false);
683 file.set_len(blob.len() as u64).unwrap();
684 assert_eq!(client.has_blob(&hash).await, false);
685 file.write_all(&blob[..512]).unwrap();
686 assert_eq!(client.has_blob(&hash).await, false);
687 file.write_all(&blob[512..]).unwrap();
688 assert_eq!(client.has_blob(&hash).await, true);
689
690 blobfs.stop().await.unwrap();
691 }
692
693 async fn resize(blob: &fio::FileProxy, size: usize) {
694 let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
695 }
696
697 async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
698 assert_eq!(
699 blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
700 bytes.len() as u64
701 );
702 }
703
704 #[fasync::run_singlethreaded(test)]
705 async fn write_delivery_blob() {
706 let blobfs = BlobfsRamdisk::start().await.unwrap();
707 let client = Client::for_ramdisk(&blobfs);
708
709 let content = [3; 1024];
710 let hash = fuchsia_merkle::from_slice(&content).root();
711 let delivery_content =
712 delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);
713
714 let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
715
716 let () = resize(&proxy, delivery_content.len()).await;
717 let () = write(&proxy, &delivery_content).await;
718
719 assert!(client.has_blob(&hash).await);
720
721 blobfs.stop().await.unwrap();
722 }
723
724 struct TestBlob {
728 _blob: fio::FileProxy,
729 hash: Hash,
730 }
731
732 async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
733 let hash = fuchsia_merkle::from_slice(content).root();
734 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
735 TestBlob { _blob, hash }
736 }
737
738 async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
739 let hash = fuchsia_merkle::from_slice(content).root();
740 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
741 let () = resize(&_blob, content.len()).await;
742 TestBlob { _blob, hash }
743 }
744
745 async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
746 let hash = fuchsia_merkle::from_slice(content).root();
747 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
748 let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
749 let () = resize(&_blob, content.len()).await;
750 let () = write(&_blob, &content[..content.len() / 2]).await;
751 TestBlob { _blob, hash }
752 }
753
754 async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
755 let hash = fuchsia_merkle::from_slice(content).root();
756 let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
757 let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
758 let () = resize(&_blob, content.len()).await;
759 let () = write(&_blob, &content).await;
760 TestBlob { _blob, hash }
761 }
762
763 #[fasync::run_singlethreaded(test)]
764 async fn filter_to_missing_blobs() {
765 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
766 let client = Client::for_ramdisk(&blobfs);
767
768 let missing_hash0 = Hash::from([0; 32]);
769 let missing_hash1 = Hash::from([1; 32]);
770
771 let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
772 let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
773
774 assert_eq!(
775 client
776 .filter_to_missing_blobs(
777 &hashset! { missing_hash0, missing_hash1,
779 present_blob0.hash, present_blob1.hash
780 },
781 )
782 .await,
783 hashset! { missing_hash0, missing_hash1 }
784 );
785
786 blobfs.stop().await.unwrap();
787 }
788
789 #[fasync::run_singlethreaded(test)]
791 async fn filter_to_missing_blobs_with_partially_written_blobs() {
792 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
793 let client = Client::for_ramdisk(&blobfs);
794
795 let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
797
798 let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;
800
801 let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;
803
804 let present_blob = fully_write_blob(&client, &[3; 1024]).await;
806
807 assert_eq!(
808 client
809 .filter_to_missing_blobs(&hashset! {
810 missing_blob0.hash,
811 missing_blob1.hash,
812 missing_blob2.hash,
813 present_blob.hash
814 },)
815 .await,
816 hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
818 );
819
820 blobfs.stop().await.unwrap();
821 }
822
823 #[fasync::run_singlethreaded(test)]
824 async fn sync() {
825 let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
826 let counter_clone = Arc::clone(&counter);
827 let (client, mut stream) = Client::new_test();
828 fasync::Task::spawn(async move {
829 match stream.try_next().await.unwrap().unwrap() {
830 fio::DirectoryRequest::Sync { responder } => {
831 counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
832 responder.send(Ok(())).unwrap();
833 }
834 other => panic!("unexpected request: {other:?}"),
835 }
836 })
837 .detach();
838
839 assert_matches!(client.sync().await, Ok(()));
840 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
841 }
842
843 #[fasync::run_singlethreaded(test)]
844 async fn open_blob_for_write_uses_fxblob_if_configured() {
845 let (blob_creator, mut blob_creator_stream) =
846 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
847 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
848 let client = Client::new(
849 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
850 Some(blob_creator),
851 Some(blob_reader),
852 None,
853 )
854 .unwrap();
855
856 fuchsia_async::Task::spawn(async move {
857 match blob_creator_stream.next().await.unwrap().unwrap() {
858 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
859 assert_eq!(hash, [0; 32]);
860 assert!(!allow_existing);
861 let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
862 }
863 }
864 })
865 .detach();
866
867 assert_matches!(
868 client.open_blob_for_write(&[0; 32].into()).await,
869 Ok(fpkg::BlobWriter::Writer(_))
870 );
871 }
872
873 #[fasync::run_singlethreaded(test)]
874 async fn open_blob_for_write_fxblob_maps_already_exists() {
875 let (blob_creator, mut blob_creator_stream) =
876 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
877 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
878
879 let client = Client::new(
880 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
881 Some(blob_creator),
882 Some(blob_reader),
883 None,
884 )
885 .unwrap();
886
887 fuchsia_async::Task::spawn(async move {
888 match blob_creator_stream.next().await.unwrap().unwrap() {
889 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
890 assert_eq!(hash, [0; 32]);
891 assert!(!allow_existing);
892 let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
893 }
894 }
895 })
896 .detach();
897
898 assert_matches!(
899 client.open_blob_for_write(&[0; 32].into()).await,
900 Err(CreateError::AlreadyExists)
901 );
902 }
903
904 #[fasync::run_singlethreaded(test)]
905 async fn concurrent_list_known_blobs_all_return_full_contents() {
906 use futures::StreamExt;
907 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
908 let client = Client::for_ramdisk(&blobfs);
909
910 for i in 0..256u16 {
917 let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
918 }
919
920 let () = futures::stream::iter(0..100)
921 .for_each_concurrent(None, |_| async {
922 assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
923 })
924 .await;
925 }
926}