fidl_fuchsia_pkg_ext/
cache.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8//! Wrapper types for [`fidl_fuchsia_pkg::PackageCacheProxy`] and its related protocols.
9
10use crate::types::{BlobId, BlobInfo};
11use fidl_fuchsia_pkg as fpkg;
12use fuchsia_pkg::PackageDirectory;
13use futures::prelude::*;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use zx_status::Status;
17
18mod storage;
19
20/// An open connection to a provider of the `fuchsia.pkg.PackageCache`.
21#[derive(Debug, Clone)]
22pub struct Client {
23    proxy: fpkg::PackageCacheProxy,
24}
25
26impl Client {
27    /// Constructs a client from the given proxy.
28    pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
29        Self { proxy }
30    }
31
32    /// Returns a reference to the underlying PackageCacheProxy connection.
33    pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
34        &self.proxy
35    }
36
37    /// Opens the package specified by `meta_far_blob` with the intent to fetch any missing blobs
38    /// using the returned [`Get`] type if needed.
39    pub fn get(
40        &self,
41        meta_far_blob: BlobInfo,
42        gc_protection: fpkg::GcProtection,
43    ) -> Result<Get, fidl::Error> {
44        let (needed_blobs, needed_blobs_server_end) =
45            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
46        let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
47
48        let get_fut = self.proxy.get(
49            &meta_far_blob.into(),
50            gc_protection,
51            needed_blobs_server_end,
52            pkg_dir_server_end,
53        );
54
55        Ok(Get {
56            get_fut,
57            pkg_dir,
58            needed_blobs,
59            pkg_present: SharedBoolEvent::new(),
60            meta_far: meta_far_blob.blob_id,
61        })
62    }
63
64    /// Uses PackageCache.Get to obtain the package directory of a package that is already cached
65    /// (all blobs are already in blobfs).
66    /// Errors if the package is not already cached.
67    /// Always uses open package tracking GC protection, because OTA (the only client of Retained
68    /// GC protection), should never need to get an already cached package.
69    ///
70    /// Compared to `get_cached`:
71    ///   * Activates `meta_far_blob` in the dynamic index
72    ///   * Must not be called concurrently with the same `meta_far_blob`
73    pub async fn get_already_cached(
74        &self,
75        meta_far_blob: BlobId,
76    ) -> Result<PackageDirectory, GetAlreadyCachedError> {
77        let mut get = self
78            .get(
79                BlobInfo { blob_id: meta_far_blob, length: 0 },
80                fpkg::GcProtection::OpenPackageTracking,
81            )
82            .map_err(GetAlreadyCachedError::Get)?;
83        if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
84            return Err(GetAlreadyCachedError::MissingMetaFar);
85        }
86
87        if let Some(missing_blobs) = get
88            .get_missing_blobs()
89            .try_next()
90            .await
91            .map_err(GetAlreadyCachedError::GetMissingBlobs)?
92        {
93            return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
94        }
95
96        get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
97    }
98
99    /// Uses PackageCache.GetSubpackage to obtain the package directory of a subpackage.
100    /// Errors if there is not an open connection to the superpackage.
101    pub async fn get_subpackage(
102        &self,
103        superpackage: BlobId,
104        subpackage: &fuchsia_url::RelativePackageUrl,
105    ) -> Result<PackageDirectory, GetSubpackageError> {
106        let (dir, dir_server_end) =
107            PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
108        let () = self
109            .proxy
110            .get_subpackage(
111                &superpackage.into(),
112                &fpkg::PackageUrl { url: subpackage.into() },
113                dir_server_end,
114            )
115            .await
116            .map_err(GetSubpackageError::CallingGetSubpackage)??;
117        Ok(dir)
118    }
119
120    /// Write blobs using the returned [`WriteBlobs`] type.
121    pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
122        let (needed_blobs, needed_blobs_server_end) =
123            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
124
125        let () = self.proxy.write_blobs(needed_blobs_server_end)?;
126
127        Ok(WriteBlobs { needed_blobs })
128    }
129}
130
131#[derive(thiserror::Error, Debug)]
132#[allow(missing_docs)]
133pub enum GetAlreadyCachedError {
134    #[error("calling get")]
135    Get(#[source] fidl::Error),
136
137    #[error("opening meta blob")]
138    OpenMetaBlob(#[source] OpenBlobError),
139
140    #[error("meta.far blob not cached")]
141    MissingMetaFar,
142
143    #[error("getting missing blobs")]
144    GetMissingBlobs(#[source] ListMissingBlobsError),
145
146    #[error("content blobs not cached {0:?}")]
147    MissingContentBlobs(Vec<BlobInfo>),
148
149    #[error("finishing get")]
150    FinishGet(#[source] GetError),
151}
152
153impl GetAlreadyCachedError {
154    /// Returns true if the get failed because the package was not cached.
155    pub fn was_not_cached(&self) -> bool {
156        use GetAlreadyCachedError::*;
157        match self {
158            Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
159            MissingMetaFar | MissingContentBlobs(..) => true,
160        }
161    }
162}
163
164#[derive(thiserror::Error, Debug)]
165#[allow(missing_docs)]
166pub enum GetSubpackageError {
167    #[error("creating handles")]
168    CreatingHandles(#[source] fidl::Error),
169
170    #[error("calling GetCached FIDL")]
171    CallingGetSubpackage(#[source] fidl::Error),
172
173    #[error("the superpackage does not have an open package connection")]
174    SuperpackageClosed,
175
176    #[error("the subpackage does not exist")]
177    DoesNotExist,
178
179    #[error("internal")]
180    Internal,
181}
182
183impl From<fpkg::GetSubpackageError> for GetSubpackageError {
184    fn from(fidl: fpkg::GetSubpackageError) -> Self {
185        use fpkg::GetSubpackageError as fErr;
186        use GetSubpackageError::*;
187        match fidl {
188            fErr::SuperpackageClosed => SuperpackageClosed,
189            fErr::DoesNotExist => DoesNotExist,
190            fErr::Internal => Internal,
191        }
192    }
193}
194
195#[derive(Debug, Clone)]
196struct SharedBoolEvent(Arc<AtomicBool>);
197
198impl SharedBoolEvent {
199    fn new() -> Self {
200        Self(Arc::new(AtomicBool::new(false)))
201    }
202
203    fn get(&self) -> bool {
204        self.0.load(Ordering::SeqCst)
205    }
206
207    fn set(&self) {
208        self.0.store(true, Ordering::SeqCst)
209    }
210}
211
212async fn open_blob(
213    needed_blobs: &fpkg::NeededBlobsProxy,
214    kind: OpenKind,
215    blob_id: BlobId,
216    pkg_present: Option<&SharedBoolEvent>,
217) -> Result<Option<NeededBlob>, OpenBlobError> {
218    let open_fut = match kind {
219        OpenKind::Meta => needed_blobs.open_meta_blob(),
220        OpenKind::Content => needed_blobs.open_blob(&blob_id.into()),
221    };
222    match open_fut.await {
223        Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
224            if let Some(pkg_present) = pkg_present {
225                pkg_present.set();
226            }
227            Ok(None)
228        }
229        res => {
230            if let Some(blob) = res?? {
231                let (writer, closer) = self::storage::into_blob_writer_and_closer(*blob)?;
232                Ok(Some(NeededBlob {
233                    blob: Blob {
234                        writer,
235                        needed_blobs: needed_blobs.clone(),
236                        blob_id,
237                        state: NeedsTruncate,
238                    },
239                    closer: BlobCloser { closer, closed: false },
240                }))
241            } else {
242                Ok(None)
243            }
244        }
245    }
246}
247
248#[derive(Debug, Clone, Copy, PartialEq, Eq)]
249enum OpenKind {
250    Meta,
251    Content,
252}
253
254/// A deferred call to [`Get::open_meta_blob`] or [`Get::open_blob`].
255#[derive(Debug)]
256pub struct DeferredOpenBlob {
257    needed_blobs: fpkg::NeededBlobsProxy,
258    kind: OpenKind,
259    blob_id: BlobId,
260    pkg_present: Option<SharedBoolEvent>,
261}
262
263impl DeferredOpenBlob {
264    /// Opens the blob for write, if it is still needed. The blob's data can be provided using the
265    /// returned NeededBlob.
266    pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
267        open_blob(&self.needed_blobs, self.kind, self.blob_id, self.pkg_present.as_ref()).await
268    }
269
270    fn proxy_cmp_key(&self) -> u32 {
271        use fidl::endpoints::Proxy;
272        use fidl::AsHandleRef;
273        self.needed_blobs.as_channel().raw_handle()
274    }
275}
276
277impl std::cmp::PartialEq for DeferredOpenBlob {
278    fn eq(&self, other: &Self) -> bool {
279        self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
280    }
281}
282
283impl std::cmp::Eq for DeferredOpenBlob {}
284
285/// A pending `fuchsia.pkg/PackageCache.Get()` request. Clients must, in order:
286/// 1. open/write the meta blob, if Some(NeededBlob) is provided by that API
287/// 2. enumerate all missing content blobs
288/// 3. open/write all missing content blobs, if Some(NeededBlob) is provided by that API
289/// 4. finish() to complete the Get() request.
290#[derive(Debug)]
291pub struct Get {
292    get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
293    needed_blobs: fpkg::NeededBlobsProxy,
294    pkg_dir: PackageDirectory,
295    pkg_present: SharedBoolEvent,
296    meta_far: BlobId,
297}
298
299impl Get {
300    /// Returns an independent object that can be used to open the meta blob for write.  See
301    /// [`Self::open_meta_blob`].
302    pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
303        DeferredOpenBlob {
304            needed_blobs: self.needed_blobs.clone(),
305            kind: OpenKind::Meta,
306            blob_id: self.meta_far,
307            pkg_present: Some(self.pkg_present.clone()),
308        }
309    }
310
311    /// Opens the meta blob for write, if it is still needed. The blob's data can be provided using
312    /// the returned NeededBlob.
313    pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
314        open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present)).await
315    }
316
317    fn start_get_missing_blobs(
318        &mut self,
319    ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
320        if self.pkg_present.get() {
321            return Ok(None);
322        }
323
324        let (blob_iterator, blob_iterator_server_end) =
325            fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
326
327        self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
328        Ok(Some(blob_iterator))
329    }
330
331    /// Determines the set of blobs that the caller must open/write to complete this `Get()`
332    /// operation.
333    /// The returned stream will never yield an empty `Vec`.
334    /// Callers should process the missing blobs (via `make_open_blob` or `open_blob`) concurrently
335    /// with reading the stream to guarantee stream termination.
336    pub fn get_missing_blobs(
337        &mut self,
338    ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin {
339        match self.start_get_missing_blobs() {
340            Ok(option_iter) => match option_iter {
341                Some(iterator) => crate::fidl_iterator_to_stream(iterator)
342                    .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
343                    .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
344                    .left_stream(),
345                None => futures::stream::empty().right_stream(),
346            }
347            .left_stream(),
348            Err(e) => {
349                futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
350                    .right_stream()
351            }
352        }
353    }
354
355    /// Returns an independent object that can be used to open the `content_blob` for write.  See
356    /// [`Self::open_blob`].
357    pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
358        DeferredOpenBlob {
359            needed_blobs: self.needed_blobs.clone(),
360            kind: OpenKind::Content,
361            blob_id: content_blob,
362            pkg_present: None,
363        }
364    }
365
366    /// Opens `content_blob` for write, if it is still needed. The blob's data can be provided
367    /// using the returned NeededBlob.
368    pub async fn open_blob(
369        &mut self,
370        content_blob: BlobId,
371    ) -> Result<Option<NeededBlob>, OpenBlobError> {
372        open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None).await
373    }
374
375    /// Notifies the endpoint that all blobs have been written and wait for the response to the
376    /// pending `Get()` request, returning the cached [`PackageDirectory`].
377    pub async fn finish(self) -> Result<PackageDirectory, GetError> {
378        drop(self.needed_blobs);
379        let () = self.get_fut.await?.map_err(Status::from_raw)?;
380        Ok(self.pkg_dir)
381    }
382
383    /// Aborts this caching operation for the package.
384    pub async fn abort(self) {
385        self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
386        // The package is not guaranteed to be removed from the dynamic index after abort
387        // returns, we have to wait until finish returns (to prevent a resolve retry from
388        // racing). The finish call will return an error that just tells us that we called
389        // abort, so we ignore it.
390        let _ = self.get_fut.await;
391    }
392}
393
394/// A pending `fuchsia.pkg/PackageCache.WriteBlob()` request.
395#[derive(Clone, Debug)]
396pub struct WriteBlobs {
397    needed_blobs: fpkg::NeededBlobsProxy,
398}
399
400impl WriteBlobs {
401    /// Returns an independent object that can be used to open the `blob` for write.  See
402    /// [`Self::open_blob`].
403    pub fn make_open_blob(&mut self, blob: BlobId) -> DeferredOpenBlob {
404        DeferredOpenBlob {
405            needed_blobs: self.needed_blobs.clone(),
406            kind: OpenKind::Content,
407            blob_id: blob,
408            pkg_present: None,
409        }
410    }
411
412    /// Opens `blob` for write. The blob's data can be provided using the returned NeededBlob.
413    pub async fn open_blob(&mut self, blob: BlobId) -> Result<Option<NeededBlob>, OpenBlobError> {
414        open_blob(&self.needed_blobs, OpenKind::Content, blob, None).await
415    }
416}
417
418/// A blob that needs to be written.
419#[derive(Debug)]
420pub struct NeededBlob {
421    /// Typestate wrapper around the blob. Clients must first call truncate(), then write() until
422    /// all data is provided.
423    pub blob: Blob<NeedsTruncate>,
424
425    /// Helper object that can close the blob independent of what state `blob` is in.
426    pub closer: BlobCloser,
427}
428
429/// A handle to a blob that must be explicitly closed to prevent future opens of the same blob from
430/// racing with this blob closing.
431#[derive(Debug)]
432#[must_use = "Subsequent opens of this blob may race with closing this one"]
433pub struct BlobCloser {
434    closer: Box<dyn self::storage::Closer>,
435    closed: bool,
436}
437
438impl BlobCloser {
439    /// Close the blob, silently ignoring errors.
440    pub async fn close(mut self) {
441        let () = self.closer.close().await;
442        self.closed = true;
443    }
444}
445
446impl Drop for BlobCloser {
447    fn drop(&mut self) {
448        if !self.closed {
449            let () = self.closer.best_effort_close();
450        }
451    }
452}
453
454/// The successful result of truncating a blob.
455#[derive(Debug)]
456pub enum TruncateBlobSuccess {
457    /// The blob contents need to be written.
458    NeedsData(Blob<NeedsData>),
459
460    /// The blob is fully written (it was the empty blob) and now a
461    /// fuchsia.pkg.NeededBlobs.BlobWritten message should be sent.
462    AllWritten(Blob<NeedsBlobWritten>),
463}
464
465/// The successful result of writing some data to a blob.
466#[derive(Debug)]
467pub enum BlobWriteSuccess {
468    /// There is still more data to write.
469    NeedsData(Blob<NeedsData>),
470
471    /// The blob is fully written and now a fuchsia.pkg.NeededBlobs.BlobWritten
472    /// message should be sent.
473    AllWritten(Blob<NeedsBlobWritten>),
474}
475
476/// State for a blob that can be truncated.
477#[derive(Debug)]
478pub struct NeedsTruncate;
479
480/// State for a blob that can be written to.
481#[derive(Debug)]
482pub struct NeedsData {
483    size: u64,
484    written: u64,
485}
486
487/// State for a blob that has been fully written but that needs a
488/// fuchsia.pkg.NeededBlobs.BlobWritten message sent to pkg-cache.
489#[derive(Debug)]
490pub struct NeedsBlobWritten;
491
492/// A blob in the process of being written.
493#[derive(Debug)]
494#[must_use]
495pub struct Blob<S> {
496    writer: Box<dyn self::storage::Writer>,
497    needed_blobs: fpkg::NeededBlobsProxy,
498    blob_id: BlobId,
499    state: S,
500}
501
502impl Blob<NeedsTruncate> {
503    /// Truncates the blob to the given size. On success, the blob enters the writable state.
504    pub async fn truncate(mut self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
505        let () = self.writer.truncate(size).await?;
506
507        let Self { writer, needed_blobs, blob_id, state: _ } = self;
508
509        Ok(if size == 0 {
510            TruncateBlobSuccess::AllWritten(Blob {
511                writer,
512                needed_blobs,
513                blob_id,
514                state: NeedsBlobWritten,
515            })
516        } else {
517            TruncateBlobSuccess::NeedsData(Blob {
518                writer,
519                needed_blobs,
520                blob_id,
521                state: NeedsData { size, written: 0 },
522            })
523        })
524    }
525}
526
527impl Blob<NeedsData> {
528    /// Writes all of the given buffer to the blob.
529    ///
530    /// # Panics
531    ///
532    /// Panics if a write is attempted with a buf larger than the remaining blob size.
533    pub fn write(
534        self,
535        buf: &[u8],
536    ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
537        self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
538    }
539
540    /// Writes all of the given buffer to the blob.
541    ///
542    /// `after_write` and `after_write_ack` are called before and after, respectively, waiting for
543    /// the server to acknowledge writes.
544    /// They may be called multiple times if the write of `buf` is chunked.
545    /// `after_write` is given the size of each write in bytes.
546    /// Useful for creating trace spans.
547    ///
548    /// # Panics
549    ///
550    /// Panics if a write is attempted with a buf larger than the remaining blob size.
551    pub async fn write_with_trace_callbacks(
552        mut self,
553        buf: &[u8],
554        after_write: &(dyn Fn(u64) + Send + Sync),
555        after_write_ack: &(dyn Fn() + Send + Sync),
556    ) -> Result<BlobWriteSuccess, WriteBlobError> {
557        assert!(self.state.written + buf.len() as u64 <= self.state.size);
558
559        let () = self.writer.write(buf, after_write, after_write_ack).await?;
560        self.state.written += buf.len() as u64;
561
562        if self.state.written == self.state.size {
563            let Self { writer, needed_blobs, blob_id, state: _ } = self;
564            Ok(BlobWriteSuccess::AllWritten(Blob {
565                writer,
566                needed_blobs,
567                blob_id,
568                state: NeedsBlobWritten,
569            }))
570        } else {
571            Ok(BlobWriteSuccess::NeedsData(self))
572        }
573    }
574}
575
576impl Blob<NeedsBlobWritten> {
577    /// Tells pkg-cache that the blob has been successfully written and can now be read.
578    pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
579        Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
580    }
581}
582
583/// An error encountered while opening a package.
584#[derive(Debug, thiserror::Error)]
585#[allow(missing_docs)]
586pub enum OpenError {
587    #[error("the package does not exist")]
588    NotFound,
589
590    #[error("Open() responded with an unexpected status")]
591    UnexpectedResponse(#[source] Status),
592
593    #[error("transport error")]
594    Fidl(#[from] fidl::Error),
595}
596/// An error encountered while caching a package.
597#[derive(Debug, thiserror::Error)]
598#[allow(missing_docs)]
599pub enum GetError {
600    #[error("Get() responded with an unexpected status")]
601    UnexpectedResponse(#[from] Status),
602
603    #[error("transport error")]
604    Fidl(#[from] fidl::Error),
605}
606
607/// An error encountered while opening a metadata or content blob for write.
608#[derive(Debug, thiserror::Error)]
609#[allow(missing_docs)]
610pub enum OpenBlobError {
611    #[error("there is insufficient storage space available to persist this blob")]
612    OutOfSpace,
613
614    #[error("this blob is already open for write by another cache operation")]
615    ConcurrentWrite,
616
617    #[error("an unspecified error occurred during underlying I/O")]
618    UnspecifiedIo,
619
620    #[error("an unspecified error occurred")]
621    Internal,
622
623    #[error("transport error")]
624    Fidl(#[from] fidl::Error),
625}
626
627impl From<fpkg::OpenBlobError> for OpenBlobError {
628    fn from(e: fpkg::OpenBlobError) -> Self {
629        match e {
630            fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
631            fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
632            fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
633            fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
634        }
635    }
636}
637
638/// An error encountered while enumerating missing content blobs.
639#[derive(Debug, thiserror::Error)]
640#[allow(missing_docs)]
641pub enum ListMissingBlobsError {
642    #[error("while obtaining the missing blobs fidl iterator")]
643    CallGetMissingBlobs(#[source] fidl::Error),
644
645    #[error("while obtaining the next chunk of blobs from the fidl iterator")]
646    CallNextOnBlobIterator(#[source] fidl::Error),
647}
648
649/// An error encountered while truncating a blob
650#[derive(Debug, thiserror::Error)]
651#[allow(missing_docs)]
652pub enum TruncateBlobError {
653    #[error("insufficient storage space is available")]
654    NoSpace,
655
656    #[error("Truncate() responded with an unexpected status")]
657    UnexpectedResponse(#[source] Status),
658
659    #[error("transport error")]
660    Fidl(#[from] fidl::Error),
661
662    #[error("already truncated, currently in state {0}")]
663    AlreadyTruncated(&'static str),
664
665    // TODO(https://fxbug.dev/42080352) Add error variants to BlobWriter.
666    #[error("unspecified error")]
667    Other(#[source] anyhow::Error),
668
669    #[error("blob is in an invalid state")]
670    BadState,
671}
672
673/// An error encountered while writing a blob.
674#[derive(Debug, thiserror::Error)]
675#[allow(missing_docs)]
676pub enum WriteBlobError {
677    #[error("file endpoint reported it wrote more bytes than were actually provided to the file endpoint")]
678    Overwrite,
679
680    #[error("the written data was corrupt")]
681    Corrupt,
682
683    #[error("insufficient storage space is available")]
684    NoSpace,
685
686    #[error("Write() responded with an unexpected status")]
687    UnexpectedResponse(#[source] Status),
688
689    #[error("transport error")]
690    Fidl(#[from] fidl::Error),
691
692    #[error("bytes were written but not needed in state {0}")]
693    BytesNotNeeded(&'static str),
694
695    #[error("while using the fxblob writer")]
696    FxBlob(#[source] blob_writer::WriteError),
697}
698
699/// An error encountered while sending the BlobWritten message.
700#[derive(Debug, thiserror::Error)]
701#[allow(missing_docs)]
702pub enum BlobWrittenError {
703    #[error("pkg-cache could not find the blob after it was successfully written")]
704    MissingAfterWritten,
705
706    #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
707    UnopenedBlob,
708
709    #[error("transport error")]
710    Fidl(#[from] fidl::Error),
711}
712
713impl From<fpkg::BlobWrittenError> for BlobWrittenError {
714    fn from(e: fpkg::BlobWrittenError) -> Self {
715        match e {
716            fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
717            fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
718        }
719    }
720}
721
722#[cfg(test)]
723mod tests {
724    use super::*;
725    use assert_matches::assert_matches;
726    use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
727    use fidl_fuchsia_io as fio;
728    use fidl_fuchsia_pkg::{
729        BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
730        PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
731        PackageCacheRequestStream,
732    };
733
734    struct MockPackageCache {
735        stream: PackageCacheRequestStream,
736    }
737
738    impl MockPackageCache {
739        fn new() -> (Client, Self) {
740            let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
741            (Client::from_proxy(proxy), Self { stream })
742        }
743
744        async fn expect_get(
745            &mut self,
746            blob_info: BlobInfo,
747            expected_gc_protection: fpkg::GcProtection,
748        ) -> PendingGet {
749            match self.stream.next().await {
750                Some(Ok(PackageCacheRequest::Get {
751                    meta_far_blob,
752                    gc_protection,
753                    needed_blobs,
754                    dir,
755                    responder,
756                })) => {
757                    assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
758                    assert_eq!(gc_protection, expected_gc_protection);
759                    let needed_blobs = needed_blobs.into_stream();
760                    let dir = dir.into_stream();
761
762                    PendingGet { stream: needed_blobs, dir, responder }
763                }
764                r => panic!("Unexpected request: {:?}", r),
765            }
766        }
767
768        async fn expect_closed(mut self) {
769            assert_matches!(self.stream.next().await, None);
770        }
771    }
772
773    struct PendingGet {
774        stream: NeededBlobsRequestStream,
775        dir: fio::DirectoryRequestStream,
776        responder: PackageCacheGetResponder,
777    }
778
779    impl PendingGet {
780        async fn new() -> (Get, PendingGet) {
781            let (client, mut server) = MockPackageCache::new();
782
783            let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
784            let pending_get =
785                server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
786            (get, pending_get)
787        }
788
789        fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
790            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
791            self.responder.send(Ok(())).unwrap();
792            (self.stream, PackageDirProvider { stream: self.dir })
793        }
794
795        fn finish(self) -> PackageDirProvider {
796            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
797            self.responder.send(Ok(())).unwrap();
798            PackageDirProvider { stream: self.dir }
799        }
800
801        #[cfg(target_os = "fuchsia")]
802        fn fail_the_get(self) {
803            self.responder
804                .send(Err(Status::IO_INVALID.into_raw()))
805                .expect("client should be waiting");
806        }
807
808        async fn expect_open_meta_blob(
809            mut self,
810            res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
811        ) -> Self {
812            match self.stream.next().await {
813                Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
814                    responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
815                }
816                r => panic!("Unexpected request: {:?}", r),
817            }
818            self
819        }
820
821        async fn expect_open_blob(
822            mut self,
823            expected_blob_id: BlobId,
824            res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
825        ) -> Self {
826            match self.stream.next().await {
827                Some(Ok(NeededBlobsRequest::OpenBlob { blob_id, responder })) => {
828                    assert_eq!(BlobId::from(blob_id), expected_blob_id);
829                    responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
830                }
831                r => panic!("Unexpected request: {:?}", r),
832            }
833            self
834        }
835
836        async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
837            match self.stream.next().await {
838                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
839                    let mut stream = iterator.into_stream();
840
841                    // Respond to each next request with the next chunk.
842                    for chunk in response_chunks {
843                        let chunk = chunk
844                            .into_iter()
845                            .map(fidl_fuchsia_pkg::BlobInfo::from)
846                            .collect::<Vec<_>>();
847
848                        let BlobInfoIteratorRequest::Next { responder } =
849                            stream.next().await.unwrap().unwrap();
850                        responder.send(&chunk).unwrap();
851                    }
852
853                    // Then respond with an empty chunk.
854                    let BlobInfoIteratorRequest::Next { responder } =
855                        stream.next().await.unwrap().unwrap();
856                    responder.send(&[]).unwrap();
857
858                    // Expect the client to stop asking.
859                    assert_matches!(stream.next().await, None);
860                }
861                r => panic!("Unexpected request: {:?}", r),
862            }
863            self
864        }
865
866        async fn expect_get_missing_blobs_client_closes_channel(
867            mut self,
868            response_chunks: Vec<Vec<BlobInfo>>,
869        ) -> Self {
870            match self.stream.next().await {
871                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
872                    let mut stream = iterator.into_stream();
873
874                    // Respond to each next request with the next chunk.
875                    for chunk in response_chunks {
876                        let chunk = chunk
877                            .into_iter()
878                            .map(fidl_fuchsia_pkg::BlobInfo::from)
879                            .collect::<Vec<_>>();
880
881                        let BlobInfoIteratorRequest::Next { responder } =
882                            stream.next().await.unwrap().unwrap();
883                        responder.send(&chunk).unwrap();
884                    }
885
886                    // The client closes the channel before we can respond with an empty chunk.
887                    assert_matches!(stream.next().await, None);
888                }
889                r => panic!("Unexpected request: {:?}", r),
890            }
891            self
892        }
893
894        async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
895            match self.stream.next().await {
896                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
897                    iterator
898                        .into_stream_and_control_handle()
899                        .1
900                        .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
901                }
902                r => panic!("Unexpected request: {:?}", r),
903            }
904            self
905        }
906
907        #[cfg(target_os = "fuchsia")]
908        async fn expect_abort(mut self) -> Self {
909            match self.stream.next().await {
910                Some(Ok(NeededBlobsRequest::Abort { responder })) => {
911                    responder.send().unwrap();
912                }
913                r => panic!("Unexpected request: {:?}", r),
914            }
915            self
916        }
917    }
918
919    struct PackageDirProvider {
920        stream: fio::DirectoryRequestStream,
921    }
922
923    impl PackageDirProvider {
924        fn close_pkg_dir(self) {
925            self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
926        }
927    }
928
929    fn blob_id(n: u8) -> BlobId {
930        BlobId::from([n; 32])
931    }
932
933    fn blob_info(n: u8) -> BlobInfo {
934        BlobInfo { blob_id: blob_id(n), length: 0 }
935    }
936
937    #[fuchsia_async::run_singlethreaded(test)]
938    async fn constructor() {
939        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
940        let client = Client::from_proxy(proxy);
941
942        drop(stream);
943        assert_matches!(client.proxy().sync().await, Err(_));
944    }
945
946    #[fuchsia_async::run_singlethreaded(test)]
947    async fn get_present_package() {
948        let (client, mut server) = MockPackageCache::new();
949
950        let ((), ()) = future::join(
951            async {
952                server
953                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
954                    .await
955                    .finish()
956                    .close_pkg_dir();
957                server.expect_closed().await;
958            },
959            async move {
960                let mut get =
961                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
962
963                assert_matches!(get.open_meta_blob().await.unwrap(), None);
964                assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
965                let pkg_dir = get.finish().await.unwrap();
966
967                assert_matches!(
968                    pkg_dir.into_proxy().take_event_stream().next().await,
969                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
970                );
971            },
972        )
973        .await;
974    }
975
976    #[fuchsia_async::run_singlethreaded(test)]
977    async fn get_present_package_handles_slow_stream_close() {
978        let (client, mut server) = MockPackageCache::new();
979
980        let (send, recv) = futures::channel::oneshot::channel::<()>();
981
982        let ((), ()) = future::join(
983            async {
984                let (needed_blobs_stream, pkg_dir) = server
985                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
986                    .await
987                    .finish_hold_stream_open();
988                pkg_dir.close_pkg_dir();
989
990                // wait until `send` is dropped to drop the request stream.
991                let _ = recv.await;
992                drop(needed_blobs_stream);
993            },
994            async move {
995                let mut get =
996                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
997
998                assert_matches!(get.open_meta_blob().await.unwrap(), None);
999
1000                // ensure sending the request doesn't fail, then unblock closing the channel, then
1001                // ensure the get_missing_blobs call detects the closed iterator as success instead
1002                // of a PEER_CLOSED error.
1003                let missing_blobs_stream = get.get_missing_blobs();
1004                drop(send);
1005                assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
1006                let pkg_dir = get.finish().await.unwrap();
1007
1008                assert_matches!(
1009                    pkg_dir.into_proxy().take_event_stream().next().await,
1010                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1011                );
1012            },
1013        )
1014        .await;
1015    }
1016
1017    #[fuchsia_async::run_singlethreaded(test)]
1018    async fn needed_blobs_open_meta_far() {
1019        let (mut get, pending_get) = PendingGet::new().await;
1020
1021        let ((), ()) = future::join(
1022            async {
1023                pending_get
1024                    .expect_open_meta_blob(Ok(None))
1025                    .await
1026                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1027                    .await
1028                    .expect_open_meta_blob(Ok(None))
1029                    .await
1030                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1031                    .await
1032                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
1033                    .await
1034                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1035                    .await;
1036            },
1037            async {
1038                {
1039                    let opener = get.make_open_meta_blob();
1040                    assert_matches!(opener.open().await.unwrap(), None);
1041                    assert_matches!(opener.open().await.unwrap(), Some(_));
1042                }
1043                assert_matches!(get.open_meta_blob().await.unwrap(), None);
1044                assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1045                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1046                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1047            },
1048        )
1049        .await;
1050    }
1051
1052    #[fuchsia_async::run_singlethreaded(test)]
1053    async fn needed_blobs_open_content_blob() {
1054        let (mut get, pending_get) = PendingGet::new().await;
1055
1056        let ((), ()) = future::join(
1057            async {
1058                pending_get
1059                    .expect_open_blob(blob_id(2), Ok(None))
1060                    .await
1061                    .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1062                    .await
1063                    .expect_open_blob(blob_id(10), Ok(None))
1064                    .await
1065                    .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1066                    .await
1067                    .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1068                    .await
1069                    .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1070                    .await;
1071            },
1072            async {
1073                {
1074                    let opener = get.make_open_blob(blob_id(2));
1075                    assert_matches!(opener.open().await.unwrap(), None);
1076                    assert_matches!(opener.open().await.unwrap(), Some(_));
1077                }
1078                assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1079                assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1080                assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1081                assert_matches!(
1082                    get.open_blob(blob_id(13),).await,
1083                    Err(OpenBlobError::UnspecifiedIo)
1084                );
1085            },
1086        )
1087        .await;
1088    }
1089
1090    #[fuchsia_async::run_singlethreaded(test)]
1091    async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1092        let (mut get, pending_get) = PendingGet::new().await;
1093        let _ = pending_get.finish();
1094
1095        assert_matches!(get.open_meta_blob().await, Ok(None));
1096        assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1097    }
1098
1099    #[fuchsia_async::run_singlethreaded(test)]
1100    async fn needed_blobs_get_missing_blobs() {
1101        let (mut get, pending_get) = PendingGet::new().await;
1102
1103        let ((), ()) = future::join(
1104            async {
1105                pending_get
1106                    .expect_get_missing_blobs(vec![
1107                        vec![blob_info(1), blob_info(2)],
1108                        vec![blob_info(3)],
1109                    ])
1110                    .await;
1111            },
1112            async {
1113                assert_eq!(
1114                    get.get_missing_blobs().try_concat().await.unwrap(),
1115                    vec![blob_info(1), blob_info(2), blob_info(3)]
1116                );
1117            },
1118        )
1119        .await;
1120    }
1121
1122    #[fuchsia_async::run_singlethreaded(test)]
1123    async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1124        let (mut get, pending_get) = PendingGet::new().await;
1125        drop(pending_get);
1126
1127        assert_matches!(
1128            get.get_missing_blobs().try_concat().await,
1129            Err(ListMissingBlobsError::CallNextOnBlobIterator(
1130                fidl::Error::ClientChannelClosed{status, ..})
1131            )
1132                if status == Status::PEER_CLOSED
1133        );
1134    }
1135
1136    #[fuchsia_async::run_singlethreaded(test)]
1137    async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1138        let (mut get, pending_get) = PendingGet::new().await;
1139
1140        let (_, ()) =
1141            future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1142                assert_matches!(
1143                    get.get_missing_blobs().try_concat().await,
1144                    Err(ListMissingBlobsError::CallNextOnBlobIterator(
1145                        fidl::Error::ClientChannelClosed{status, ..}
1146                    ))
1147                        if status == Status::ADDRESS_IN_USE
1148                );
1149            })
1150            .await;
1151    }
1152
1153    #[cfg(target_os = "fuchsia")]
1154    #[test]
1155    fn needed_blobs_abort() {
1156        use futures::future::Either;
1157        use futures::pin_mut;
1158        use std::task::Poll;
1159
1160        let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1161
1162        let fut = async {
1163            let (get, pending_get) = PendingGet::new().await;
1164
1165            let abort_fut = get.abort().boxed();
1166            let expect_abort_fut = pending_get.expect_abort();
1167            pin_mut!(expect_abort_fut);
1168
1169            match futures::future::select(abort_fut, expect_abort_fut).await {
1170                Either::Left(((), _expect_abort_fut)) => {
1171                    panic!("abort should wait for the get future to complete")
1172                }
1173                Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1174            }
1175        };
1176        pin_mut!(fut);
1177
1178        let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1179            Poll::Pending => panic!("should complete"),
1180            Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1181        };
1182
1183        // NeededBlobs.Abort should wait until PackageCache.Get returns
1184        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1185        pending_get.fail_the_get();
1186        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1187    }
1188
1189    struct MockNeededBlob {
1190        blob: fio::FileRequestStream,
1191        needed_blobs: fpkg::NeededBlobsRequestStream,
1192    }
1193
1194    impl MockNeededBlob {
1195        fn mock_hash() -> BlobId {
1196            [7; 32].into()
1197        }
1198
1199        fn new() -> (NeededBlob, Self) {
1200            let (blob_proxy, blob) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
1201            let (needed_blobs_proxy, needed_blobs) =
1202                fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1203            (
1204                NeededBlob {
1205                    blob: Blob {
1206                        writer: Box::new(Clone::clone(&blob_proxy)),
1207                        needed_blobs: needed_blobs_proxy,
1208                        blob_id: Self::mock_hash(),
1209                        state: NeedsTruncate,
1210                    },
1211                    closer: BlobCloser { closer: Box::new(blob_proxy), closed: false },
1212                },
1213                Self { blob, needed_blobs },
1214            )
1215        }
1216
1217        async fn fail_truncate(mut self) -> Self {
1218            match self.blob.next().await {
1219                Some(Ok(fio::FileRequest::Resize { length: _, responder })) => {
1220                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1221                }
1222                r => panic!("Unexpected request: {:?}", r),
1223            }
1224            self
1225        }
1226
1227        async fn expect_truncate(mut self, expected_length: u64) -> Self {
1228            match self.blob.next().await {
1229                Some(Ok(fio::FileRequest::Resize { length, responder })) => {
1230                    assert_eq!(length, expected_length);
1231                    responder.send(Ok(())).unwrap();
1232                }
1233                r => panic!("Unexpected request: {:?}", r),
1234            }
1235            self
1236        }
1237
1238        async fn fail_write(mut self) -> Self {
1239            match self.blob.next().await {
1240                Some(Ok(fio::FileRequest::Write { data: _, responder })) => {
1241                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1242                }
1243                r => panic!("Unexpected request: {:?}", r),
1244            }
1245            self
1246        }
1247
1248        async fn expect_write(mut self, expected_payload: &[u8]) -> Self {
1249            match self.blob.next().await {
1250                Some(Ok(fio::FileRequest::Write { data, responder })) => {
1251                    assert_eq!(data, expected_payload);
1252                    responder.send(Ok(data.len() as u64)).unwrap();
1253                }
1254                r => panic!("Unexpected request: {:?}", r),
1255            }
1256            self
1257        }
1258
1259        async fn expect_write_partial(
1260            mut self,
1261            expected_payload: &[u8],
1262            bytes_to_consume: u64,
1263        ) -> Self {
1264            match self.blob.next().await {
1265                Some(Ok(fio::FileRequest::Write { data, responder })) => {
1266                    assert_eq!(data, expected_payload);
1267                    responder.send(Ok(bytes_to_consume)).unwrap();
1268                }
1269                r => panic!("Unexpected request: {:?}", r),
1270            }
1271            self
1272        }
1273
1274        async fn expect_close(mut self) {
1275            match self.blob.next().await {
1276                Some(Ok(fio::FileRequest::Close { responder })) => {
1277                    responder.send(Ok(())).unwrap();
1278                }
1279                r => panic!("Unexpected request: {:?}", r),
1280            }
1281        }
1282
1283        async fn expect_blob_written(mut self) -> Self {
1284            match self.needed_blobs.next().await {
1285                Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1286                    assert_eq!(blob_id, Self::mock_hash().into());
1287                    responder.send(Ok(())).unwrap();
1288                }
1289                r => panic!("Unexpected request: {:?}", r),
1290            }
1291            self
1292        }
1293    }
1294
1295    #[fuchsia_async::run_singlethreaded(test)]
1296    async fn empty_blob_write() {
1297        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1298
1299        let ((), ()) = future::join(
1300            async {
1301                blob_server
1302                    .expect_truncate(0)
1303                    .await
1304                    .expect_blob_written()
1305                    .await
1306                    .expect_close()
1307                    .await;
1308            },
1309            async {
1310                let blob = match blob.truncate(0).await.unwrap() {
1311                    TruncateBlobSuccess::AllWritten(blob) => blob,
1312                    other => panic!("empty blob shouldn't need bytes {other:?}"),
1313                };
1314                let () = blob.blob_written().await.unwrap();
1315                closer.close().await;
1316            },
1317        )
1318        .await;
1319    }
1320
1321    impl TruncateBlobSuccess {
1322        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1323            match self {
1324                TruncateBlobSuccess::NeedsData(blob) => blob,
1325                TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1326            }
1327        }
1328    }
1329
1330    impl BlobWriteSuccess {
1331        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1332            match self {
1333                BlobWriteSuccess::NeedsData(blob) => blob,
1334                BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1335            }
1336        }
1337
1338        fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1339            match self {
1340                BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1341                BlobWriteSuccess::AllWritten(blob) => blob,
1342            }
1343        }
1344    }
1345
1346    #[fuchsia_async::run_singlethreaded(test)]
1347    async fn small_blob_write() {
1348        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1349
1350        let ((), ()) = future::join(
1351            async {
1352                blob_server
1353                    .expect_truncate(4)
1354                    .await
1355                    .expect_write(b"test")
1356                    .await
1357                    .expect_blob_written()
1358                    .await
1359                    .expect_close()
1360                    .await;
1361            },
1362            async {
1363                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1364                let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1365                let () = blob.blob_written().await.unwrap();
1366                closer.close().await;
1367            },
1368        )
1369        .await;
1370    }
1371
1372    #[fuchsia_async::run_singlethreaded(test)]
1373    async fn blob_truncate_no_space() {
1374        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1375
1376        let ((), ()) = future::join(
1377            async {
1378                blob_server.fail_truncate().await;
1379            },
1380            async {
1381                assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1382                closer.close().await;
1383            },
1384        )
1385        .await;
1386    }
1387
1388    #[fuchsia_async::run_singlethreaded(test)]
1389    async fn blob_write_no_space() {
1390        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1391
1392        let ((), ()) = future::join(
1393            async {
1394                blob_server.expect_truncate(4).await.fail_write().await;
1395            },
1396            async {
1397                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1398                assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1399                closer.close().await;
1400            },
1401        )
1402        .await;
1403    }
1404
1405    #[fuchsia_async::run_singlethreaded(test)]
1406    async fn blob_write_server_partial_write() {
1407        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1408
1409        let ((), ()) = future::join(
1410            async {
1411                blob_server
1412                    .expect_truncate(6)
1413                    .await
1414                    .expect_write_partial(b"abc123", 3)
1415                    .await
1416                    .expect_write(b"123")
1417                    .await
1418                    .expect_blob_written()
1419                    .await;
1420            },
1421            async {
1422                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1423                let blob = blob.write(b"abc123").await.unwrap().unwrap_all_written();
1424                let () = blob.blob_written().await.unwrap();
1425                closer.close().await;
1426            },
1427        )
1428        .await;
1429    }
1430
1431    #[fuchsia_async::run_singlethreaded(test)]
1432    async fn blob_write_client_partial_write() {
1433        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1434
1435        let ((), ()) = future::join(
1436            async {
1437                blob_server
1438                    .expect_truncate(6)
1439                    .await
1440                    .expect_write(b"abc")
1441                    .await
1442                    .expect_write(b"123")
1443                    .await
1444                    .expect_blob_written()
1445                    .await;
1446            },
1447            async {
1448                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1449                let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1450                let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1451                let () = blob.blob_written().await.unwrap();
1452                closer.close().await;
1453            },
1454        )
1455        .await;
1456    }
1457
1458    #[fuchsia_async::run_singlethreaded(test)]
1459    async fn blob_write_chunkize_payload() {
1460        const CHUNK_SIZE: usize = fio::MAX_BUF as usize;
1461
1462        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1463
1464        let ((), ()) = future::join(
1465            async {
1466                blob_server
1467                    .expect_truncate(3 * CHUNK_SIZE as u64)
1468                    .await
1469                    .expect_write(&[0u8; CHUNK_SIZE])
1470                    .await
1471                    .expect_write(&[1u8; CHUNK_SIZE])
1472                    .await
1473                    .expect_write(&[2u8; CHUNK_SIZE])
1474                    .await
1475                    .expect_blob_written()
1476                    .await;
1477            },
1478            async {
1479                let payload =
1480                    (0..3).flat_map(|n| std::iter::repeat(n).take(CHUNK_SIZE)).collect::<Vec<u8>>();
1481                let blob = blob.truncate(3 * CHUNK_SIZE as u64).await.unwrap().unwrap_needs_data();
1482                let blob = blob.write(&payload).await.unwrap().unwrap_all_written();
1483                let () = blob.blob_written().await.unwrap();
1484                closer.close().await;
1485            },
1486        )
1487        .await;
1488    }
1489
1490    #[fuchsia_async::run_singlethreaded(test)]
1491    async fn get_already_cached_success() {
1492        let (client, mut server) = MockPackageCache::new();
1493
1494        let ((), ()) = future::join(
1495            async {
1496                server
1497                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1498                    .await
1499                    .finish()
1500                    .close_pkg_dir();
1501                server.expect_closed().await;
1502            },
1503            async move {
1504                let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1505
1506                assert_matches!(
1507                    pkg_dir.into_proxy().take_event_stream().next().await,
1508                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1509                );
1510            },
1511        )
1512        .await;
1513    }
1514
1515    #[fuchsia_async::run_singlethreaded(test)]
1516    async fn get_already_cached_missing_meta_far() {
1517        let (client, mut server) = MockPackageCache::new();
1518
1519        let ((), ()) = future::join(
1520            async {
1521                server
1522                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1523                    .await
1524                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1525                    .await;
1526            },
1527            async move {
1528                assert_matches!(
1529                    client.get_already_cached(blob_id(2)).await,
1530                    Err(GetAlreadyCachedError::MissingMetaFar)
1531                );
1532            },
1533        )
1534        .await;
1535    }
1536
1537    #[fuchsia_async::run_singlethreaded(test)]
1538    async fn get_already_cached_missing_content_blob() {
1539        let (client, mut server) = MockPackageCache::new();
1540
1541        let ((), ()) = future::join(
1542            async {
1543                server
1544                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1545                    .await
1546                    .expect_open_meta_blob(Ok(None))
1547                    .await
1548                    .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1549                        blob_id: [0; 32].into(),
1550                        length: 0,
1551                    }]])
1552                    .await;
1553            },
1554            async move {
1555                assert_matches!(
1556                    client.get_already_cached(blob_id(2)).await,
1557                    Err(GetAlreadyCachedError::MissingContentBlobs(v))
1558                        if v == vec![BlobInfo {
1559                            blob_id: [0; 32].into(),
1560                            length: 0,
1561                        }]
1562                );
1563            },
1564        )
1565        .await;
1566    }
1567}