// blobfs/lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! Typesafe wrappers around the /blob filesystem.
8
9use fidl::endpoints::{Proxy as _, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{stream, StreamExt as _};
12use log::{error, info, warn};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::{self as zx, AsHandleRef as _, Status};
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
21
22pub mod mock;
23pub use mock::Mock;
24
/// Blobfs client errors.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
    /// Failed to open the /blob directory.
    #[error("while opening blobfs dir")]
    OpenDir(#[from] fuchsia_fs::node::OpenError),

    /// Failed to create a new connection to the blob directory.
    #[error("while cloning the blobfs dir")]
    CloneDir(#[from] fuchsia_fs::node::CloneError),

    /// Failed to enumerate the entries of the blob directory.
    #[error("while listing blobfs dir")]
    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),

    /// fuchsia.io/Directory.Unlink returned an error status.
    #[error("while deleting blob")]
    Unlink(#[source] Status),

    /// fuchsia.io/Node.Sync returned an error status.
    #[error("while sync'ing")]
    Sync(#[source] Status),

    /// A directory entry name was not a valid merkle root hash.
    #[error("while parsing blob merkle hash")]
    ParseHash(#[from] ParseHashError),

    /// Transport error on a FIDL channel.
    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    /// Failed to connect to the fuchsia.fxfs/BlobCreator protocol.
    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
    ConnectToBlobCreator(#[source] anyhow::Error),

    /// Failed to connect to the fuchsia.fxfs/BlobReader protocol.
    #[error("while connecting to fuchsia.fxfs/BlobReader")]
    ConnectToBlobReader(#[source] anyhow::Error),

    /// Failed to register the VmexResource used to mark blobs as executable.
    #[error("while setting the VmexResource")]
    InitVmexResource(#[source] anyhow::Error),
}
59
/// An error encountered while creating a blob
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    /// The blob cannot be created right now; it either already exists or another
    /// writer currently holds it open.
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,

    /// Opening the blob file for write failed.
    #[error("while creating the blob")]
    Io(#[source] fuchsia_fs::node::OpenError),

    /// The FileProxy could not be converted back into a channel (an outstanding
    /// clone of the proxy exists).
    #[error("while converting the proxy into a client end")]
    ConvertToClientEnd,

    /// Transport error on a FIDL channel.
    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    /// fuchsia.fxfs/BlobCreator.Create returned a non-AlreadyExists error.
    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
    BlobCreator(ffxfs::CreateBlobError),
}
79
80impl From<ffxfs::CreateBlobError> for CreateError {
81    fn from(e: ffxfs::CreateBlobError) -> Self {
82        match e {
83            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
84            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
85        }
86    }
87}
88
/// A builder for [`Client`]
#[derive(Default)]
pub struct ClientBuilder {
    // Open /blob with fio::PERM_READABLE.
    readable: bool,
    // Open /blob with fio::PERM_WRITABLE and connect to fuchsia.fxfs/BlobCreator.
    writable: bool,
    // Open /blob with fio::PERM_EXECUTABLE.
    executable: bool,
}
96
97impl ClientBuilder {
98    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
99    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
100    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
101    pub async fn build(self) -> Result<Client, BlobfsError> {
102        let mut flags = fio::Flags::empty();
103        if self.readable {
104            flags |= fio::PERM_READABLE
105        }
106        if self.writable {
107            flags |= fio::PERM_WRITABLE
108        }
109        if self.executable {
110            flags |= fio::PERM_EXECUTABLE
111        }
112        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
113        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
114            fidl_fuchsia_kernel::VmexResourceMarker,
115        >() {
116            if let Ok(vmex) = client.get().await {
117                info!("Got vmex resource");
118                vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
119            }
120        }
121        let reader = Some(
122            fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
123                .map_err(BlobfsError::ConnectToBlobReader)?,
124        );
125
126        let creator = if self.writable {
127            Some(
128                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
129                    .map_err(BlobfsError::ConnectToBlobCreator)?,
130            )
131        } else {
132            None
133        };
134
135        Ok(Client { dir, creator, reader })
136    }
137
138    /// If set, [`Client`] will connect to /blob in the current component's namespace with
139    /// [`fio::PERM_READABLE`].
140    pub fn readable(self) -> Self {
141        Self { readable: true, ..self }
142    }
143
144    /// If set, [`Client`] will connect to /blob in the current component's namespace with
145    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file,
146    /// and [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
147    pub fn writable(self) -> Self {
148        Self { writable: true, ..self }
149    }
150
151    /// If set, [`Client`] will connect to /blob in the current component's namespace with
152    /// [`fio::PERM_EXECUTABLE`].
153    pub fn executable(self) -> Self {
154        Self { executable: true, ..self }
155    }
156}
157
158impl Client {
159    /// Create an empty `ClientBuilder`
160    pub fn builder() -> ClientBuilder {
161        Default::default()
162    }
163}
/// Blobfs client
#[derive(Debug, Clone)]
pub struct Client {
    // Connection to the /blob directory; used for unlink/sync/list and as the
    // fallback transport for reads and writes.
    dir: fio::DirectoryProxy,
    // If set, blob writes go through fuchsia.fxfs/BlobCreator.
    creator: Option<ffxfs::BlobCreatorProxy>,
    // If set, blob reads go through fuchsia.fxfs/BlobReader.
    reader: Option<ffxfs::BlobReaderProxy>,
}
171
impl Client {
    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
    /// as executable. If `creator` or `reader` is not supplied, writes or reads respectively will
    /// be performed through the blob directory.
    pub fn new(
        dir: fio::DirectoryProxy,
        creator: Option<ffxfs::BlobCreatorProxy>,
        reader: Option<ffxfs::BlobReaderProxy>,
        vmex: Option<zx::Resource>,
    ) -> Result<Self, anyhow::Error> {
        if let Some(vmex) = vmex {
            vmo_blob::init_vmex_resource(vmex)?;
        }
        Ok(Self { dir, creator, reader })
    }

    /// Creates a new client backed by the returned request stream. This constructor should not be
    /// used outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, stream)
    }

    /// Creates a new client backed by the returned mock. This constructor should not be used
    /// outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_mock() -> (Self, mock::Mock) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, mock::Mock { stream })
    }

    /// Returns the read-only VMO backing the blob.
    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
        // Prefer the BlobReader protocol; otherwise open the blob file and ask for its
        // backing memory.
        if let Some(reader) = &self.reader {
            reader
                .get_vmo(hash)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        } else {
            let blob =
                fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
                    .await
                    .map_err(GetBlobVmoError::OpenBlob)?;
            blob.get_backing_memory(fio::VmoFlags::READ)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        }
    }

    /// Open a blob for read. `scope` will only be used if the client was configured to use
    /// fuchsia.fxfs.BlobReader.
    pub fn deprecated_open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::OpenFlags,
        scope: ExecutionScope,
        server_end: ServerEnd<fio::NodeMarker>,
    ) -> Result<(), fidl::Error> {
        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
        // Reject requests that attempt to open blobs as writable.
        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
            return Ok(());
        }
        // Reject requests that attempt to create new blobs.
        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
            return Ok(());
        }
        // Use blob reader protocol if available, otherwise fallback to fuchsia.io/Directory.Open.
        if let Some(reader) = &self.reader {
            let object_request = flags.to_object_request(server_end);
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
            Ok(())
        } else {
            self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
        }
    }

    /// Open a blob for read using open3. `scope` will only be used if the client was configured to
    /// use fuchsia.fxfs.BlobReader.
    pub fn open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::Flags,
        scope: ExecutionScope,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), zx::Status> {
        // Reject requests that attempt to open blobs as writable.
        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
            return Err(zx::Status::ACCESS_DENIED);
        }
        // Reject requests that attempt to create new blobs.
        if flags.creation_mode() != vfs::CreationMode::Never {
            return Err(zx::Status::NOT_SUPPORTED);
        }
        // Errors below will be communicated via the `object_request` channel.
        let object_request = object_request.take();
        // Use blob reader protocol if available, otherwise fallback to fuchsia.io/Directory.Open3.
        if let Some(reader) = &self.reader {
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
        } else {
            // FIDL transport errors are only logged; the caller learns of failures via
            // the object_request channel, not this return value.
            let _: Result<(), ()> = self
                .dir
                .open(
                    &blob.to_string(),
                    flags,
                    &object_request.options(),
                    object_request.into_channel(),
                )
                .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
        }
        Ok(())
    }

    /// Returns the list of known blobs in blobfs.
    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
        // directory entries. To prevent contention over this index by concurrent calls (either
        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
        // or other clones of the DirectoryProxy this object was made from), create a new
        // connection which will have its own index.
        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
        // Only file entries are blobs; any non-hash file name is a hard error.
        fuchsia_fs::directory::readdir(&private_connection)
            .await
            .map_err(BlobfsError::ReadDir)?
            .into_iter()
            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
            .collect()
    }

    /// Delete the blob with the given merkle hash.
    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
        self.dir
            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
            .await?
            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
    }

    /// Open a new blob for write.
    pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
        Ok(if let Some(blob_creator) = &self.creator {
            // NOTE(review): the second argument to BlobCreator.Create is a bool whose meaning
            // is not visible from this file — confirm against fuchsia.fxfs.BlobCreator.Create.
            fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
        } else {
            // Fallback: hand the caller the raw file channel to the delivery blob.
            fpkg::BlobWriter::File(
                self.open_blob_proxy_from_dir_for_write(blob)
                    .await?
                    .into_channel()
                    .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
                    .into_zx_channel()
                    .into(),
            )
        })
    }

    /// Open a new blob for write, unconditionally using the blob directory.
    async fn open_blob_proxy_from_dir_for_write(
        &self,
        blob: &Hash,
    ) -> Result<fio::FileProxy, CreateError> {
        let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;

        // Blobs are written via their delivery blob path, not the bare hash.
        let path = delivery_blob::delivery_blob_path(blob);
        fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
            // ACCESS_DENIED here means the blob exists (or is being written), not a
            // permissions problem.
            fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
                CreateError::AlreadyExists
            }
            other => CreateError::Io(other),
        })
    }

    /// Returns whether blobfs has a blob with the given hash.
    /// On c++blobfs, this should only be called if there are no concurrent attempts to write the
    /// blob. On c++blobfs, open connections to even partially written blobs keep the blob alive,
    /// and so if this call overlaps with a concurrent attempt to create the blob that fails and
    /// is then retried, this open connection will prevent the partially written blob from being
    /// removed and block the creation of the new write connection.
    /// TODO(https://fxbug.dev/294286136) Add GetVmo support to c++blobfs.
    pub async fn has_blob(&self, blob: &Hash) -> bool {
        if let Some(reader) = &self.reader {
            // TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
            matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
        } else {
            let file = match fuchsia_fs::directory::open_file_async(
                &self.dir,
                &blob.to_string(),
                fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
            ) {
                Ok(file) => file,
                Err(_) => return false,
            };

            let mut events = file.take_event_stream();

            // Extract the blob's event handle from the first event on the file connection,
            // accepting either the deprecated OnOpen or the Open3 OnRepresentation shape.
            let event = match events.next().await {
                None => return false,
                Some(event) => match event {
                    Err(_) => return false,
                    Ok(event) => match event {
                        fio::FileEvent::OnOpen_ { s, info } => {
                            if Status::from_raw(s) != Status::OK {
                                return false;
                            }

                            match info {
                                Some(info) => match *info {
                                    fio::NodeInfoDeprecated::File(fio::FileObject {
                                        event: Some(event),
                                        stream: _, // TODO(https://fxbug.dev/293606235): Use stream
                                    }) => event,
                                    _ => return false,
                                },
                                _ => return false,
                            }
                        }
                        fio::FileEvent::OnRepresentation { payload } => match payload {
                            fio::Representation::File(fio::FileInfo {
                                observer: Some(event),
                                stream: _, // TODO(https://fxbug.dev/293606235): Use stream
                                ..
                            }) => event,
                            _ => return false,
                        },
                        fio::FileEvent::_UnknownEvent { .. } => return false,
                    },
                },
            };

            // Check that the USER_0 signal has been asserted on the file's event to make sure we
            // return false on the edge case of the blob is currently being written.
            // (INFINITE_PAST makes this a non-blocking poll.)
            match event
                .wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST)
                .to_result()
            {
                Ok(_) => true,
                Err(status) => {
                    if status != Status::TIMED_OUT {
                        warn!("blobfs: unknown error asserting blob existence: {}", status);
                    }
                    false
                }
            }
        }
    }

    /// Determines which blobs of `candidates` are missing from blobfs.
    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
    /// following race condition:
    /// 1. blob is partially written by resolve A
    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
    ///    write, which collides with the partially written blob from (1) that is being kept alive
    ///    by (2) and so fails
    pub async fn filter_to_missing_blobs(&self, candidates: &HashSet<Hash>) -> HashSet<Hash> {
        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
        // metadata corruption.
        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
        // slower and has_blob is faster). The minor speedup on packages with a great number of
        // missing blobs is not worth a rarely taken branch deep within package resolution.
        stream::iter(candidates.clone())
            .map(move |blob| async move {
                if self.has_blob(&blob).await {
                    None
                } else {
                    Some(blob)
                }
            })
            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
            // even three concurrent `has_blob` calls.
            // NOTE(review): the limit of 10 exceeds the ~3 cited above — confirm intent.
            .buffer_unordered(10)
            .filter_map(|blob| async move { blob })
            .collect()
            .await
    }

    /// Call fuchsia.io/Node.Sync on the blobfs directory.
    pub async fn sync(&self) -> Result<(), BlobfsError> {
        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
    }
}
469
/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
fn open_blob_with_reader<P: ProtocolsExt + Send>(
    reader: ffxfs::BlobReaderProxy,
    blob_hash: Hash,
    scope: ExecutionScope,
    protocols: P,
    object_request: ObjectRequest,
) {
    // handle_async reports any error returned by the future on the object_request channel.
    scope.clone().spawn(object_request.handle_async(async move |object_request| {
        // Map transport errors to a status: a closed channel propagates its closure status,
        // anything else becomes INTERNAL.
        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
                error!("Blob reader channel closed: {:?}", status);
                status
            } else {
                error!("Transport error on get_vmo: {:?}", fidl_error);
                zx::Status::INTERNAL
            }
        })?;
        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
        // Serve the VMO-backed blob over a stream-based file connection.
        object_request
            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
            .await
    }));
}
496
/// Errors returned by [`Client::get_blob_vmo`].
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetBlobVmoError {
    /// BlobReader.GetVmo or File.GetBackingMemory returned an error status.
    #[error("getting the vmo")]
    GetVmo(#[source] Status),

    /// Opening the blob file in the blob directory failed.
    #[error("opening the blob")]
    OpenBlob(#[source] fuchsia_fs::node::OpenError),

    /// Transport error on a FIDL channel.
    #[error("making a fidl request")]
    Fidl(#[source] fidl::Error),
}
509
#[cfg(test)]
impl Client {
    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
    /// specific functionality.
    ///
    /// # Panics
    ///
    /// Panics on error.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        // The unwraps are on ramdisk setup; no VmexResource is registered in tests.
        Self::new(
            blobfs.root_dir_proxy().unwrap(),
            blobfs.blob_creator_proxy().unwrap(),
            blobfs.blob_reader_proxy().unwrap(),
            None,
        )
        .unwrap()
    }
}
532
533#[cfg(test)]
534#[allow(clippy::bool_assert_comparison)]
535mod tests {
536    use super::*;
537    use assert_matches::assert_matches;
538    use blobfs_ramdisk::BlobfsRamdisk;
539    use fuchsia_async as fasync;
540    use futures::stream::TryStreamExt;
541    use maplit::hashset;
542    use std::io::Write as _;
543    use std::sync::Arc;
544
    // An empty blobfs lists no blobs.
    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs_empty() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
        blobfs.stop().await.unwrap();
    }
553
    // list_known_blobs agrees with the ramdisk's own view of its contents.
    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs() {
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }
568
    // Deleting a blob removes it from the directory listing.
    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_and_then_list() {
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
        assert_matches!(client.delete_blob(&merkle).await, Ok(()));

        let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }
586
    // Deleting an absent blob surfaces NOT_FOUND via BlobfsError::Unlink.
    #[fasync::run_singlethreaded(test)]
    async fn delete_non_existing_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let blob_merkle = Hash::from([1; 32]);

        assert_matches!(
            client.delete_blob(&blob_merkle).await,
            Err(BlobfsError::Unlink(Status::NOT_FOUND))
        );
        blobfs.stop().await.unwrap();
    }
599
    // delete_blob sends fuchsia.io/Directory.Unlink with the blob's hash as the entry name.
    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_mock() {
        let (client, mut stream) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }
617
    // has_blob distinguishes a present blob from an absent one (c++blobfs path).
    #[fasync::run_singlethreaded(test)]
    async fn has_blob() {
        let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
        assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);

        blobfs.stop().await.unwrap();
    }
