// blobfs/lib.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![deny(missing_docs)]

//! Typesafe wrappers around the /blob filesystem.
9use fidl::endpoints::{Proxy as _, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{stream, StreamExt as _};
12use log::{error, info, warn};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::{self as zx, AsHandleRef as _, Status};
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
21
22pub mod mock;
23pub use mock::Mock;
24
/// Blobfs client errors.
//
// Each variant's `#[error]` attribute supplies the `Display` text; the wrapped
// `#[from]`/`#[source]` error carries the underlying detail.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
    #[error("while opening blobfs dir")]
    OpenDir(#[from] fuchsia_fs::node::OpenError),

    #[error("while cloning the blobfs dir")]
    CloneDir(#[from] fuchsia_fs::node::CloneError),

    #[error("while listing blobfs dir")]
    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),

    #[error("while deleting blob")]
    Unlink(#[source] Status),

    #[error("while sync'ing")]
    Sync(#[source] Status),

    #[error("while parsing blob merkle hash")]
    ParseHash(#[from] ParseHashError),

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
    ConnectToBlobCreator(#[source] anyhow::Error),

    #[error("while connecting to fuchsia.fxfs/BlobReader")]
    ConnectToBlobReader(#[source] anyhow::Error),

    #[error("while setting the VmexResource")]
    InitVmexResource(#[source] anyhow::Error),
}
59
/// An error encountered while creating a blob
//
// `AlreadyExists` is surfaced separately (rather than as a generic `BlobCreator`
// error) because callers treat a concurrent/duplicate write differently from a
// genuine failure — see the `From<ffxfs::CreateBlobError>` impl below.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,

    #[error("while creating the blob")]
    Io(#[source] fuchsia_fs::node::OpenError),

    #[error("while converting the proxy into a client end")]
    ConvertToClientEnd,

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
    BlobCreator(ffxfs::CreateBlobError),
}
79
80impl From<ffxfs::CreateBlobError> for CreateError {
81    fn from(e: ffxfs::CreateBlobError) -> Self {
82        match e {
83            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
84            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
85        }
86    }
87}
88
/// A builder for [`Client`]
#[derive(Default)]
pub struct ClientBuilder {
    // How reads should be performed: via the directory connection or
    // fuchsia.fxfs.BlobReader (optionally initializing the VmexResource).
    use_reader: Reader,
    // If set, writes go through fuchsia.fxfs.BlobCreator instead of the directory.
    use_creator: bool,
    // Rights to request when opening /blob from the component namespace.
    readable: bool,
    writable: bool,
    executable: bool,
}
98
// Whether the built [`Client`] should read blobs via fuchsia.fxfs.BlobReader.
#[derive(Default)]
enum Reader {
    // Reads fall back to the /blob directory connection.
    #[default]
    DontUse,
    Use {
        // If set, `build` also connects to the VmexResource protocol and hands the
        // resource to `vmo_blob` (used to mark blob VMOs as executable).
        use_vmex: bool,
    },
}
107
108impl ClientBuilder {
109    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
110    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
111    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
112    pub async fn build(self) -> Result<Client, BlobfsError> {
113        let mut flags = fio::Flags::empty();
114        if self.readable {
115            flags |= fio::PERM_READABLE
116        }
117        if self.writable {
118            flags |= fio::PERM_WRITABLE
119        }
120        if self.executable {
121            flags |= fio::PERM_EXECUTABLE
122        }
123        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
124        let reader = match self.use_reader {
125            Reader::DontUse => None,
126            Reader::Use { use_vmex } => {
127                if use_vmex {
128                    if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
129                        fidl_fuchsia_kernel::VmexResourceMarker,
130                    >() {
131                        if let Ok(vmex) = client.get().await {
132                            info!("Got vmex resource");
133                            vmo_blob::init_vmex_resource(vmex)
134                                .map_err(BlobfsError::InitVmexResource)?;
135                        }
136                    }
137                }
138                Some(
139                    fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
140                        .map_err(BlobfsError::ConnectToBlobReader)?,
141                )
142            }
143        };
144
145        let creator = if self.use_creator {
146            Some(
147                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
148                    .map_err(BlobfsError::ConnectToBlobCreator)?,
149            )
150        } else {
151            None
152        };
153
154        Ok(Client { dir, creator, reader })
155    }
156
157    /// [`Client`] will connect to and use fuchsia.fxfs/BlobReader for reads. Sets the VmexResource
158    /// for `Client`. The VmexResource is used by `get_backing_memory` to mark blobs as executable.
159    pub fn use_reader(self) -> Self {
160        Self { use_reader: Reader::Use { use_vmex: true }, ..self }
161    }
162
163    /// [`Client`] will connect to and use fuchsia.fxfs/BlobReader for reads. Does not set the
164    /// VmexResource.
165    pub fn use_reader_no_vmex(self) -> Self {
166        Self { use_reader: Reader::Use { use_vmex: false }, ..self }
167    }
168
169    /// If set, [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
170    pub fn use_creator(self) -> Self {
171        Self { use_creator: true, ..self }
172    }
173
174    /// If set, [`Client`] will connect to /blob in the current component's namespace with
175    /// [`fio::PERM_READABLE`].
176    pub fn readable(self) -> Self {
177        Self { readable: true, ..self }
178    }
179
180    /// If set, [`Client`] will connect to /blob in the current component's namespace with
181    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file.
182    pub fn writable(self) -> Self {
183        Self { writable: true, ..self }
184    }
185
186    /// If set, [`Client`] will connect to /blob in the current component's namespace with
187    /// [`fio::PERM_EXECUTABLE`].
188    pub fn executable(self) -> Self {
189        Self { executable: true, ..self }
190    }
191}
192
193impl Client {
194    /// Create an empty `ClientBuilder`
195    pub fn builder() -> ClientBuilder {
196        Default::default()
197    }
198}
/// Blobfs client
#[derive(Debug, Clone)]
pub struct Client {
    // Connection to the /blob directory; used as the fallback path for both
    // reads and writes when the dedicated protocols below are absent.
    dir: fio::DirectoryProxy,
    // When present, blob writes go through fuchsia.fxfs.BlobCreator.
    creator: Option<ffxfs::BlobCreatorProxy>,
    // When present, blob reads go through fuchsia.fxfs.BlobReader.
    reader: Option<ffxfs::BlobReaderProxy>,
}
206
207impl Client {
208    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
209    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
210    /// as executable. If `creator` or `reader` is not supplied, writes or reads respectively will
211    /// be performed through the blob directory.
212    pub fn new(
213        dir: fio::DirectoryProxy,
214        creator: Option<ffxfs::BlobCreatorProxy>,
215        reader: Option<ffxfs::BlobReaderProxy>,
216        vmex: Option<zx::Resource>,
217    ) -> Result<Self, anyhow::Error> {
218        if let Some(vmex) = vmex {
219            vmo_blob::init_vmex_resource(vmex)?;
220        }
221        Ok(Self { dir, creator, reader })
222    }
223
224    /// Creates a new client backed by the returned request stream. This constructor should not be
225    /// used outside of tests.
226    ///
227    /// # Panics
228    ///
229    /// Panics on error
230    pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
231        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
232
233        (Self { dir, creator: None, reader: None }, stream)
234    }
235
236    /// Creates a new client backed by the returned mock. This constructor should not be used
237    /// outside of tests.
238    ///
239    /// # Panics
240    ///
241    /// Panics on error
242    pub fn new_mock() -> (Self, mock::Mock) {
243        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
244
245        (Self { dir, creator: None, reader: None }, mock::Mock { stream })
246    }
247
248    /// Returns the read-only VMO backing the blob.
249    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
250        if let Some(reader) = &self.reader {
251            reader
252                .get_vmo(hash)
253                .await
254                .map_err(GetBlobVmoError::Fidl)?
255                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
256        } else {
257            let blob =
258                fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
259                    .await
260                    .map_err(GetBlobVmoError::OpenBlob)?;
261            blob.get_backing_memory(fio::VmoFlags::READ)
262                .await
263                .map_err(GetBlobVmoError::Fidl)?
264                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
265        }
266    }
267
268    /// Open a blob for read. `scope` will only be used if the client was configured to use
269    /// fuchsia.fxfs.BlobReader.
270    pub fn deprecated_open_blob_for_read(
271        &self,
272        blob: &Hash,
273        flags: fio::OpenFlags,
274        scope: ExecutionScope,
275        server_end: ServerEnd<fio::NodeMarker>,
276    ) -> Result<(), fidl::Error> {
277        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
278        // Reject requests that attempt to open blobs as writable.
279        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
280            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
281            return Ok(());
282        }
283        // Reject requests that attempt to create new blobs.
284        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
285            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
286            return Ok(());
287        }
288        // Use blob reader protocol if available, otherwise fallback to fuchsia.io/Directory.Open.
289        if let Some(reader) = &self.reader {
290            let object_request = flags.to_object_request(server_end);
291            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
292            Ok(())
293        } else {
294            self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
295        }
296    }
297
298    /// Open a blob for read using open3. `scope` will only be used if the client was configured to
299    /// use fuchsia.fxfs.BlobReader.
300    pub fn open_blob_for_read(
301        &self,
302        blob: &Hash,
303        flags: fio::Flags,
304        scope: ExecutionScope,
305        object_request: ObjectRequestRef<'_>,
306    ) -> Result<(), zx::Status> {
307        // Reject requests that attempt to open blobs as writable.
308        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
309            return Err(zx::Status::ACCESS_DENIED);
310        }
311        // Reject requests that attempt to create new blobs.
312        if flags.creation_mode() != vfs::CreationMode::Never {
313            return Err(zx::Status::NOT_SUPPORTED);
314        }
315        // Errors below will be communicated via the `object_request` channel.
316        let object_request = object_request.take();
317        // Use blob reader protocol if available, otherwise fallback to fuchsia.io/Directory.Open3.
318        if let Some(reader) = &self.reader {
319            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
320        } else {
321            let _: Result<(), ()> = self
322                .dir
323                .open(
324                    &blob.to_string(),
325                    flags,
326                    &object_request.options(),
327                    object_request.into_channel(),
328                )
329                .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
330        }
331        Ok(())
332    }
333
334    /// Returns the list of known blobs in blobfs.
335    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
336        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
337        // directory entries. To prevent contention over this index by concurrent calls (either
338        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
339        // or other clones of the DirectoryProxy this object was made from), create a new
340        // connection which will have its own index.
341        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
342        fuchsia_fs::directory::readdir(&private_connection)
343            .await
344            .map_err(BlobfsError::ReadDir)?
345            .into_iter()
346            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
347            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
348            .collect()
349    }
350
351    /// Delete the blob with the given merkle hash.
352    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
353        self.dir
354            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
355            .await?
356            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
357    }
358
359    /// Open a new blob for write.
360    pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
361        Ok(if let Some(blob_creator) = &self.creator {
362            fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
363        } else {
364            fpkg::BlobWriter::File(
365                self.open_blob_proxy_from_dir_for_write(blob)
366                    .await?
367                    .into_channel()
368                    .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
369                    .into_zx_channel()
370                    .into(),
371            )
372        })
373    }
374
375    /// Open a new blob for write, unconditionally using the blob directory.
376    async fn open_blob_proxy_from_dir_for_write(
377        &self,
378        blob: &Hash,
379    ) -> Result<fio::FileProxy, CreateError> {
380        let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;
381
382        let path = delivery_blob::delivery_blob_path(blob);
383        fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
384            fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
385                CreateError::AlreadyExists
386            }
387            other => CreateError::Io(other),
388        })
389    }
390
391    /// Returns whether blobfs has a blob with the given hash.
392    /// On c++blobfs, this should only be called if there are no concurrent attempts to write the
393    /// blob. On c++blobfs, open connections to even partially written blobs keep the blob alive,
394    /// and so if this call overlaps with a concurrent attempt to create the blob that fails and
395    /// is then retried, this open connection will prevent the partially written blob from being
396    /// removed and block the creation of the new write connection.
397    /// TODO(https://fxbug.dev/294286136) Add GetVmo support to c++blobfs.
398    pub async fn has_blob(&self, blob: &Hash) -> bool {
399        if let Some(reader) = &self.reader {
400            // TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
401            matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
402        } else {
403            let file = match fuchsia_fs::directory::open_file_async(
404                &self.dir,
405                &blob.to_string(),
406                fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
407            ) {
408                Ok(file) => file,
409                Err(_) => return false,
410            };
411
412            let mut events = file.take_event_stream();
413
414            let event = match events.next().await {
415                None => return false,
416                Some(event) => match event {
417                    Err(_) => return false,
418                    Ok(event) => match event {
419                        fio::FileEvent::OnOpen_ { s, info } => {
420                            if Status::from_raw(s) != Status::OK {
421                                return false;
422                            }
423
424                            match info {
425                                Some(info) => match *info {
426                                    fio::NodeInfoDeprecated::File(fio::FileObject {
427                                        event: Some(event),
428                                        stream: _, // TODO(https://fxbug.dev/293606235): Use stream
429                                    }) => event,
430                                    _ => return false,
431                                },
432                                _ => return false,
433                            }
434                        }
435                        fio::FileEvent::OnRepresentation { payload } => match payload {
436                            fio::Representation::File(fio::FileInfo {
437                                observer: Some(event),
438                                stream: _, // TODO(https://fxbug.dev/293606235): Use stream
439                                ..
440                            }) => event,
441                            _ => return false,
442                        },
443                        fio::FileEvent::_UnknownEvent { .. } => return false,
444                    },
445                },
446            };
447
448            // Check that the USER_0 signal has been asserted on the file's event to make sure we
449            // return false on the edge case of the blob is current being written.
450            match event.wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST) {
451                Ok(_) => true,
452                Err(status) => {
453                    if status != Status::TIMED_OUT {
454                        warn!("blobfs: unknown error asserting blob existence: {}", status);
455                    }
456                    false
457                }
458            }
459        }
460    }
461
462    /// Determines which blobs of `candidates` are missing from blobfs.
463    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
464    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
465    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
466    /// following race condition:
467    /// 1. blob is partially written by resolve A
468    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
469    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
470    ///    write, which collides with the partially written blob from (1) that is being kept alive
471    ///    by (2) and so fails
472    pub async fn filter_to_missing_blobs(&self, candidates: &HashSet<Hash>) -> HashSet<Hash> {
473        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
474        // metadata corruption.
475        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
476        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
477        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
478        // slower and has_blob is faster). The minor speedup on packages with a great number of
479        // missing blobs is not worth a rarely taken branch deep within package resolution.
480        stream::iter(candidates.clone())
481            .map(move |blob| async move {
482                if self.has_blob(&blob).await {
483                    None
484                } else {
485                    Some(blob)
486                }
487            })
488            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
489            // even three concurrent `has_blob` calls.
490            .buffer_unordered(10)
491            .filter_map(|blob| async move { blob })
492            .collect()
493            .await
494    }
495
496    /// Call fuchsia.io/Node.Sync on the blobfs directory.
497    pub async fn sync(&self) -> Result<(), BlobfsError> {
498        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
499    }
500}
501
/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
fn open_blob_with_reader<P: ProtocolsExt + Send>(
    reader: ffxfs::BlobReaderProxy,
    blob_hash: Hash,
    scope: ExecutionScope,
    protocols: P,
    object_request: ObjectRequest,
) {
    // `handle_async` forwards any error returned by the future to the
    // `object_request` channel rather than dropping it silently.
    scope.clone().spawn(object_request.handle_async(async move |object_request| {
        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
                // Propagate the peer-closed status so the client sees why the reader went away.
                error!("Blob reader channel closed: {:?}", status);
                status
            } else {
                // All other transport errors are collapsed to INTERNAL.
                error!("Transport error on get_vmo: {:?}", fidl_error);
                zx::Status::INTERNAL
            }
        })?;
        // The server reports failures as a raw zx status code; convert it for `?`.
        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
        // Serve the VMO-backed blob over a stream-based file connection.
        object_request
            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
            .await
    }));
}
528
// Errors surfaced by [`Client::get_blob_vmo`]; `#[error]` attributes supply Display.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetBlobVmoError {
    #[error("getting the vmo")]
    GetVmo(#[source] Status),

    #[error("opening the blob")]
    OpenBlob(#[source] fuchsia_fs::node::OpenError),

    #[error("making a fidl request")]
    Fidl(#[source] fidl::Error),
}
541
#[cfg(test)]
impl Client {
    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
    /// specific functionality.
    ///
    /// # Panics
    ///
    /// Panics on error.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        let dir = blobfs.root_dir_proxy().unwrap();
        let creator = blobfs.blob_creator_proxy().unwrap();
        let reader = blobfs.blob_reader_proxy().unwrap();
        Self::new(dir, creator, reader, None).unwrap()
    }
}
564
565#[cfg(test)]
566#[allow(clippy::bool_assert_comparison)]
567mod tests {
568    use super::*;
569    use assert_matches::assert_matches;
570    use blobfs_ramdisk::BlobfsRamdisk;
571    use fuchsia_async as fasync;
572    use futures::stream::TryStreamExt;
573    use maplit::hashset;
574    use std::io::Write as _;
575    use std::sync::Arc;
576
577    #[fasync::run_singlethreaded(test)]
578    async fn list_known_blobs_empty() {
579        let blobfs = BlobfsRamdisk::start().await.unwrap();
580        let client = Client::for_ramdisk(&blobfs);
581
582        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
583        blobfs.stop().await.unwrap();
584    }
585
586    #[fasync::run_singlethreaded(test)]
587    async fn list_known_blobs() {
588        let blobfs = BlobfsRamdisk::builder()
589            .with_blob(&b"blob 1"[..])
590            .with_blob(&b"blob 2"[..])
591            .start()
592            .await
593            .unwrap();
594        let client = Client::for_ramdisk(&blobfs);
595
596        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
597        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
598        blobfs.stop().await.unwrap();
599    }
600
601    #[fasync::run_singlethreaded(test)]
602    async fn delete_blob_and_then_list() {
603        let blobfs = BlobfsRamdisk::builder()
604            .with_blob(&b"blob 1"[..])
605            .with_blob(&b"blob 2"[..])
606            .start()
607            .await
608            .unwrap();
609        let client = Client::for_ramdisk(&blobfs);
610
611        let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
612        assert_matches!(client.delete_blob(&merkle).await, Ok(()));
613
614        let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
615        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
616        blobfs.stop().await.unwrap();
617    }
618
619    #[fasync::run_singlethreaded(test)]
620    async fn delete_non_existing_blob() {
621        let blobfs = BlobfsRamdisk::start().await.unwrap();
622        let client = Client::for_ramdisk(&blobfs);
623        let blob_merkle = Hash::from([1; 32]);
624
625        assert_matches!(
626            client.delete_blob(&blob_merkle).await,
627            Err(BlobfsError::Unlink(Status::NOT_FOUND))
628        );
629        blobfs.stop().await.unwrap();
630    }
631
    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_mock() {
        // Serve the directory request stream by hand so the test can assert the exact
        // fuchsia.io request that `delete_blob` sends (an Unlink of the hash's string form).
        let (client, mut stream) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }
649
650    #[fasync::run_singlethreaded(test)]
651    async fn has_blob() {
652        let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
653        let client = Client::for_ramdisk(&blobfs);
654
655        assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
656        assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);
657
658        blobfs.stop().await.unwrap();
659    }
660
661    #[fasync::run_singlethreaded(test)]
662    async fn has_blob_fxblob() {
663        let blobfs =
664            BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
665        let client = Client::for_ramdisk(&blobfs);
666
667        assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
668        assert!(!client.has_blob(&Hash::from([1; 32])).await);
669
670        blobfs.stop().await.unwrap();
671    }
672
    #[fasync::run_singlethreaded(test)]
    async fn has_blob_return_false_if_blob_is_partially_written() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let blob = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&blob).root();

        // Created but not yet truncated: must not be reported present.
        let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        // Truncated to its final size but no payload written yet.
        file.set_len(blob.len() as u64).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        // Half written: still not present.
        file.write_all(&blob[..512]).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        // Fully written: now present.
        file.write_all(&blob[512..]).unwrap();
        assert_eq!(client.has_blob(&hash).await, true);

        blobfs.stop().await.unwrap();
    }
