fidl_fuchsia_pkg_ext/
cache.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8//! Wrapper types for [`fidl_fuchsia_pkg::PackageCacheProxy`] and its related protocols.
9
10use crate::types::{BlobId, BlobInfo};
11use fuchsia_pkg::PackageDirectory;
12use futures::prelude::*;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use zx_status::Status;
16use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_pkg as fpkg};
17
18/// An open connection to a provider of the `fuchsia.pkg.PackageCache`.
19#[derive(Debug, Clone)]
20pub struct Client {
21    proxy: fpkg::PackageCacheProxy,
22}
23
24impl Client {
25    /// Constructs a client from the given proxy.
26    pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
27        Self { proxy }
28    }
29
30    /// Returns a reference to the underlying PackageCacheProxy connection.
31    pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
32        &self.proxy
33    }
34
35    /// Opens the package specified by `meta_far_blob` with the intent to fetch any missing blobs
36    /// using the returned [`Get`] type if needed.
37    pub fn get(
38        &self,
39        meta_far_blob: BlobInfo,
40        gc_protection: fpkg::GcProtection,
41    ) -> Result<Get, fidl::Error> {
42        let (needed_blobs, needed_blobs_server_end) =
43            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
44        let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
45
46        let get_fut = self.proxy.get(
47            &meta_far_blob.into(),
48            gc_protection,
49            needed_blobs_server_end,
50            pkg_dir_server_end,
51        );
52
53        Ok(Get {
54            get_fut,
55            pkg_dir,
56            needed_blobs,
57            pkg_present: SharedBoolEvent::new(),
58            meta_far: meta_far_blob.blob_id,
59        })
60    }
61
62    /// Uses PackageCache.Get to obtain the package directory of a package that is already cached
63    /// (all blobs are already in blobfs).
64    /// Errors if the package is not already cached.
65    /// Always uses open package tracking GC protection, because OTA (the only client of Retained
66    /// GC protection), should never need to get an already cached package.
67    ///
68    /// Compared to `get_cached`:
69    ///   * Activates `meta_far_blob` in the dynamic index
70    ///   * Must not be called concurrently with the same `meta_far_blob`
71    pub async fn get_already_cached(
72        &self,
73        meta_far_blob: BlobId,
74    ) -> Result<PackageDirectory, GetAlreadyCachedError> {
75        let mut get = self
76            .get(
77                BlobInfo { blob_id: meta_far_blob, length: 0 },
78                fpkg::GcProtection::OpenPackageTracking,
79            )
80            .map_err(GetAlreadyCachedError::Get)?;
81        if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
82            return Err(GetAlreadyCachedError::MissingMetaFar);
83        }
84
85        if let Some(missing_blobs) = get
86            .get_missing_blobs()
87            .try_next()
88            .await
89            .map_err(GetAlreadyCachedError::GetMissingBlobs)?
90        {
91            return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
92        }
93
94        get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
95    }
96
97    /// Uses PackageCache.GetSubpackage to obtain the package directory of a subpackage.
98    /// Errors if there is not an open connection to the superpackage.
99    pub async fn get_subpackage(
100        &self,
101        superpackage: BlobId,
102        subpackage: &fuchsia_url::RelativePackageUrl,
103    ) -> Result<PackageDirectory, GetSubpackageError> {
104        let (dir, dir_server_end) =
105            PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
106        let () = self
107            .proxy
108            .get_subpackage(
109                &superpackage.into(),
110                &fpkg::PackageUrl { url: subpackage.into() },
111                dir_server_end,
112            )
113            .await
114            .map_err(GetSubpackageError::CallingGetSubpackage)??;
115        Ok(dir)
116    }
117
118    /// Write blobs using the returned [`WriteBlobs`] type.
119    pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
120        let (needed_blobs, needed_blobs_server_end) =
121            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
122
123        let () = self.proxy.write_blobs(needed_blobs_server_end)?;
124
125        Ok(WriteBlobs { needed_blobs })
126    }
127}
128
129#[derive(thiserror::Error, Debug)]
130#[allow(missing_docs)]
131pub enum GetAlreadyCachedError {
132    #[error("calling get")]
133    Get(#[source] fidl::Error),
134
135    #[error("opening meta blob")]
136    OpenMetaBlob(#[source] OpenBlobError),
137
138    #[error("meta.far blob not cached")]
139    MissingMetaFar,
140
141    #[error("getting missing blobs")]
142    GetMissingBlobs(#[source] ListMissingBlobsError),
143
144    #[error("content blobs not cached {0:?}")]
145    MissingContentBlobs(Vec<BlobInfo>),
146
147    #[error("finishing get")]
148    FinishGet(#[source] GetError),
149}
150
151impl GetAlreadyCachedError {
152    /// Returns true if the get failed because the package was not cached.
153    pub fn was_not_cached(&self) -> bool {
154        use GetAlreadyCachedError::*;
155        match self {
156            Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
157            MissingMetaFar | MissingContentBlobs(..) => true,
158        }
159    }
160}
161
162#[derive(thiserror::Error, Debug)]
163#[allow(missing_docs)]
164pub enum GetSubpackageError {
165    #[error("creating handles")]
166    CreatingHandles(#[source] fidl::Error),
167
168    #[error("calling GetCached FIDL")]
169    CallingGetSubpackage(#[source] fidl::Error),
170
171    #[error("the superpackage does not have an open package connection")]
172    SuperpackageClosed,
173
174    #[error("the subpackage does not exist")]
175    DoesNotExist,
176
177    #[error("internal")]
178    Internal,
179}
180
181impl From<fpkg::GetSubpackageError> for GetSubpackageError {
182    fn from(fidl: fpkg::GetSubpackageError) -> Self {
183        use GetSubpackageError::*;
184        use fpkg::GetSubpackageError as fErr;
185        match fidl {
186            fErr::SuperpackageClosed => SuperpackageClosed,
187            fErr::DoesNotExist => DoesNotExist,
188            fErr::Internal => Internal,
189        }
190    }
191}
192
/// A one-way boolean flag shared between clones: it starts unset and can only be set.
#[derive(Clone, Debug)]
struct SharedBoolEvent(Arc<AtomicBool>);

impl SharedBoolEvent {
    /// Creates a new, unset event.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        Self(Arc::new(flag))
    }

    /// Returns whether `set` has been called on this event or any of its clones.
    fn get(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::SeqCst)
    }

    /// Marks the event as set for this handle and all of its clones.
    fn set(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}
209
210async fn open_blob(
211    needed_blobs: &fpkg::NeededBlobsProxy,
212    kind: OpenKind,
213    blob_id: BlobId,
214    pkg_present: Option<&SharedBoolEvent>,
215) -> Result<Option<NeededBlob>, OpenBlobError> {
216    let open_fut = match kind {
217        OpenKind::Meta => needed_blobs.open_meta_blob(),
218        OpenKind::Content => needed_blobs.open_blob(&blob_id.into()),
219    };
220    match open_fut.await {
221        Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
222            if let Some(pkg_present) = pkg_present {
223                pkg_present.set();
224            }
225            Ok(None)
226        }
227        res => {
228            if let Some(blob) = res?? {
229                Ok(Some(NeededBlob {
230                    blob: Blob {
231                        needed_blobs: needed_blobs.clone(),
232                        blob_id,
233                        state: NeedsTruncate(blob),
234                    },
235                }))
236            } else {
237                Ok(None)
238            }
239        }
240    }
241}
242
/// Selects which `NeededBlobs` open request `open_blob` issues.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum OpenKind {
    /// Open the package's meta blob (`open_meta_blob`).
    Meta,
    /// Open a content blob by id (`open_blob`).
    Content,
}
248
249/// A deferred call to [`Get::open_meta_blob`] or [`Get::open_blob`].
250#[derive(Debug)]
251pub struct DeferredOpenBlob {
252    needed_blobs: fpkg::NeededBlobsProxy,
253    kind: OpenKind,
254    blob_id: BlobId,
255    pkg_present: Option<SharedBoolEvent>,
256}
257
258impl DeferredOpenBlob {
259    /// Opens the blob for write, if it is still needed. The blob's data can be provided using the
260    /// returned NeededBlob.
261    pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
262        open_blob(&self.needed_blobs, self.kind, self.blob_id, self.pkg_present.as_ref()).await
263    }
264
265    fn proxy_cmp_key(&self) -> u32 {
266        use fidl::AsHandleRef;
267        use fidl::endpoints::Proxy;
268        self.needed_blobs.as_channel().raw_handle()
269    }
270}
271
272impl std::cmp::PartialEq for DeferredOpenBlob {
273    fn eq(&self, other: &Self) -> bool {
274        self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
275    }
276}
277
278impl std::cmp::Eq for DeferredOpenBlob {}
279
280/// A pending `fuchsia.pkg/PackageCache.Get()` request. Clients must, in order:
281/// 1. open/write the meta blob, if Some(NeededBlob) is provided by that API
282/// 2. enumerate all missing content blobs
283/// 3. open/write all missing content blobs, if Some(NeededBlob) is provided by that API
284/// 4. finish() to complete the Get() request.
285#[derive(Debug)]
286pub struct Get {
287    get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
288    needed_blobs: fpkg::NeededBlobsProxy,
289    pkg_dir: PackageDirectory,
290    pkg_present: SharedBoolEvent,
291    meta_far: BlobId,
292}
293
294impl Get {
295    /// Returns an independent object that can be used to open the meta blob for write.  See
296    /// [`Self::open_meta_blob`].
297    pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
298        DeferredOpenBlob {
299            needed_blobs: self.needed_blobs.clone(),
300            kind: OpenKind::Meta,
301            blob_id: self.meta_far,
302            pkg_present: Some(self.pkg_present.clone()),
303        }
304    }
305
306    /// Opens the meta blob for write, if it is still needed. The blob's data can be provided using
307    /// the returned NeededBlob.
308    pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
309        open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present)).await
310    }
311
312    fn start_get_missing_blobs(
313        &mut self,
314    ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
315        if self.pkg_present.get() {
316            return Ok(None);
317        }
318
319        let (blob_iterator, blob_iterator_server_end) =
320            fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
321
322        self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
323        Ok(Some(blob_iterator))
324    }
325
326    /// Determines the set of blobs that the caller must open/write to complete this `Get()`
327    /// operation.
328    /// The returned stream will never yield an empty `Vec`.
329    /// Callers should process the missing blobs (via `make_open_blob` or `open_blob`) concurrently
330    /// with reading the stream to guarantee stream termination.
331    pub fn get_missing_blobs(
332        &mut self,
333    ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin {
334        match self.start_get_missing_blobs() {
335            Ok(option_iter) => match option_iter {
336                Some(iterator) => crate::fidl_iterator_to_stream(iterator)
337                    .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
338                    .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
339                    .left_stream(),
340                None => futures::stream::empty().right_stream(),
341            }
342            .left_stream(),
343            Err(e) => {
344                futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
345                    .right_stream()
346            }
347        }
348    }
349
350    /// Returns an independent object that can be used to open the `content_blob` for write.  See
351    /// [`Self::open_blob`].
352    pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
353        DeferredOpenBlob {
354            needed_blobs: self.needed_blobs.clone(),
355            kind: OpenKind::Content,
356            blob_id: content_blob,
357            pkg_present: None,
358        }
359    }
360
361    /// Opens `content_blob` for write, if it is still needed. The blob's data can be provided
362    /// using the returned NeededBlob.
363    pub async fn open_blob(
364        &mut self,
365        content_blob: BlobId,
366    ) -> Result<Option<NeededBlob>, OpenBlobError> {
367        open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None).await
368    }
369
370    /// Notifies the endpoint that all blobs have been written and wait for the response to the
371    /// pending `Get()` request, returning the cached [`PackageDirectory`].
372    pub async fn finish(self) -> Result<PackageDirectory, GetError> {
373        drop(self.needed_blobs);
374        let () = self.get_fut.await?.map_err(Status::from_raw)?;
375        Ok(self.pkg_dir)
376    }
377
378    /// Aborts this caching operation for the package.
379    pub async fn abort(self) {
380        self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
381        // The package is not guaranteed to be removed from the dynamic index after abort
382        // returns, we have to wait until finish returns (to prevent a resolve retry from
383        // racing). The finish call will return an error that just tells us that we called
384        // abort, so we ignore it.
385        let _ = self.get_fut.await;
386    }
387}
388
389/// A pending `fuchsia.pkg/PackageCache.WriteBlob()` request.
390#[derive(Clone, Debug)]
391pub struct WriteBlobs {
392    needed_blobs: fpkg::NeededBlobsProxy,
393}
394
395impl WriteBlobs {
396    /// Returns an independent object that can be used to open the `blob` for write.  See
397    /// [`Self::open_blob`].
398    pub fn make_open_blob(&mut self, blob: BlobId) -> DeferredOpenBlob {
399        DeferredOpenBlob {
400            needed_blobs: self.needed_blobs.clone(),
401            kind: OpenKind::Content,
402            blob_id: blob,
403            pkg_present: None,
404        }
405    }
406
407    /// Opens `blob` for write. The blob's data can be provided using the returned NeededBlob.
408    pub async fn open_blob(&mut self, blob: BlobId) -> Result<Option<NeededBlob>, OpenBlobError> {
409        open_blob(&self.needed_blobs, OpenKind::Content, blob, None).await
410    }
411}
412
413/// A blob that needs to be written.
414#[derive(Debug)]
415pub struct NeededBlob {
416    /// Typestate wrapper around the blob. Clients must first call truncate(), then write() until
417    /// all data is provided.
418    pub blob: Blob<NeedsTruncate>,
419}
420
421/// The successful result of truncating a blob.
422#[derive(Debug)]
423pub enum TruncateBlobSuccess {
424    /// The blob contents need to be written.
425    NeedsData(Blob<NeedsData>),
426
427    /// The blob is fully written (it was the empty blob) and now a
428    /// fuchsia.pkg.NeededBlobs.BlobWritten message should be sent.
429    AllWritten(Blob<NeedsBlobWritten>),
430}
431
432/// The successful result of writing some data to a blob.
433#[derive(Debug)]
434pub enum BlobWriteSuccess {
435    /// There is still more data to write.
436    NeedsData(Blob<NeedsData>),
437
438    /// The blob is fully written and now a fuchsia.pkg.NeededBlobs.BlobWritten
439    /// message should be sent.
440    AllWritten(Blob<NeedsBlobWritten>),
441}
442
443/// State for a blob that can be truncated.
444#[derive(Debug)]
445pub struct NeedsTruncate(fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>);
446
447/// State for a blob that can be written to.
448#[derive(Debug)]
449pub struct NeedsData {
450    size: u64,
451    written: u64,
452    writer: blob_writer::BlobWriter,
453}
454
/// Typestate of a blob whose data is fully written and for which pkg-cache still has to be
/// sent a fuchsia.pkg.NeededBlobs.BlobWritten message.
#[derive(Debug)]
pub struct NeedsBlobWritten;
459
460/// A blob in the process of being written.
461#[derive(Debug)]
462#[must_use]
463pub struct Blob<S> {
464    needed_blobs: fpkg::NeededBlobsProxy,
465    blob_id: BlobId,
466    state: S,
467}
468
469impl Blob<NeedsTruncate> {
470    /// Truncates the blob to the given size. On success, the blob enters the writable state.
471    pub async fn truncate(self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
472        let Self { needed_blobs, blob_id, state: NeedsTruncate(blob) } = self;
473
474        let writer = blob_writer::BlobWriter::create(blob.into_proxy(), size).await.map_err(
475            |e| match e {
476                blob_writer::CreateError::GetVmo(s) if s == Status::NO_SPACE => {
477                    TruncateBlobError::NoSpace
478                }
479                _ => TruncateBlobError::CreateBlobWriter(e),
480            },
481        )?;
482
483        Ok(if size == 0 {
484            TruncateBlobSuccess::AllWritten(Blob { needed_blobs, blob_id, state: NeedsBlobWritten })
485        } else {
486            TruncateBlobSuccess::NeedsData(Blob {
487                needed_blobs,
488                blob_id,
489                state: NeedsData { size, written: 0, writer },
490            })
491        })
492    }
493}
494
495impl Blob<NeedsData> {
496    /// Writes all of the given buffer to the blob.
497    ///
498    /// # Panics
499    ///
500    /// Panics if a write is attempted with a buf larger than the remaining blob size.
501    pub fn write(
502        self,
503        buf: &[u8],
504    ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
505        self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
506    }
507
508    /// Writes all of the given buffer to the blob.
509    ///
510    /// `after_write` and `after_write_ack` are called before and after, respectively, waiting for
511    /// the server to acknowledge writes.
512    /// They may be called multiple times if the write of `buf` is chunked.
513    /// `after_write` is given the size of each write in bytes.
514    /// Useful for creating trace spans.
515    ///
516    /// # Panics
517    ///
518    /// Panics if a write is attempted with a buf larger than the remaining blob size.
519    pub async fn write_with_trace_callbacks(
520        mut self,
521        buf: &[u8],
522        after_write: &(dyn Fn(u64) + Send + Sync),
523        after_write_ack: &(dyn Fn() + Send + Sync),
524    ) -> Result<BlobWriteSuccess, WriteBlobError> {
525        assert!(self.state.written + buf.len() as u64 <= self.state.size);
526
527        let fut = self.state.writer.write(buf);
528        let () = after_write(buf.len() as u64);
529        let res = fut.await;
530        let () = after_write_ack();
531        let () = res.map_err(|e| match e {
532            e @ blob_writer::WriteError::BytesReady(s) => match s {
533                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
534                Status::NO_SPACE => WriteBlobError::NoSpace,
535                _ => WriteBlobError::FxBlob(e),
536            },
537            e => WriteBlobError::FxBlob(e),
538        })?;
539
540        self.state.written += buf.len() as u64;
541
542        if self.state.written == self.state.size {
543            let Self { needed_blobs, blob_id, state: _ } = self;
544            Ok(BlobWriteSuccess::AllWritten(Blob {
545                needed_blobs,
546                blob_id,
547                state: NeedsBlobWritten,
548            }))
549        } else {
550            Ok(BlobWriteSuccess::NeedsData(self))
551        }
552    }
553}
554
555impl Blob<NeedsBlobWritten> {
556    /// Tells pkg-cache that the blob has been successfully written and can now be read.
557    pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
558        Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
559    }
560}
561
562/// An error encountered while opening a package.
563#[derive(Debug, thiserror::Error)]
564#[allow(missing_docs)]
565pub enum OpenError {
566    #[error("the package does not exist")]
567    NotFound,
568
569    #[error("Open() responded with an unexpected status")]
570    UnexpectedResponse(#[source] Status),
571
572    #[error("transport error")]
573    Fidl(#[from] fidl::Error),
574}
575/// An error encountered while caching a package.
576#[derive(Debug, thiserror::Error)]
577#[allow(missing_docs)]
578pub enum GetError {
579    #[error("Get() responded with an unexpected status")]
580    UnexpectedResponse(#[from] Status),
581
582    #[error("transport error")]
583    Fidl(#[from] fidl::Error),
584}
585
586/// An error encountered while opening a metadata or content blob for write.
587#[derive(Debug, thiserror::Error)]
588#[allow(missing_docs)]
589pub enum OpenBlobError {
590    #[error("there is insufficient storage space available to persist this blob")]
591    OutOfSpace,
592
593    #[error("this blob is already open for write by another cache operation")]
594    ConcurrentWrite,
595
596    #[error("an unspecified error occurred during underlying I/O")]
597    UnspecifiedIo,
598
599    #[error("an unspecified error occurred")]
600    Internal,
601
602    #[error("transport error")]
603    Fidl(#[from] fidl::Error),
604}
605
606impl From<fpkg::OpenBlobError> for OpenBlobError {
607    fn from(e: fpkg::OpenBlobError) -> Self {
608        match e {
609            fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
610            fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
611            fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
612            fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
613        }
614    }
615}
616
617/// An error encountered while enumerating missing content blobs.
618#[derive(Debug, thiserror::Error)]
619#[allow(missing_docs)]
620pub enum ListMissingBlobsError {
621    #[error("while obtaining the missing blobs fidl iterator")]
622    CallGetMissingBlobs(#[source] fidl::Error),
623
624    #[error("while obtaining the next chunk of blobs from the fidl iterator")]
625    CallNextOnBlobIterator(#[source] fidl::Error),
626}
627
628/// An error encountered while truncating a blob
629#[derive(Debug, thiserror::Error)]
630#[allow(missing_docs)]
631pub enum TruncateBlobError {
632    #[error("insufficient storage space is available")]
633    NoSpace,
634
635    #[error("creating blob writer")]
636    CreateBlobWriter(#[source] blob_writer::CreateError),
637
638    #[error("transport error")]
639    Fidl(#[from] fidl::Error),
640
641    #[error("blob is in an invalid state")]
642    BadState,
643}
644
645/// An error encountered while writing a blob.
646#[derive(Debug, thiserror::Error)]
647#[allow(missing_docs)]
648pub enum WriteBlobError {
649    #[error("the written data was corrupt")]
650    Corrupt,
651
652    #[error("insufficient storage space is available")]
653    NoSpace,
654
655    #[error("transport error")]
656    Fidl(#[from] fidl::Error),
657
658    #[error("while using the fxblob writer")]
659    FxBlob(#[source] blob_writer::WriteError),
660}
661
662/// An error encountered while sending the BlobWritten message.
663#[derive(Debug, thiserror::Error)]
664#[allow(missing_docs)]
665pub enum BlobWrittenError {
666    #[error("pkg-cache could not find the blob after it was successfully written")]
667    MissingAfterWritten,
668
669    #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
670    UnopenedBlob,
671
672    #[error("transport error")]
673    Fidl(#[from] fidl::Error),
674}
675
676impl From<fpkg::BlobWrittenError> for BlobWrittenError {
677    fn from(e: fpkg::BlobWrittenError) -> Self {
678        match e {
679            fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
680            fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
681        }
682    }
683}
684
685#[cfg(test)]
686mod tests {
687    use super::*;
688    use assert_matches::assert_matches;
689    use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
690    use fidl_fuchsia_io as fio;
691    use fidl_fuchsia_pkg::{
692        BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
693        PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
694        PackageCacheRequestStream,
695    };
696    use zx::HandleBased as _;
697
698    struct MockPackageCache {
699        stream: PackageCacheRequestStream,
700    }
701
702    impl MockPackageCache {
703        fn new() -> (Client, Self) {
704            let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
705            (Client::from_proxy(proxy), Self { stream })
706        }
707
708        async fn expect_get(
709            &mut self,
710            blob_info: BlobInfo,
711            expected_gc_protection: fpkg::GcProtection,
712        ) -> PendingGet {
713            match self.stream.next().await {
714                Some(Ok(PackageCacheRequest::Get {
715                    meta_far_blob,
716                    gc_protection,
717                    needed_blobs,
718                    dir,
719                    responder,
720                })) => {
721                    assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
722                    assert_eq!(gc_protection, expected_gc_protection);
723                    let needed_blobs = needed_blobs.into_stream();
724                    let dir = dir.into_stream();
725
726                    PendingGet { stream: needed_blobs, dir, responder }
727                }
728                r => panic!("Unexpected request: {r:?}"),
729            }
730        }
731
732        async fn expect_closed(mut self) {
733            assert_matches!(self.stream.next().await, None);
734        }
735    }
736
737    struct PendingGet {
738        stream: NeededBlobsRequestStream,
739        dir: fio::DirectoryRequestStream,
740        responder: PackageCacheGetResponder,
741    }
742
743    impl PendingGet {
744        async fn new() -> (Get, PendingGet) {
745            let (client, mut server) = MockPackageCache::new();
746
747            let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
748            let pending_get =
749                server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
750            (get, pending_get)
751        }
752
753        fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
754            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
755            self.responder.send(Ok(())).unwrap();
756            (self.stream, PackageDirProvider { stream: self.dir })
757        }
758
759        fn finish(self) -> PackageDirProvider {
760            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
761            self.responder.send(Ok(())).unwrap();
762            PackageDirProvider { stream: self.dir }
763        }
764
765        #[cfg(target_os = "fuchsia")]
766        fn fail_the_get(self) {
767            self.responder
768                .send(Err(Status::IO_INVALID.into_raw()))
769                .expect("client should be waiting");
770        }
771
772        async fn expect_open_meta_blob(
773            mut self,
774            res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
775        ) -> Self {
776            match self.stream.next().await {
777                Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
778                    responder.send(res).unwrap();
779                }
780                r => panic!("Unexpected request: {r:?}"),
781            }
782            self
783        }
784
785        async fn expect_open_blob(
786            mut self,
787            expected_blob_id: BlobId,
788            res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
789        ) -> Self {
790            match self.stream.next().await {
791                Some(Ok(NeededBlobsRequest::OpenBlob { blob_id, responder })) => {
792                    assert_eq!(BlobId::from(blob_id), expected_blob_id);
793                    responder.send(res).unwrap();
794                }
795                r => panic!("Unexpected request: {r:?}"),
796            }
797            self
798        }
799
800        async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
801            match self.stream.next().await {
802                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
803                    let mut stream = iterator.into_stream();
804
805                    // Respond to each next request with the next chunk.
806                    for chunk in response_chunks {
807                        let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
808
809                        let BlobInfoIteratorRequest::Next { responder } =
810                            stream.next().await.unwrap().unwrap();
811                        responder.send(&chunk).unwrap();
812                    }
813
814                    // Then respond with an empty chunk.
815                    let BlobInfoIteratorRequest::Next { responder } =
816                        stream.next().await.unwrap().unwrap();
817                    responder.send(&[]).unwrap();
818
819                    // Expect the client to stop asking.
820                    assert_matches!(stream.next().await, None);
821                }
822                r => panic!("Unexpected request: {r:?}"),
823            }
824            self
825        }
826
827        async fn expect_get_missing_blobs_client_closes_channel(
828            mut self,
829            response_chunks: Vec<Vec<BlobInfo>>,
830        ) -> Self {
831            match self.stream.next().await {
832                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
833                    let mut stream = iterator.into_stream();
834
835                    // Respond to each next request with the next chunk.
836                    for chunk in response_chunks {
837                        let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
838
839                        let BlobInfoIteratorRequest::Next { responder } =
840                            stream.next().await.unwrap().unwrap();
841                        responder.send(&chunk).unwrap();
842                    }
843
844                    // The client closes the channel before we can respond with an empty chunk.
845                    assert_matches!(stream.next().await, None);
846                }
847                r => panic!("Unexpected request: {r:?}"),
848            }
849            self
850        }
851
852        async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
853            match self.stream.next().await {
854                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
855                    iterator
856                        .into_stream_and_control_handle()
857                        .1
858                        .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
859                }
860                r => panic!("Unexpected request: {r:?}"),
861            }
862            self
863        }
864
865        #[cfg(target_os = "fuchsia")]
866        async fn expect_abort(mut self) -> Self {
867            match self.stream.next().await {
868                Some(Ok(NeededBlobsRequest::Abort { responder })) => {
869                    responder.send().unwrap();
870                }
871                r => panic!("Unexpected request: {r:?}"),
872            }
873            self
874        }
875    }
876
877    struct PackageDirProvider {
878        stream: fio::DirectoryRequestStream,
879    }
880
881    impl PackageDirProvider {
882        fn close_pkg_dir(self) {
883            self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
884        }
885    }
886
887    fn blob_id(n: u8) -> BlobId {
888        BlobId::from([n; 32])
889    }
890
891    fn blob_info(n: u8) -> BlobInfo {
892        BlobInfo { blob_id: blob_id(n), length: 0 }
893    }
894
895    #[fuchsia_async::run_singlethreaded(test)]
896    async fn constructor() {
897        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
898        let client = Client::from_proxy(proxy);
899
900        drop(stream);
901        assert_matches!(client.proxy().sync().await, Err(_));
902    }
903
904    #[fuchsia_async::run_singlethreaded(test)]
905    async fn get_present_package() {
906        let (client, mut server) = MockPackageCache::new();
907
908        let ((), ()) = future::join(
909            async {
910                server
911                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
912                    .await
913                    .finish()
914                    .close_pkg_dir();
915                server.expect_closed().await;
916            },
917            async move {
918                let mut get =
919                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
920
921                assert_matches!(get.open_meta_blob().await.unwrap(), None);
922                assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
923                let pkg_dir = get.finish().await.unwrap();
924
925                assert_matches!(
926                    pkg_dir.into_proxy().take_event_stream().next().await,
927                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
928                );
929            },
930        )
931        .await;
932    }
933
934    #[fuchsia_async::run_singlethreaded(test)]
935    async fn get_present_package_handles_slow_stream_close() {
936        let (client, mut server) = MockPackageCache::new();
937
938        let (send, recv) = futures::channel::oneshot::channel::<()>();
939
940        let ((), ()) = future::join(
941            async {
942                let (needed_blobs_stream, pkg_dir) = server
943                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
944                    .await
945                    .finish_hold_stream_open();
946                pkg_dir.close_pkg_dir();
947
948                // wait until `send` is dropped to drop the request stream.
949                let _ = recv.await;
950                drop(needed_blobs_stream);
951            },
952            async move {
953                let mut get =
954                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
955
956                assert_matches!(get.open_meta_blob().await.unwrap(), None);
957
958                // ensure sending the request doesn't fail, then unblock closing the channel, then
959                // ensure the get_missing_blobs call detects the closed iterator as success instead
960                // of a PEER_CLOSED error.
961                let missing_blobs_stream = get.get_missing_blobs();
962                drop(send);
963                assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
964                let pkg_dir = get.finish().await.unwrap();
965
966                assert_matches!(
967                    pkg_dir.into_proxy().take_event_stream().next().await,
968                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
969                );
970            },
971        )
972        .await;
973    }
974
975    #[fuchsia_async::run_singlethreaded(test)]
976    async fn needed_blobs_open_meta_far() {
977        let (mut get, pending_get) = PendingGet::new().await;
978
979        let ((), ()) = future::join(
980            async {
981                pending_get
982                    .expect_open_meta_blob(Ok(None))
983                    .await
984                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
985                    .await
986                    .expect_open_meta_blob(Ok(None))
987                    .await
988                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
989                    .await
990                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
991                    .await
992                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
993                    .await;
994            },
995            async {
996                {
997                    let opener = get.make_open_meta_blob();
998                    assert_matches!(opener.open().await.unwrap(), None);
999                    assert_matches!(opener.open().await.unwrap(), Some(_));
1000                }
1001                assert_matches!(get.open_meta_blob().await.unwrap(), None);
1002                assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1003                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1004                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1005            },
1006        )
1007        .await;
1008    }
1009
1010    #[fuchsia_async::run_singlethreaded(test)]
1011    async fn needed_blobs_open_content_blob() {
1012        let (mut get, pending_get) = PendingGet::new().await;
1013
1014        let ((), ()) = future::join(
1015            async {
1016                pending_get
1017                    .expect_open_blob(blob_id(2), Ok(None))
1018                    .await
1019                    .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1020                    .await
1021                    .expect_open_blob(blob_id(10), Ok(None))
1022                    .await
1023                    .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1024                    .await
1025                    .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1026                    .await
1027                    .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1028                    .await;
1029            },
1030            async {
1031                {
1032                    let opener = get.make_open_blob(blob_id(2));
1033                    assert_matches!(opener.open().await.unwrap(), None);
1034                    assert_matches!(opener.open().await.unwrap(), Some(_));
1035                }
1036                assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1037                assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1038                assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1039                assert_matches!(
1040                    get.open_blob(blob_id(13),).await,
1041                    Err(OpenBlobError::UnspecifiedIo)
1042                );
1043            },
1044        )
1045        .await;
1046    }
1047
1048    #[fuchsia_async::run_singlethreaded(test)]
1049    async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1050        let (mut get, pending_get) = PendingGet::new().await;
1051        let _ = pending_get.finish();
1052
1053        assert_matches!(get.open_meta_blob().await, Ok(None));
1054        assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1055    }
1056
1057    #[fuchsia_async::run_singlethreaded(test)]
1058    async fn needed_blobs_get_missing_blobs() {
1059        let (mut get, pending_get) = PendingGet::new().await;
1060
1061        let ((), ()) = future::join(
1062            async {
1063                pending_get
1064                    .expect_get_missing_blobs(vec![
1065                        vec![blob_info(1), blob_info(2)],
1066                        vec![blob_info(3)],
1067                    ])
1068                    .await;
1069            },
1070            async {
1071                assert_eq!(
1072                    get.get_missing_blobs().try_concat().await.unwrap(),
1073                    vec![blob_info(1), blob_info(2), blob_info(3)]
1074                );
1075            },
1076        )
1077        .await;
1078    }
1079
1080    #[fuchsia_async::run_singlethreaded(test)]
1081    async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1082        let (mut get, pending_get) = PendingGet::new().await;
1083        drop(pending_get);
1084
1085        assert_matches!(
1086            get.get_missing_blobs().try_concat().await,
1087            Err(ListMissingBlobsError::CallNextOnBlobIterator(
1088                fidl::Error::ClientChannelClosed{status, ..})
1089            )
1090                if status == Status::PEER_CLOSED
1091        );
1092    }
1093
1094    #[fuchsia_async::run_singlethreaded(test)]
1095    async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1096        let (mut get, pending_get) = PendingGet::new().await;
1097
1098        let (_, ()) =
1099            future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1100                assert_matches!(
1101                    get.get_missing_blobs().try_concat().await,
1102                    Err(ListMissingBlobsError::CallNextOnBlobIterator(
1103                        fidl::Error::ClientChannelClosed{status, ..}
1104                    ))
1105                        if status == Status::ADDRESS_IN_USE
1106                );
1107            })
1108            .await;
1109    }
1110
1111    #[cfg(target_os = "fuchsia")]
1112    #[test]
1113    fn needed_blobs_abort() {
1114        use futures::future::Either;
1115        use futures::pin_mut;
1116        use std::task::Poll;
1117
1118        let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1119
1120        let fut = async {
1121            let (get, pending_get) = PendingGet::new().await;
1122
1123            let abort_fut = get.abort().boxed();
1124            let expect_abort_fut = pending_get.expect_abort();
1125            pin_mut!(expect_abort_fut);
1126
1127            match futures::future::select(abort_fut, expect_abort_fut).await {
1128                Either::Left(((), _expect_abort_fut)) => {
1129                    panic!("abort should wait for the get future to complete")
1130                }
1131                Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1132            }
1133        };
1134        pin_mut!(fut);
1135
1136        let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1137            Poll::Pending => panic!("should complete"),
1138            Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1139        };
1140
1141        // NeededBlobs.Abort should wait until PackageCache.Get returns
1142        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1143        pending_get.fail_the_get();
1144        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1145    }
1146
1147    struct MockNeededBlob {
1148        writer: ffxfs::BlobWriterRequestStream,
1149        needed_blobs: fpkg::NeededBlobsRequestStream,
1150        vmo: Option<zx::Vmo>,
1151    }
1152
1153    impl MockNeededBlob {
1154        fn mock_hash() -> BlobId {
1155            [7; 32].into()
1156        }
1157
1158        fn new() -> (NeededBlob, Self) {
1159            let (writer_client, writer) =
1160                fidl::endpoints::create_request_stream::<ffxfs::BlobWriterMarker>();
1161            let (needed_blobs_proxy, needed_blobs) =
1162                fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1163            (
1164                NeededBlob {
1165                    blob: Blob {
1166                        needed_blobs: needed_blobs_proxy,
1167                        blob_id: Self::mock_hash(),
1168                        state: NeedsTruncate(writer_client),
1169                    },
1170                },
1171                Self { writer, needed_blobs, vmo: None },
1172            )
1173        }
1174
1175        async fn fail_get_vmo(mut self) -> Self {
1176            match self.writer.next().await {
1177                Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size: _, responder })) => {
1178                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1179                }
1180                r => panic!("Unexpected request: {r:?}"),
1181            }
1182            self
1183        }
1184
1185        async fn expect_get_vmo(mut self, expected_size: u64) -> Self {
1186            match self.writer.next().await {
1187                Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size, responder })) => {
1188                    assert_eq!(size, expected_size);
1189                    let vmo = zx::Vmo::create(size).unwrap();
1190                    assert!(self.vmo.is_none());
1191                    self.vmo = Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap());
1192                    responder.send(Ok(vmo)).unwrap();
1193                }
1194                r => panic!("Unexpected request: {r:?}"),
1195            }
1196            self
1197        }
1198
1199        async fn fail_bytes_ready(mut self) -> Self {
1200            match self.writer.next().await {
1201                Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written: _, responder })) => {
1202                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1203                }
1204                r => panic!("Unexpected request: {r:?}"),
1205            }
1206            self
1207        }
1208
1209        async fn expect_bytes_ready(mut self, expected_payload: &[u8], offset: u64) -> Self {
1210            match self.writer.next().await {
1211                Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written, responder })) => {
1212                    assert_eq!(bytes_written, u64::try_from(expected_payload.len()).unwrap());
1213
1214                    let vmo = self.vmo.as_ref().unwrap();
1215                    let mut buf = vec![0; expected_payload.len()];
1216                    let () = vmo.read(&mut buf, offset).unwrap();
1217                    assert_eq!(buf, expected_payload);
1218
1219                    responder.send(Ok(())).unwrap();
1220                }
1221                r => panic!("Unexpected request: {r:?}"),
1222            }
1223            self
1224        }
1225
1226        async fn expect_blob_written(mut self) -> Self {
1227            match self.needed_blobs.next().await {
1228                Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1229                    assert_eq!(blob_id, Self::mock_hash().into());
1230                    responder.send(Ok(())).unwrap();
1231                }
1232                r => panic!("Unexpected request: {r:?}"),
1233            }
1234            self
1235        }
1236    }
1237
1238    #[fuchsia_async::run_singlethreaded(test)]
1239    async fn empty_blob_write() {
1240        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1241
1242        let ((), ()) = future::join(
1243            async {
1244                blob_server.expect_get_vmo(0).await.expect_blob_written().await;
1245            },
1246            async {
1247                let blob = match blob.truncate(0).await.unwrap() {
1248                    TruncateBlobSuccess::AllWritten(blob) => blob,
1249                    other => panic!("empty blob shouldn't need bytes {other:?}"),
1250                };
1251                let () = blob.blob_written().await.unwrap();
1252            },
1253        )
1254        .await;
1255    }
1256
1257    impl TruncateBlobSuccess {
1258        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1259            match self {
1260                TruncateBlobSuccess::NeedsData(blob) => blob,
1261                TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1262            }
1263        }
1264    }
1265
1266    impl BlobWriteSuccess {
1267        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1268            match self {
1269                BlobWriteSuccess::NeedsData(blob) => blob,
1270                BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1271            }
1272        }
1273
1274        fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1275            match self {
1276                BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1277                BlobWriteSuccess::AllWritten(blob) => blob,
1278            }
1279        }
1280    }
1281
1282    #[fuchsia_async::run_singlethreaded(test)]
1283    async fn small_blob_write() {
1284        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1285
1286        let ((), ()) = future::join(
1287            async {
1288                blob_server
1289                    .expect_get_vmo(4)
1290                    .await
1291                    .expect_bytes_ready(b"test", 0)
1292                    .await
1293                    .expect_blob_written()
1294                    .await;
1295            },
1296            async {
1297                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1298                let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1299                let () = blob.blob_written().await.unwrap();
1300            },
1301        )
1302        .await;
1303    }
1304
1305    #[fuchsia_async::run_singlethreaded(test)]
1306    async fn blob_truncate_no_space() {
1307        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1308
1309        let ((), ()) = future::join(
1310            async {
1311                blob_server.fail_get_vmo().await;
1312            },
1313            async {
1314                assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1315            },
1316        )
1317        .await;
1318    }
1319
1320    #[fuchsia_async::run_singlethreaded(test)]
1321    async fn blob_write_no_space() {
1322        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1323
1324        let ((), ()) = future::join(
1325            async {
1326                blob_server.expect_get_vmo(4).await.fail_bytes_ready().await;
1327            },
1328            async {
1329                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1330                assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1331            },
1332        )
1333        .await;
1334    }
1335
1336    #[fuchsia_async::run_singlethreaded(test)]
1337    async fn blob_write_multiple_write() {
1338        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1339
1340        let ((), ()) = future::join(
1341            async {
1342                blob_server
1343                    .expect_get_vmo(6)
1344                    .await
1345                    .expect_bytes_ready(b"abc", 0)
1346                    .await
1347                    .expect_bytes_ready(b"123", 3)
1348                    .await
1349                    .expect_blob_written()
1350                    .await;
1351            },
1352            async {
1353                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1354                let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1355                let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1356                let () = blob.blob_written().await.unwrap();
1357            },
1358        )
1359        .await;
1360    }
1361
1362    #[fuchsia_async::run_singlethreaded(test)]
1363    async fn get_already_cached_success() {
1364        let (client, mut server) = MockPackageCache::new();
1365
1366        let ((), ()) = future::join(
1367            async {
1368                server
1369                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1370                    .await
1371                    .finish()
1372                    .close_pkg_dir();
1373                server.expect_closed().await;
1374            },
1375            async move {
1376                let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1377
1378                assert_matches!(
1379                    pkg_dir.into_proxy().take_event_stream().next().await,
1380                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1381                );
1382            },
1383        )
1384        .await;
1385    }
1386
1387    #[fuchsia_async::run_singlethreaded(test)]
1388    async fn get_already_cached_missing_meta_far() {
1389        let (client, mut server) = MockPackageCache::new();
1390
1391        let ((), ()) = future::join(
1392            async {
1393                server
1394                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1395                    .await
1396                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1397                    .await;
1398            },
1399            async move {
1400                assert_matches!(
1401                    client.get_already_cached(blob_id(2)).await,
1402                    Err(GetAlreadyCachedError::MissingMetaFar)
1403                );
1404            },
1405        )
1406        .await;
1407    }
1408
1409    #[fuchsia_async::run_singlethreaded(test)]
1410    async fn get_already_cached_missing_content_blob() {
1411        let (client, mut server) = MockPackageCache::new();
1412
1413        let ((), ()) = future::join(
1414            async {
1415                server
1416                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1417                    .await
1418                    .expect_open_meta_blob(Ok(None))
1419                    .await
1420                    .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1421                        blob_id: [0; 32].into(),
1422                        length: 0,
1423                    }]])
1424                    .await;
1425            },
1426            async move {
1427                assert_matches!(
1428                    client.get_already_cached(blob_id(2)).await,
1429                    Err(GetAlreadyCachedError::MissingContentBlobs(v))
1430                        if v == vec![BlobInfo {
1431                            blob_id: [0; 32].into(),
1432                            length: 0,
1433                        }]
1434                );
1435            },
1436        )
1437        .await;
1438    }
1439}