628
    // has_blob distinguishes a present blob from an absent one on fxblob (BlobReader path).
    #[fasync::run_singlethreaded(test)]
    async fn has_blob_fxblob() {
        let blobfs =
            BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
        assert!(!client.has_blob(&Hash::from([1; 32])).await);

        blobfs.stop().await.unwrap();
    }
640
    // has_blob reports false for every intermediate write state (created, truncated,
    // half-written) and true only once the blob is fully written.
    #[fasync::run_singlethreaded(test)]
    async fn has_blob_return_false_if_blob_is_partially_written() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let blob = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&blob).root();

        let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.set_len(blob.len() as u64).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[..512]).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[512..]).unwrap();
        assert_eq!(client.has_blob(&hash).await, true);

        blobfs.stop().await.unwrap();
    }
660
661    async fn resize(blob: &fio::FileProxy, size: usize) {
662        let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
663    }
664
665    async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
666        assert_eq!(
667            blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
668            bytes.len() as u64
669        );
670    }
671
    // A blob written through the delivery-blob path becomes visible to has_blob.
    #[fasync::run_singlethreaded(test)]
    async fn write_delivery_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&content).root();
        let delivery_content =
            delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);

        let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();

        let () = resize(&proxy, delivery_content.len()).await;
        let () = write(&proxy, &delivery_content).await;

        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }
691
    /// Wrapper for a blob and its hash. This lets the tests retain ownership of the Blob,
    /// which is important because it ensures blobfs will not close partially written blobs for the
    /// duration of the test.
    struct TestBlob {
        // Held only to keep the write connection (and thus the blob) alive.
        _blob: fio::FileProxy,
        // The blob's merkle root.
        hash: Hash,
    }
699
700    async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
701        let hash = fuchsia_merkle::from_slice(content).root();
702        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
703        TestBlob { _blob, hash }
704    }
705
    // Creates the blob file and resizes it to the final length without writing any bytes.
    async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let () = resize(&_blob, content.len()).await;
        TestBlob { _blob, hash }
    }
712
    // Creates the blob and writes only the first half of its delivery-blob payload.
    async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
        let () = resize(&_blob, content.len()).await;
        let () = write(&_blob, &content[..content.len() / 2]).await;
        TestBlob { _blob, hash }
    }
721
    // Creates the blob and writes its complete delivery-blob payload.
    async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
        let () = resize(&_blob, content.len()).await;
        let () = write(&_blob, &content).await;
        TestBlob { _blob, hash }
    }
730
    // filter_to_missing_blobs returns exactly the candidates not present in blobfs.
    #[fasync::run_singlethreaded(test)]
    async fn filter_to_missing_blobs() {
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let missing_hash0 = Hash::from([0; 32]);
        let missing_hash1 = Hash::from([1; 32]);

        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;

        assert_eq!(
            client
                .filter_to_missing_blobs(
                    // Pass in <= 20 candidates so the heuristic is not used.
                    // NOTE(review): the current implementation has no such heuristic — this
                    // comment appears stale.
                    &hashset! { missing_hash0, missing_hash1,
                        present_blob0.hash, present_blob1.hash
                    },
                )
                .await,
            hashset! { missing_hash0, missing_hash1 }
        );

        blobfs.stop().await.unwrap();
    }