692
693    async fn resize(blob: &fio::FileProxy, size: usize) {
694        let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
695    }
696
697    async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
698        assert_eq!(
699            blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
700            bytes.len() as u64
701        );
702    }
703
    #[fasync::run_singlethreaded(test)]
    async fn write_delivery_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&content).root();
        // The on-the-wire format is the Type 1 delivery blob, not the raw content.
        let delivery_content =
            delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);

        let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();

        // Resize to the delivery payload size, then write it in full.
        let () = resize(&proxy, delivery_content.len()).await;
        let () = write(&proxy, &delivery_content).await;

        // The blob is addressed by the merkle root of the *uncompressed* content.
        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }
723
    /// Wrapper for a blob and its hash. This lets the tests retain ownership of the Blob,
    /// which is important because it ensures blobfs will not close partially written blobs for the
    /// duration of the test.
    struct TestBlob {
        // Held open (never read) to keep a partially written blob alive in blobfs.
        _blob: fio::FileProxy,
        hash: Hash,
    }
731
732    async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
733        let hash = fuchsia_merkle::from_slice(content).root();
734        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
735        TestBlob { _blob, hash }
736    }
737
738    async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
739        let hash = fuchsia_merkle::from_slice(content).root();
740        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
741        let () = resize(&_blob, content.len()).await;
742        TestBlob { _blob, hash }
743    }
744
745    async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
746        let hash = fuchsia_merkle::from_slice(content).root();
747        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
748        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
749        let () = resize(&_blob, content.len()).await;
750        let () = write(&_blob, &content[..content.len() / 2]).await;
751        TestBlob { _blob, hash }
752    }
753
754    async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
755        let hash = fuchsia_merkle::from_slice(content).root();
756        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
757        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
758        let () = resize(&_blob, content.len()).await;
759        let () = write(&_blob, &content).await;
760        TestBlob { _blob, hash }
761    }
762
763    #[fasync::run_singlethreaded(test)]
764    async fn filter_to_missing_blobs() {
765        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
766        let client = Client::for_ramdisk(&blobfs);
767
768        let missing_hash0 = Hash::from([0; 32]);
769        let missing_hash1 = Hash::from([1; 32]);
770
771        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
772        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
773
774        assert_eq!(
775            client
776                .filter_to_missing_blobs(
777                    // Pass in <= 20 candidates so the heuristic is not used.
778                    &hashset! { missing_hash0, missing_hash1,
779                        present_blob0.hash, present_blob1.hash
780                    },
781                )
782                .await,
783            hashset! { missing_hash0, missing_hash1 }
784        );
785
786        blobfs.stop().await.unwrap();
787    }
788
789    /// Similar to the above test, except also test that partially written blobs count as missing.
790    #[fasync::run_singlethreaded(test)]
791    async fn filter_to_missing_blobs_with_partially_written_blobs() {
792        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
793        let client = Client::for_ramdisk(&blobfs);
794
795        // Some blobs are created (but not yet truncated).
796        let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
797
798        // Some are truncated but not written.
799        let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;
800
801        // Some are partially written.
802        let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;
803
804        // Some are fully written.
805        let present_blob = fully_write_blob(&client, &[3; 1024]).await;
806
807        assert_eq!(
808            client
809                .filter_to_missing_blobs(&hashset! {
810                    missing_blob0.hash,
811                    missing_blob1.hash,
812                    missing_blob2.hash,
813                    present_blob.hash
814                },)
815                .await,
816            // All partially written blobs should count as missing.
817            hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
818        );
819
820        blobfs.stop().await.unwrap();
821    }
822
823    #[fasync::run_singlethreaded(test)]
824    async fn sync() {
825        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
826        let counter_clone = Arc::clone(&counter);
827        let (client, mut stream) = Client::new_test();
828        fasync::Task::spawn(async move {
829            match stream.try_next().await.unwrap().unwrap() {
830                fio::DirectoryRequest::Sync { responder } => {
831                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
832                    responder.send(Ok(())).unwrap();
833                }
834                other => panic!("unexpected request: {other:?}"),
835            }
836        })
837        .detach();
838
839        assert_matches!(client.sync().await, Ok(()));
840        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
841    }
842
843    #[fasync::run_singlethreaded(test)]
844    async fn open_blob_for_write_uses_fxblob_if_configured() {
845        let (blob_creator, mut blob_creator_stream) =
846            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
847        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
848        let client = Client::new(
849            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
850            Some(blob_creator),
851            Some(blob_reader),
852            None,
853        )
854        .unwrap();
855
856        fuchsia_async::Task::spawn(async move {
857            match blob_creator_stream.next().await.unwrap().unwrap() {
858                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
859                    assert_eq!(hash, [0; 32]);
860                    assert!(!allow_existing);
861                    let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
862                }
863            }
864        })
865        .detach();
866
867        assert_matches!(
868            client.open_blob_for_write(&[0; 32].into()).await,
869            Ok(fpkg::BlobWriter::Writer(_))
870        );
871    }
872
873    #[fasync::run_singlethreaded(test)]
874    async fn open_blob_for_write_fxblob_maps_already_exists() {
875        let (blob_creator, mut blob_creator_stream) =
876            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
877        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
878
879        let client = Client::new(
880            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
881            Some(blob_creator),
882            Some(blob_reader),
883            None,
884        )
885        .unwrap();
886
887        fuchsia_async::Task::spawn(async move {
888            match blob_creator_stream.next().await.unwrap().unwrap() {
889                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
890                    assert_eq!(hash, [0; 32]);
891                    assert!(!allow_existing);
892                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
893                }
894            }
895        })
896        .detach();
897
898        assert_matches!(
899            client.open_blob_for_write(&[0; 32].into()).await,
900            Err(CreateError::AlreadyExists)
901        );
902    }
903
904    #[fasync::run_singlethreaded(test)]
905    async fn concurrent_list_known_blobs_all_return_full_contents() {
906        use futures::StreamExt;
907        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
908        let client = Client::for_ramdisk(&blobfs);
909
910        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
911        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
912        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
913        // each successful listing requires a call to Rewind as well, but it does make conflict
914        // more likely.
915        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
916        for i in 0..256u16 {
917            let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
918        }
919
920        let () = futures::stream::iter(0..100)
921            .for_each_concurrent(None, |_| async {
922                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
923            })
924            .await;
925    }
926}