Skip to main content

blobfs/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! Typesafe wrappers around the /blob filesystem.
8
9use fidl::endpoints::ClientEnd;
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::execution_scope::ExecutionScope;
16use vfs::file::StreamIoConnection;
17use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt};
18use zx::{self as zx, Status};
19use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
20
21pub mod mock;
22pub use mock::Mock;
23
24#[derive(Debug, Error)]
25#[allow(missing_docs)]
26pub enum BlobStatusError {
27    #[error("this client was not created with a blob creator so it cannot write blobs")]
28    WritingNotConfigured,
29
30    #[error("the fidl call returned an unexpected error")]
31    NeedsOverwrite(#[source] Status),
32}
33
34/// Blobfs client errors.
35#[derive(Debug, Error)]
36#[allow(missing_docs)]
37pub enum BlobfsError {
38    #[error("while opening blobfs dir")]
39    OpenDir(#[from] fuchsia_fs::node::OpenError),
40
41    #[error("while cloning the blobfs dir")]
42    CloneDir(#[from] fuchsia_fs::node::CloneError),
43
44    #[error("while listing blobfs dir")]
45    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
46
47    #[error("while deleting blob")]
48    Unlink(#[source] Status),
49
50    #[error("while sync'ing")]
51    Sync(#[source] Status),
52
53    #[error("while parsing blob merkle hash")]
54    ParseHash(#[from] ParseHashError),
55
56    #[error("FIDL error")]
57    Fidl(#[from] fidl::Error),
58
59    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
60    ConnectToBlobCreator(#[source] anyhow::Error),
61
62    #[error("while connecting to fuchsia.fxfs/BlobReader")]
63    ConnectToBlobReader(#[source] anyhow::Error),
64
65    #[error("while setting the VmexResource")]
66    InitVmexResource(#[source] anyhow::Error),
67
68    #[error("directory operation requested but blobfs directory was not configured")]
69    DirectoryNotConfigured,
70
71    #[error("while checking NeedsOverwrite for blob status")]
72    BlobStatus(BlobStatusError),
73}
74
75/// An error encountered while creating a blob
76#[derive(Debug, Error)]
77#[allow(missing_docs)]
78pub enum CreateError {
79    #[error("the blob already exists or is being concurrently written")]
80    AlreadyExists,
81
82    #[error("while creating the blob")]
83    Io(#[source] fuchsia_fs::node::OpenError),
84
85    #[error("while converting the proxy into a client end")]
86    ConvertToClientEnd,
87
88    #[error("FIDL error")]
89    Fidl(#[from] fidl::Error),
90
91    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
92    BlobCreator(ffxfs::CreateBlobError),
93
94    #[error("this client was not created with a blob creator so it cannot write blobs")]
95    WritingNotConfigured,
96}
97
98/// The response to a `BlobCreator.NeedsOverwrite` call, excepting unexpected internal errors.
99pub enum BlobStatus {
100    /// The blob is present and considered up to date.
101    UpToDate,
102
103    /// The blob is present, but should be overwritten.
104    NeedsOverwrite,
105
106    /// The blob is not present.
107    Absent,
108}
109
110impl From<ffxfs::CreateBlobError> for CreateError {
111    fn from(e: ffxfs::CreateBlobError) -> Self {
112        match e {
113            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
114            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
115        }
116    }
117}
118
119/// A builder for [`Client`]
120#[derive(Default)]
121pub struct ClientBuilder {
122    readable: bool,
123    writable: bool,
124    executable: bool,
125    creator: bool,
126}
127
128impl ClientBuilder {
129    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
130    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
131    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
132    pub async fn build(self) -> Result<Client, BlobfsError> {
133        let mut flags = fio::Flags::empty();
134        if self.readable {
135            flags |= fio::PERM_READABLE
136        }
137        if self.writable {
138            flags |= fio::PERM_WRITABLE
139        }
140        if self.executable {
141            flags |= fio::PERM_EXECUTABLE
142        }
143
144        let dir = if !flags.is_empty() {
145            Some(fuchsia_fs::directory::open_in_namespace("/blob", flags)?)
146        } else {
147            None
148        };
149
150        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
151            fidl_fuchsia_kernel::VmexResourceMarker,
152        >() && let Ok(vmex) = client.get().await
153        {
154            info!("Got vmex resource");
155            vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
156        }
157        let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
158            .map_err(BlobfsError::ConnectToBlobReader)?;
159        let creator = if self.writable || self.creator {
160            Some(
161                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
162                    .map_err(BlobfsError::ConnectToBlobCreator)?,
163            )
164        } else {
165            None
166        };
167
168        Ok(Client { dir, creator, reader })
169    }
170
171    /// If set, [`Client`] will connect to /blob in the current component's namespace with
172    /// [`fio::PERM_READABLE`].
173    pub fn readable(self) -> Self {
174        Self { readable: true, ..self }
175    }
176
177    /// If set, [`Client`] will connect to /blob in the current component's namespace with
178    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file,
179    /// and [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
180    pub fn writable(self) -> Self {
181        Self { writable: true, ..self }
182    }
183
184    /// If set, [`Client`] will connect to /blob in the current component's namespace with
185    /// [`fio::PERM_EXECUTABLE`].
186    pub fn executable(self) -> Self {
187        Self { executable: true, ..self }
188    }
189
190    /// If set, [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
191    /// This is independent of requiring writable directory access to `/blob`.
192    pub fn creator(self) -> Self {
193        Self { creator: true, ..self }
194    }
195}
196
197impl Client {
198    /// Create an empty `ClientBuilder`
199    pub fn builder() -> ClientBuilder {
200        Default::default()
201    }
202}
203/// Blobfs client
204#[derive(Debug, Clone)]
205pub struct Client {
206    dir: Option<fio::DirectoryProxy>,
207    creator: Option<ffxfs::BlobCreatorProxy>,
208    reader: ffxfs::BlobReaderProxy,
209}
210
211impl Client {
212    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
213    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
214    /// as executable. If `creator` is not supplied, writes will fail.
215    pub fn new(
216        dir: fio::DirectoryProxy,
217        creator: Option<ffxfs::BlobCreatorProxy>,
218        reader: ffxfs::BlobReaderProxy,
219        vmex: Option<zx::Resource>,
220    ) -> Result<Self, anyhow::Error> {
221        if let Some(vmex) = vmex {
222            vmo_blob::init_vmex_resource(vmex)?;
223        }
224        Ok(Self { dir: Some(dir), creator, reader })
225    }
226
227    /// Creates a new client backed by the returned request stream. This constructor should not be
228    /// used outside of tests.
229    ///
230    /// # Panics
231    ///
232    /// Panics on error
233    pub fn new_test() -> (
234        Self,
235        fio::DirectoryRequestStream,
236        ffxfs::BlobReaderRequestStream,
237        ffxfs::BlobCreatorRequestStream,
238    ) {
239        let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
240        let (reader, reader_stream) =
241            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
242        let (creator, creator_stream) =
243            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
244
245        (
246            Self { dir: Some(dir), creator: Some(creator), reader },
247            dir_stream,
248            reader_stream,
249            creator_stream,
250        )
251    }
252
253    /// Creates a new client backed by the returned mock. This constructor should not be used
254    /// outside of tests.
255    ///
256    /// # Panics
257    ///
258    /// Panics on error
259    pub fn new_mock() -> (Self, mock::Mock) {
260        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
261        let (reader, reader_stream) =
262            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
263        let (creator, creator_stream) =
264            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
265
266        (
267            Self { dir: Some(dir), creator: Some(creator), reader },
268            mock::Mock { stream, reader_stream, creator_stream },
269        )
270    }
271
272    /// Returns the read-only VMO backing the blob.
273    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
274        self.reader
275            .get_vmo(hash)
276            .await
277            .map_err(GetBlobVmoError::Fidl)?
278            .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
279    }
280
281    /// Open a blob for read using open3. `scope` will only be used if the client was configured to
282    /// use fuchsia.fxfs.BlobReader.
283    pub fn open_blob_for_read(
284        &self,
285        blob: &Hash,
286        flags: fio::Flags,
287        scope: ExecutionScope,
288        object_request: ObjectRequestRef<'_>,
289    ) -> Result<(), zx::Status> {
290        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
291            return Err(zx::Status::ACCESS_DENIED);
292        }
293        if flags.creation_mode() != vfs::CreationMode::Never {
294            return Err(zx::Status::NOT_SUPPORTED);
295        }
296        // Errors below will be communicated via the `object_request` channel.
297        let object_request = object_request.take();
298        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
299        Ok(())
300    }
301
302    /// Returns the list of known blobs in blobfs.
303    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
304        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
305        // directory entries. To prevent contention over this index by concurrent calls (either
306        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
307        // or other clones of the DirectoryProxy this object was made from), create a new
308        // connection which will have its own index.
309        let dir = self.dir.as_ref().ok_or(BlobfsError::DirectoryNotConfigured)?;
310        let private_connection = fuchsia_fs::directory::clone(dir)?;
311        fuchsia_fs::directory::readdir(&private_connection)
312            .await
313            .map_err(BlobfsError::ReadDir)?
314            .into_iter()
315            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
316            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
317            .collect()
318    }
319
320    /// Delete the blob with the given merkle hash.
321    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
322        let dir = self.dir.as_ref().ok_or(BlobfsError::DirectoryNotConfigured)?;
323        dir.unlink(&blob.to_string(), &fio::UnlinkOptions::default())
324            .await?
325            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
326    }
327
328    /// Open a new blob for write.
329    pub async fn open_blob_for_write(
330        &self,
331        blob: &Hash,
332        allow_existing: bool,
333    ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
334        let Some(creator) = &self.creator else {
335            return Err(CreateError::WritingNotConfigured);
336        };
337        Ok(creator.create(blob, allow_existing).await??)
338    }
339
340    /// Returns whether blobfs has a blob with the given hash and blobfs considers it up to date.
341    pub async fn blob_present_and_up_to_date(&self, blob: &Hash) -> bool {
342        // This call is only used when we're considering writing a blob, so we should have a
343        // creator.
344        matches!(
345            self.creator.as_ref().expect("Missing BlobCreator access").needs_overwrite(blob).await,
346            Ok(Ok(false))
347        )
348    }
349
350    /// Looks up the current status of a blob using `BlobCreator.NeedsOverwrite`.
351    pub async fn blob_status(&self, blob: &Hash) -> Result<BlobStatus, BlobfsError> {
352        let Some(creator) = &self.creator else {
353            return Err(BlobfsError::BlobStatus(BlobStatusError::WritingNotConfigured));
354        };
355        match creator.needs_overwrite(blob).await? {
356            Ok(true) => Ok(BlobStatus::NeedsOverwrite),
357            Ok(false) => Ok(BlobStatus::UpToDate),
358            Err(status) if status == Status::NOT_FOUND.into_raw() => Ok(BlobStatus::Absent),
359            Err(s) => {
360                Err(BlobfsError::BlobStatus(BlobStatusError::NeedsOverwrite(Status::from_raw(s))))
361            }
362        }
363    }
364
365    /// Determines which blobs of `candidates` are missing from blobfs.
366    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
367    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
368    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
369    /// following race condition:
370    /// 1. blob is partially written by resolve A
371    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
372    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
373    ///    write, which collides with the partially written blob from (1) that is being kept alive
374    ///    by (2) and so fails
375    pub async fn filter_to_missing_blobs(
376        &self,
377        candidates: impl IntoIterator<Item = Hash>,
378    ) -> HashSet<Hash> {
379        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
380        // metadata corruption.
381        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
382        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
383        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
384        // slower and has_blob is faster). The minor speedup on packages with a great number of
385        // missing blobs is not worth a rarely taken branch deep within package resolution.
386        stream::iter(candidates)
387            .map(move |blob| async move {
388                if self.blob_present_and_up_to_date(&blob).await { None } else { Some(blob) }
389            })
390            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
391            // even three concurrent `has_blob` calls.
392            .buffer_unordered(10)
393            .filter_map(|blob| async move { blob })
394            .collect()
395            .await
396    }
397
398    /// Call fuchsia.io/Node.Sync on the blobfs directory.
399    pub async fn sync(&self) -> Result<(), BlobfsError> {
400        let dir = self.dir.as_ref().ok_or(BlobfsError::DirectoryNotConfigured)?;
401        dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
402    }
403}
404
405/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
406/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
407fn open_blob_with_reader<P: ProtocolsExt + Send>(
408    reader: ffxfs::BlobReaderProxy,
409    blob_hash: Hash,
410    scope: ExecutionScope,
411    protocols: P,
412    object_request: ObjectRequest,
413) {
414    scope.clone().spawn(object_request.handle_async(async move |object_request| {
415        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
416            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
417                error!("Blob reader channel closed: {:?}", status);
418                status
419            } else {
420                error!("Transport error on get_vmo: {:?}", fidl_error);
421                zx::Status::INTERNAL
422            }
423        })?;
424        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
425        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
426        object_request
427            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
428            .await
429    }));
430}
431
432#[derive(thiserror::Error, Debug)]
433#[allow(missing_docs)]
434pub enum GetBlobVmoError {
435    #[error("getting the vmo")]
436    GetVmo(#[source] Status),
437
438    #[error("opening the blob")]
439    OpenBlob(#[source] fuchsia_fs::node::OpenError),
440
441    #[error("making a fidl request")]
442    Fidl(#[source] fidl::Error),
443}
444
445#[cfg(test)]
446impl Client {
447    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
448    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
449    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
450    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
451    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
452    /// specific functionality.
453    ///
454    /// # Panics
455    ///
456    /// Panics on error.
457    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
458        Self::new(
459            blobfs.root_dir_proxy().unwrap(),
460            Some(blobfs.blob_creator_proxy().unwrap()),
461            blobfs.blob_reader_proxy().unwrap(),
462            None,
463        )
464        .unwrap()
465    }
466}
467
468#[cfg(test)]
469#[allow(clippy::bool_assert_comparison)]
470mod tests {
471    use super::*;
472    use assert_matches::assert_matches;
473    use blobfs_ramdisk::BlobfsRamdisk;
474    use fuchsia_async as fasync;
475    use futures::stream::TryStreamExt as _;
476    use std::sync::Arc;
477    use test_case::test_case;
478
479    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
480    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
481    #[fuchsia::test]
482    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
483        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
484        let client = Client::for_ramdisk(&blobfs);
485
486        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
487        blobfs.stop().await.unwrap();
488    }
489
490    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
491    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
492    #[fuchsia::test]
493    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
494        let blobfs = BlobfsRamdisk::builder()
495            .implementation(blob_impl)
496            .with_blob(&b"blob 1"[..])
497            .with_blob(&b"blob 2"[..])
498            .start()
499            .await
500            .unwrap();
501        let client = Client::for_ramdisk(&blobfs);
502
503        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
504        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
505        blobfs.stop().await.unwrap();
506    }
507
508    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
509    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
510    #[fuchsia::test]
511    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
512        let blobfs = BlobfsRamdisk::builder()
513            .implementation(blob_impl)
514            .with_blob(&b"blob 1"[..])
515            .with_blob(&b"blob 2"[..])
516            .start()
517            .await
518            .unwrap();
519        let client = Client::for_ramdisk(&blobfs);
520
521        let merkle = fuchsia_merkle::root_from_slice(b"blob 1");
522        assert_matches!(client.delete_blob(&merkle).await, Ok(()));
523
524        let expected = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
525        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
526        blobfs.stop().await.unwrap();
527    }
528
529    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
530    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
531    #[fuchsia::test]
532    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
533        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
534        let client = Client::for_ramdisk(&blobfs);
535        let blob_merkle = Hash::from([1; 32]);
536
537        assert_matches!(
538            client.delete_blob(&blob_merkle).await,
539            Err(BlobfsError::Unlink(Status::NOT_FOUND))
540        );
541        blobfs.stop().await.unwrap();
542    }
543
544    #[fuchsia::test]
545    async fn delete_blob_mock() {
546        let (client, mut stream, _, _) = Client::new_test();
547        let blob_merkle = Hash::from([1; 32]);
548        fasync::Task::spawn(async move {
549            match stream.try_next().await.unwrap().unwrap() {
550                fio::DirectoryRequest::Unlink { name, responder, .. } => {
551                    assert_eq!(name, blob_merkle.to_string());
552                    responder.send(Ok(())).unwrap();
553                }
554                other => panic!("unexpected request: {other:?}"),
555            }
556        })
557        .detach();
558
559        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
560    }
561
562    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
563    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
564    #[fuchsia::test]
565    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
566        let blobfs = BlobfsRamdisk::builder()
567            .implementation(blob_impl)
568            .with_blob(&b"blob 1"[..])
569            .start()
570            .await
571            .unwrap();
572        let client = Client::for_ramdisk(&blobfs);
573
574        assert!(
575            client.blob_present_and_up_to_date(&fuchsia_merkle::root_from_slice(b"blob 1")).await
576        );
577        assert!(!client.blob_present_and_up_to_date(&Hash::from([1; 32])).await);
578
579        blobfs.stop().await.unwrap();
580    }
581
582    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
583    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
584    #[fuchsia::test]
585    async fn has_blob_return_false_if_blob_is_partially_written(
586        blob_impl: blobfs_ramdisk::Implementation,
587    ) {
588        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
589        let client = Client::for_ramdisk(&blobfs);
590
591        let content = &[3; 1024];
592        let hash = fuchsia_merkle::root_from_slice(content);
593        let delivery_content =
594            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
595
596        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
597        assert!(!client.blob_present_and_up_to_date(&hash).await);
598
599        let n = delivery_content.len();
600        let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
601        assert!(!client.blob_present_and_up_to_date(&hash).await);
602
603        let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
604        let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
605        assert!(!client.blob_present_and_up_to_date(&hash).await);
606
607        let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
608        let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
609        assert!(client.blob_present_and_up_to_date(&hash).await);
610
611        blobfs.stop().await.unwrap();
612    }
613
614    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
615        let hash = fuchsia_merkle::root_from_slice(content);
616        let delivery_content =
617            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
618        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
619        let vmo = writer
620            .get_vmo(delivery_content.len().try_into().unwrap())
621            .await
622            .expect("a")
623            .map_err(zx::Status::from_raw)
624            .expect("b");
625        let () = vmo.write(&delivery_content, 0).unwrap();
626        let () =
627            writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
628        hash
629    }
630
631    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
632    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
633    #[fuchsia::test]
634    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
635        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
636        let client = Client::for_ramdisk(&blobfs);
637
638        let missing_hash0 = Hash::from([0; 32]);
639        let missing_hash1 = Hash::from([1; 32]);
640
641        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
642        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
643
644        assert_eq!(
645            client
646                .filter_to_missing_blobs([
647                    missing_hash0,
648                    missing_hash1,
649                    present_blob0,
650                    present_blob1
651                ])
652                .await,
653            HashSet::from([missing_hash0, missing_hash1])
654        );
655
656        blobfs.stop().await.unwrap();
657    }
658
659    #[fuchsia::test]
660    async fn sync() {
661        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
662        let counter_clone = Arc::clone(&counter);
663        let (client, mut stream, _, _) = Client::new_test();
664        fasync::Task::spawn(async move {
665            match stream.try_next().await.unwrap().unwrap() {
666                fio::DirectoryRequest::Sync { responder } => {
667                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
668                    responder.send(Ok(())).unwrap();
669                }
670                other => panic!("unexpected request: {other:?}"),
671            }
672        })
673        .detach();
674
675        assert_matches!(client.sync().await, Ok(()));
676        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
677    }
678
679    #[fuchsia::test]
680    async fn open_blob_for_write_maps_already_exists() {
681        let (blob_creator, mut blob_creator_stream) =
682            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
683        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
684
685        let client = Client::new(
686            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
687            Some(blob_creator),
688            blob_reader,
689            None,
690        )
691        .unwrap();
692
693        fuchsia_async::Task::spawn(async move {
694            match blob_creator_stream.next().await.unwrap().unwrap() {
695                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
696                    assert_eq!(hash, [0; 32]);
697                    assert!(!allow_existing);
698                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
699                }
700                ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
701                    unreachable!("This code path is not yet exercised.");
702                }
703            }
704        })
705        .detach();
706
707        assert_matches!(
708            client.open_blob_for_write(&[0; 32].into(), false).await,
709            Err(CreateError::AlreadyExists)
710        );
711    }
712
713    #[fuchsia::test]
714    async fn concurrent_list_known_blobs_all_return_full_contents() {
715        use futures::StreamExt;
716        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
717        let client = Client::for_ramdisk(&blobfs);
718
719        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
720        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
721        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
722        // each successful listing requires a call to Rewind as well, but it does make conflict
723        // more likely.
724        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
725        for i in 0..256u16 {
726            let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
727        }
728
729        let () = futures::stream::iter(0..100)
730            .for_each_concurrent(None, |_| async {
731                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
732            })
733            .await;
734    }
735}