756
757    /// Similar to the above test, except also test that partially written blobs count as missing.
758    #[fasync::run_singlethreaded(test)]
759    async fn filter_to_missing_blobs_with_partially_written_blobs() {
760        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
761        let client = Client::for_ramdisk(&blobfs);
762
763        // Some blobs are created (but not yet truncated).
764        let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
765
766        // Some are truncated but not written.
767        let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;
768
769        // Some are partially written.
770        let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;
771
772        // Some are fully written.
773        let present_blob = fully_write_blob(&client, &[3; 1024]).await;
774
775        assert_eq!(
776            client
777                .filter_to_missing_blobs(&hashset! {
778                    missing_blob0.hash,
779                    missing_blob1.hash,
780                    missing_blob2.hash,
781                    present_blob.hash
782                },)
783                .await,
784            // All partially written blobs should count as missing.
785            hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
786        );
787
788        blobfs.stop().await.unwrap();
789    }
790
791    #[fasync::run_singlethreaded(test)]
792    async fn sync() {
793        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
794        let counter_clone = Arc::clone(&counter);
795        let (client, mut stream) = Client::new_test();
796        fasync::Task::spawn(async move {
797            match stream.try_next().await.unwrap().unwrap() {
798                fio::DirectoryRequest::Sync { responder } => {
799                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
800                    responder.send(Ok(())).unwrap();
801                }
802                other => panic!("unexpected request: {other:?}"),
803            }
804        })
805        .detach();
806
807        assert_matches!(client.sync().await, Ok(()));
808        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
809    }
810
811    #[fasync::run_singlethreaded(test)]
812    async fn open_blob_for_write_uses_fxblob_if_configured() {
813        let (blob_creator, mut blob_creator_stream) =
814            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
815        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
816        let client = Client::new(
817            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
818            Some(blob_creator),
819            Some(blob_reader),
820            None,
821        )
822        .unwrap();
823
824        fuchsia_async::Task::spawn(async move {
825            match blob_creator_stream.next().await.unwrap().unwrap() {
826                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
827                    assert_eq!(hash, [0; 32]);
828                    assert!(!allow_existing);
829                    let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
830                }
831            }
832        })
833        .detach();
834
835        assert_matches!(
836            client.open_blob_for_write(&[0; 32].into()).await,
837            Ok(fpkg::BlobWriter::Writer(_))
838        );
839    }
840
841    #[fasync::run_singlethreaded(test)]
842    async fn open_blob_for_write_fxblob_maps_already_exists() {
843        let (blob_creator, mut blob_creator_stream) =
844            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
845        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
846
847        let client = Client::new(
848            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
849            Some(blob_creator),
850            Some(blob_reader),
851            None,
852        )
853        .unwrap();
854
855        fuchsia_async::Task::spawn(async move {
856            match blob_creator_stream.next().await.unwrap().unwrap() {
857                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
858                    assert_eq!(hash, [0; 32]);
859                    assert!(!allow_existing);
860                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
861                }
862            }
863        })
864        .detach();
865
866        assert_matches!(
867            client.open_blob_for_write(&[0; 32].into()).await,
868            Err(CreateError::AlreadyExists)
869        );
870    }
871
872    #[fasync::run_singlethreaded(test)]
873    async fn concurrent_list_known_blobs_all_return_full_contents() {
874        use futures::StreamExt;
875        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
876        let client = Client::for_ramdisk(&blobfs);
877
878        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
879        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
880        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
881        // each successful listing requires a call to Rewind as well, but it does make conflict
882        // more likely.
883        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
884        for i in 0..256u16 {
885            let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
886        }
887
888        let () = futures::stream::iter(0..100)
889            .for_each_concurrent(None, |_| async {
890                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
891            })
892            .await;
893    }
894}