1#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8use crate::types::{BlobId, BlobInfo};
11use fidl_fuchsia_pkg as fpkg;
12use fuchsia_pkg::PackageDirectory;
13use futures::prelude::*;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use zx_status::Status;
17
18mod storage;
19
20#[derive(Debug, Clone)]
22pub struct Client {
23 proxy: fpkg::PackageCacheProxy,
24}
25
26impl Client {
27 pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
29 Self { proxy }
30 }
31
32 pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
34 &self.proxy
35 }
36
37 pub fn get(
40 &self,
41 meta_far_blob: BlobInfo,
42 gc_protection: fpkg::GcProtection,
43 ) -> Result<Get, fidl::Error> {
44 let (needed_blobs, needed_blobs_server_end) =
45 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
46 let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
47
48 let get_fut = self.proxy.get(
49 &meta_far_blob.into(),
50 gc_protection,
51 needed_blobs_server_end,
52 pkg_dir_server_end,
53 );
54
55 Ok(Get {
56 get_fut,
57 pkg_dir,
58 needed_blobs,
59 pkg_present: SharedBoolEvent::new(),
60 meta_far: meta_far_blob.blob_id,
61 })
62 }
63
64 pub async fn get_already_cached(
74 &self,
75 meta_far_blob: BlobId,
76 ) -> Result<PackageDirectory, GetAlreadyCachedError> {
77 let mut get = self
78 .get(
79 BlobInfo { blob_id: meta_far_blob, length: 0 },
80 fpkg::GcProtection::OpenPackageTracking,
81 )
82 .map_err(GetAlreadyCachedError::Get)?;
83 if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
84 return Err(GetAlreadyCachedError::MissingMetaFar);
85 }
86
87 if let Some(missing_blobs) = get
88 .get_missing_blobs()
89 .try_next()
90 .await
91 .map_err(GetAlreadyCachedError::GetMissingBlobs)?
92 {
93 return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
94 }
95
96 get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
97 }
98
99 pub async fn get_subpackage(
102 &self,
103 superpackage: BlobId,
104 subpackage: &fuchsia_url::RelativePackageUrl,
105 ) -> Result<PackageDirectory, GetSubpackageError> {
106 let (dir, dir_server_end) =
107 PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
108 let () = self
109 .proxy
110 .get_subpackage(
111 &superpackage.into(),
112 &fpkg::PackageUrl { url: subpackage.into() },
113 dir_server_end,
114 )
115 .await
116 .map_err(GetSubpackageError::CallingGetSubpackage)??;
117 Ok(dir)
118 }
119
120 pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
122 let (needed_blobs, needed_blobs_server_end) =
123 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
124
125 let () = self.proxy.write_blobs(needed_blobs_server_end)?;
126
127 Ok(WriteBlobs { needed_blobs })
128 }
129}
130
131#[derive(thiserror::Error, Debug)]
132#[allow(missing_docs)]
133pub enum GetAlreadyCachedError {
134 #[error("calling get")]
135 Get(#[source] fidl::Error),
136
137 #[error("opening meta blob")]
138 OpenMetaBlob(#[source] OpenBlobError),
139
140 #[error("meta.far blob not cached")]
141 MissingMetaFar,
142
143 #[error("getting missing blobs")]
144 GetMissingBlobs(#[source] ListMissingBlobsError),
145
146 #[error("content blobs not cached {0:?}")]
147 MissingContentBlobs(Vec<BlobInfo>),
148
149 #[error("finishing get")]
150 FinishGet(#[source] GetError),
151}
152
153impl GetAlreadyCachedError {
154 pub fn was_not_cached(&self) -> bool {
156 use GetAlreadyCachedError::*;
157 match self {
158 Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
159 MissingMetaFar | MissingContentBlobs(..) => true,
160 }
161 }
162}
163
164#[derive(thiserror::Error, Debug)]
165#[allow(missing_docs)]
166pub enum GetSubpackageError {
167 #[error("creating handles")]
168 CreatingHandles(#[source] fidl::Error),
169
170 #[error("calling GetCached FIDL")]
171 CallingGetSubpackage(#[source] fidl::Error),
172
173 #[error("the superpackage does not have an open package connection")]
174 SuperpackageClosed,
175
176 #[error("the subpackage does not exist")]
177 DoesNotExist,
178
179 #[error("internal")]
180 Internal,
181}
182
183impl From<fpkg::GetSubpackageError> for GetSubpackageError {
184 fn from(fidl: fpkg::GetSubpackageError) -> Self {
185 use fpkg::GetSubpackageError as fErr;
186 use GetSubpackageError::*;
187 match fidl {
188 fErr::SuperpackageClosed => SuperpackageClosed,
189 fErr::DoesNotExist => DoesNotExist,
190 fErr::Internal => Internal,
191 }
192 }
193}
194
195#[derive(Debug, Clone)]
196struct SharedBoolEvent(Arc<AtomicBool>);
197
198impl SharedBoolEvent {
199 fn new() -> Self {
200 Self(Arc::new(AtomicBool::new(false)))
201 }
202
203 fn get(&self) -> bool {
204 self.0.load(Ordering::SeqCst)
205 }
206
207 fn set(&self) {
208 self.0.store(true, Ordering::SeqCst)
209 }
210}
211
212async fn open_blob(
213 needed_blobs: &fpkg::NeededBlobsProxy,
214 kind: OpenKind,
215 blob_id: BlobId,
216 pkg_present: Option<&SharedBoolEvent>,
217) -> Result<Option<NeededBlob>, OpenBlobError> {
218 let open_fut = match kind {
219 OpenKind::Meta => needed_blobs.open_meta_blob(),
220 OpenKind::Content => needed_blobs.open_blob(&blob_id.into()),
221 };
222 match open_fut.await {
223 Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
224 if let Some(pkg_present) = pkg_present {
225 pkg_present.set();
226 }
227 Ok(None)
228 }
229 res => {
230 if let Some(blob) = res?? {
231 let (writer, closer) = self::storage::into_blob_writer_and_closer(*blob)?;
232 Ok(Some(NeededBlob {
233 blob: Blob {
234 writer,
235 needed_blobs: needed_blobs.clone(),
236 blob_id,
237 state: NeedsTruncate,
238 },
239 closer: BlobCloser { closer, closed: false },
240 }))
241 } else {
242 Ok(None)
243 }
244 }
245 }
246}
247
248#[derive(Debug, Clone, Copy, PartialEq, Eq)]
249enum OpenKind {
250 Meta,
251 Content,
252}
253
254#[derive(Debug)]
256pub struct DeferredOpenBlob {
257 needed_blobs: fpkg::NeededBlobsProxy,
258 kind: OpenKind,
259 blob_id: BlobId,
260 pkg_present: Option<SharedBoolEvent>,
261}
262
263impl DeferredOpenBlob {
264 pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
267 open_blob(&self.needed_blobs, self.kind, self.blob_id, self.pkg_present.as_ref()).await
268 }
269
270 fn proxy_cmp_key(&self) -> u32 {
271 use fidl::endpoints::Proxy;
272 use fidl::AsHandleRef;
273 self.needed_blobs.as_channel().raw_handle()
274 }
275}
276
277impl std::cmp::PartialEq for DeferredOpenBlob {
278 fn eq(&self, other: &Self) -> bool {
279 self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
280 }
281}
282
283impl std::cmp::Eq for DeferredOpenBlob {}
284
285#[derive(Debug)]
291pub struct Get {
292 get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
293 needed_blobs: fpkg::NeededBlobsProxy,
294 pkg_dir: PackageDirectory,
295 pkg_present: SharedBoolEvent,
296 meta_far: BlobId,
297}
298
299impl Get {
300 pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
303 DeferredOpenBlob {
304 needed_blobs: self.needed_blobs.clone(),
305 kind: OpenKind::Meta,
306 blob_id: self.meta_far,
307 pkg_present: Some(self.pkg_present.clone()),
308 }
309 }
310
311 pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
314 open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present)).await
315 }
316
317 fn start_get_missing_blobs(
318 &mut self,
319 ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
320 if self.pkg_present.get() {
321 return Ok(None);
322 }
323
324 let (blob_iterator, blob_iterator_server_end) =
325 fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
326
327 self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
328 Ok(Some(blob_iterator))
329 }
330
331 pub fn get_missing_blobs(
337 &mut self,
338 ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin {
339 match self.start_get_missing_blobs() {
340 Ok(option_iter) => match option_iter {
341 Some(iterator) => crate::fidl_iterator_to_stream(iterator)
342 .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
343 .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
344 .left_stream(),
345 None => futures::stream::empty().right_stream(),
346 }
347 .left_stream(),
348 Err(e) => {
349 futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
350 .right_stream()
351 }
352 }
353 }
354
355 pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
358 DeferredOpenBlob {
359 needed_blobs: self.needed_blobs.clone(),
360 kind: OpenKind::Content,
361 blob_id: content_blob,
362 pkg_present: None,
363 }
364 }
365
366 pub async fn open_blob(
369 &mut self,
370 content_blob: BlobId,
371 ) -> Result<Option<NeededBlob>, OpenBlobError> {
372 open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None).await
373 }
374
375 pub async fn finish(self) -> Result<PackageDirectory, GetError> {
378 drop(self.needed_blobs);
379 let () = self.get_fut.await?.map_err(Status::from_raw)?;
380 Ok(self.pkg_dir)
381 }
382
383 pub async fn abort(self) {
385 self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
386 let _ = self.get_fut.await;
391 }
392}
393
394#[derive(Clone, Debug)]
396pub struct WriteBlobs {
397 needed_blobs: fpkg::NeededBlobsProxy,
398}
399
400impl WriteBlobs {
401 pub fn make_open_blob(&mut self, blob: BlobId) -> DeferredOpenBlob {
404 DeferredOpenBlob {
405 needed_blobs: self.needed_blobs.clone(),
406 kind: OpenKind::Content,
407 blob_id: blob,
408 pkg_present: None,
409 }
410 }
411
412 pub async fn open_blob(&mut self, blob: BlobId) -> Result<Option<NeededBlob>, OpenBlobError> {
414 open_blob(&self.needed_blobs, OpenKind::Content, blob, None).await
415 }
416}
417
418#[derive(Debug)]
420pub struct NeededBlob {
421 pub blob: Blob<NeedsTruncate>,
424
425 pub closer: BlobCloser,
427}
428
429#[derive(Debug)]
432#[must_use = "Subsequent opens of this blob may race with closing this one"]
433pub struct BlobCloser {
434 closer: Box<dyn self::storage::Closer>,
435 closed: bool,
436}
437
438impl BlobCloser {
439 pub async fn close(mut self) {
441 let () = self.closer.close().await;
442 self.closed = true;
443 }
444}
445
446impl Drop for BlobCloser {
447 fn drop(&mut self) {
448 if !self.closed {
449 let () = self.closer.best_effort_close();
450 }
451 }
452}
453
454#[derive(Debug)]
456pub enum TruncateBlobSuccess {
457 NeedsData(Blob<NeedsData>),
459
460 AllWritten(Blob<NeedsBlobWritten>),
463}
464
465#[derive(Debug)]
467pub enum BlobWriteSuccess {
468 NeedsData(Blob<NeedsData>),
470
471 AllWritten(Blob<NeedsBlobWritten>),
474}
475
476#[derive(Debug)]
478pub struct NeedsTruncate;
479
480#[derive(Debug)]
482pub struct NeedsData {
483 size: u64,
484 written: u64,
485}
486
487#[derive(Debug)]
490pub struct NeedsBlobWritten;
491
492#[derive(Debug)]
494#[must_use]
495pub struct Blob<S> {
496 writer: Box<dyn self::storage::Writer>,
497 needed_blobs: fpkg::NeededBlobsProxy,
498 blob_id: BlobId,
499 state: S,
500}
501
502impl Blob<NeedsTruncate> {
503 pub async fn truncate(mut self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
505 let () = self.writer.truncate(size).await?;
506
507 let Self { writer, needed_blobs, blob_id, state: _ } = self;
508
509 Ok(if size == 0 {
510 TruncateBlobSuccess::AllWritten(Blob {
511 writer,
512 needed_blobs,
513 blob_id,
514 state: NeedsBlobWritten,
515 })
516 } else {
517 TruncateBlobSuccess::NeedsData(Blob {
518 writer,
519 needed_blobs,
520 blob_id,
521 state: NeedsData { size, written: 0 },
522 })
523 })
524 }
525}
526
527impl Blob<NeedsData> {
528 pub fn write(
534 self,
535 buf: &[u8],
536 ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
537 self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
538 }
539
540 pub async fn write_with_trace_callbacks(
552 mut self,
553 buf: &[u8],
554 after_write: &(dyn Fn(u64) + Send + Sync),
555 after_write_ack: &(dyn Fn() + Send + Sync),
556 ) -> Result<BlobWriteSuccess, WriteBlobError> {
557 assert!(self.state.written + buf.len() as u64 <= self.state.size);
558
559 let () = self.writer.write(buf, after_write, after_write_ack).await?;
560 self.state.written += buf.len() as u64;
561
562 if self.state.written == self.state.size {
563 let Self { writer, needed_blobs, blob_id, state: _ } = self;
564 Ok(BlobWriteSuccess::AllWritten(Blob {
565 writer,
566 needed_blobs,
567 blob_id,
568 state: NeedsBlobWritten,
569 }))
570 } else {
571 Ok(BlobWriteSuccess::NeedsData(self))
572 }
573 }
574}
575
576impl Blob<NeedsBlobWritten> {
577 pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
579 Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
580 }
581}
582
583#[derive(Debug, thiserror::Error)]
585#[allow(missing_docs)]
586pub enum OpenError {
587 #[error("the package does not exist")]
588 NotFound,
589
590 #[error("Open() responded with an unexpected status")]
591 UnexpectedResponse(#[source] Status),
592
593 #[error("transport error")]
594 Fidl(#[from] fidl::Error),
595}
596#[derive(Debug, thiserror::Error)]
598#[allow(missing_docs)]
599pub enum GetError {
600 #[error("Get() responded with an unexpected status")]
601 UnexpectedResponse(#[from] Status),
602
603 #[error("transport error")]
604 Fidl(#[from] fidl::Error),
605}
606
607#[derive(Debug, thiserror::Error)]
609#[allow(missing_docs)]
610pub enum OpenBlobError {
611 #[error("there is insufficient storage space available to persist this blob")]
612 OutOfSpace,
613
614 #[error("this blob is already open for write by another cache operation")]
615 ConcurrentWrite,
616
617 #[error("an unspecified error occurred during underlying I/O")]
618 UnspecifiedIo,
619
620 #[error("an unspecified error occurred")]
621 Internal,
622
623 #[error("transport error")]
624 Fidl(#[from] fidl::Error),
625}
626
627impl From<fpkg::OpenBlobError> for OpenBlobError {
628 fn from(e: fpkg::OpenBlobError) -> Self {
629 match e {
630 fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
631 fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
632 fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
633 fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
634 }
635 }
636}
637
638#[derive(Debug, thiserror::Error)]
640#[allow(missing_docs)]
641pub enum ListMissingBlobsError {
642 #[error("while obtaining the missing blobs fidl iterator")]
643 CallGetMissingBlobs(#[source] fidl::Error),
644
645 #[error("while obtaining the next chunk of blobs from the fidl iterator")]
646 CallNextOnBlobIterator(#[source] fidl::Error),
647}
648
649#[derive(Debug, thiserror::Error)]
651#[allow(missing_docs)]
652pub enum TruncateBlobError {
653 #[error("insufficient storage space is available")]
654 NoSpace,
655
656 #[error("Truncate() responded with an unexpected status")]
657 UnexpectedResponse(#[source] Status),
658
659 #[error("transport error")]
660 Fidl(#[from] fidl::Error),
661
662 #[error("already truncated, currently in state {0}")]
663 AlreadyTruncated(&'static str),
664
665 #[error("unspecified error")]
667 Other(#[source] anyhow::Error),
668
669 #[error("blob is in an invalid state")]
670 BadState,
671}
672
673#[derive(Debug, thiserror::Error)]
675#[allow(missing_docs)]
676pub enum WriteBlobError {
677 #[error("file endpoint reported it wrote more bytes than were actually provided to the file endpoint")]
678 Overwrite,
679
680 #[error("the written data was corrupt")]
681 Corrupt,
682
683 #[error("insufficient storage space is available")]
684 NoSpace,
685
686 #[error("Write() responded with an unexpected status")]
687 UnexpectedResponse(#[source] Status),
688
689 #[error("transport error")]
690 Fidl(#[from] fidl::Error),
691
692 #[error("bytes were written but not needed in state {0}")]
693 BytesNotNeeded(&'static str),
694
695 #[error("while using the fxblob writer")]
696 FxBlob(#[source] blob_writer::WriteError),
697}
698
699#[derive(Debug, thiserror::Error)]
701#[allow(missing_docs)]
702pub enum BlobWrittenError {
703 #[error("pkg-cache could not find the blob after it was successfully written")]
704 MissingAfterWritten,
705
706 #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
707 UnopenedBlob,
708
709 #[error("transport error")]
710 Fidl(#[from] fidl::Error),
711}
712
713impl From<fpkg::BlobWrittenError> for BlobWrittenError {
714 fn from(e: fpkg::BlobWrittenError) -> Self {
715 match e {
716 fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
717 fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
718 }
719 }
720}
721
722#[cfg(test)]
723mod tests {
724 use super::*;
725 use assert_matches::assert_matches;
726 use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
727 use fidl_fuchsia_io as fio;
728 use fidl_fuchsia_pkg::{
729 BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
730 PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
731 PackageCacheRequestStream,
732 };
733
734 struct MockPackageCache {
735 stream: PackageCacheRequestStream,
736 }
737
738 impl MockPackageCache {
739 fn new() -> (Client, Self) {
740 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
741 (Client::from_proxy(proxy), Self { stream })
742 }
743
744 async fn expect_get(
745 &mut self,
746 blob_info: BlobInfo,
747 expected_gc_protection: fpkg::GcProtection,
748 ) -> PendingGet {
749 match self.stream.next().await {
750 Some(Ok(PackageCacheRequest::Get {
751 meta_far_blob,
752 gc_protection,
753 needed_blobs,
754 dir,
755 responder,
756 })) => {
757 assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
758 assert_eq!(gc_protection, expected_gc_protection);
759 let needed_blobs = needed_blobs.into_stream();
760 let dir = dir.into_stream();
761
762 PendingGet { stream: needed_blobs, dir, responder }
763 }
764 r => panic!("Unexpected request: {:?}", r),
765 }
766 }
767
768 async fn expect_closed(mut self) {
769 assert_matches!(self.stream.next().await, None);
770 }
771 }
772
773 struct PendingGet {
774 stream: NeededBlobsRequestStream,
775 dir: fio::DirectoryRequestStream,
776 responder: PackageCacheGetResponder,
777 }
778
779 impl PendingGet {
780 async fn new() -> (Get, PendingGet) {
781 let (client, mut server) = MockPackageCache::new();
782
783 let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
784 let pending_get =
785 server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
786 (get, pending_get)
787 }
788
789 fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
790 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
791 self.responder.send(Ok(())).unwrap();
792 (self.stream, PackageDirProvider { stream: self.dir })
793 }
794
795 fn finish(self) -> PackageDirProvider {
796 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
797 self.responder.send(Ok(())).unwrap();
798 PackageDirProvider { stream: self.dir }
799 }
800
801 #[cfg(target_os = "fuchsia")]
802 fn fail_the_get(self) {
803 self.responder
804 .send(Err(Status::IO_INVALID.into_raw()))
805 .expect("client should be waiting");
806 }
807
808 async fn expect_open_meta_blob(
809 mut self,
810 res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
811 ) -> Self {
812 match self.stream.next().await {
813 Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
814 responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
815 }
816 r => panic!("Unexpected request: {:?}", r),
817 }
818 self
819 }
820
821 async fn expect_open_blob(
822 mut self,
823 expected_blob_id: BlobId,
824 res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
825 ) -> Self {
826 match self.stream.next().await {
827 Some(Ok(NeededBlobsRequest::OpenBlob { blob_id, responder })) => {
828 assert_eq!(BlobId::from(blob_id), expected_blob_id);
829 responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
830 }
831 r => panic!("Unexpected request: {:?}", r),
832 }
833 self
834 }
835
836 async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
837 match self.stream.next().await {
838 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
839 let mut stream = iterator.into_stream();
840
841 for chunk in response_chunks {
843 let chunk = chunk
844 .into_iter()
845 .map(fidl_fuchsia_pkg::BlobInfo::from)
846 .collect::<Vec<_>>();
847
848 let BlobInfoIteratorRequest::Next { responder } =
849 stream.next().await.unwrap().unwrap();
850 responder.send(&chunk).unwrap();
851 }
852
853 let BlobInfoIteratorRequest::Next { responder } =
855 stream.next().await.unwrap().unwrap();
856 responder.send(&[]).unwrap();
857
858 assert_matches!(stream.next().await, None);
860 }
861 r => panic!("Unexpected request: {:?}", r),
862 }
863 self
864 }
865
866 async fn expect_get_missing_blobs_client_closes_channel(
867 mut self,
868 response_chunks: Vec<Vec<BlobInfo>>,
869 ) -> Self {
870 match self.stream.next().await {
871 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
872 let mut stream = iterator.into_stream();
873
874 for chunk in response_chunks {
876 let chunk = chunk
877 .into_iter()
878 .map(fidl_fuchsia_pkg::BlobInfo::from)
879 .collect::<Vec<_>>();
880
881 let BlobInfoIteratorRequest::Next { responder } =
882 stream.next().await.unwrap().unwrap();
883 responder.send(&chunk).unwrap();
884 }
885
886 assert_matches!(stream.next().await, None);
888 }
889 r => panic!("Unexpected request: {:?}", r),
890 }
891 self
892 }
893
894 async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
895 match self.stream.next().await {
896 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
897 iterator
898 .into_stream_and_control_handle()
899 .1
900 .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
901 }
902 r => panic!("Unexpected request: {:?}", r),
903 }
904 self
905 }
906
907 #[cfg(target_os = "fuchsia")]
908 async fn expect_abort(mut self) -> Self {
909 match self.stream.next().await {
910 Some(Ok(NeededBlobsRequest::Abort { responder })) => {
911 responder.send().unwrap();
912 }
913 r => panic!("Unexpected request: {:?}", r),
914 }
915 self
916 }
917 }
918
919 struct PackageDirProvider {
920 stream: fio::DirectoryRequestStream,
921 }
922
923 impl PackageDirProvider {
924 fn close_pkg_dir(self) {
925 self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
926 }
927 }
928
929 fn blob_id(n: u8) -> BlobId {
930 BlobId::from([n; 32])
931 }
932
933 fn blob_info(n: u8) -> BlobInfo {
934 BlobInfo { blob_id: blob_id(n), length: 0 }
935 }
936
937 #[fuchsia_async::run_singlethreaded(test)]
938 async fn constructor() {
939 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
940 let client = Client::from_proxy(proxy);
941
942 drop(stream);
943 assert_matches!(client.proxy().sync().await, Err(_));
944 }
945
946 #[fuchsia_async::run_singlethreaded(test)]
947 async fn get_present_package() {
948 let (client, mut server) = MockPackageCache::new();
949
950 let ((), ()) = future::join(
951 async {
952 server
953 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
954 .await
955 .finish()
956 .close_pkg_dir();
957 server.expect_closed().await;
958 },
959 async move {
960 let mut get =
961 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
962
963 assert_matches!(get.open_meta_blob().await.unwrap(), None);
964 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
965 let pkg_dir = get.finish().await.unwrap();
966
967 assert_matches!(
968 pkg_dir.into_proxy().take_event_stream().next().await,
969 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
970 );
971 },
972 )
973 .await;
974 }
975
976 #[fuchsia_async::run_singlethreaded(test)]
977 async fn get_present_package_handles_slow_stream_close() {
978 let (client, mut server) = MockPackageCache::new();
979
980 let (send, recv) = futures::channel::oneshot::channel::<()>();
981
982 let ((), ()) = future::join(
983 async {
984 let (needed_blobs_stream, pkg_dir) = server
985 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
986 .await
987 .finish_hold_stream_open();
988 pkg_dir.close_pkg_dir();
989
990 let _ = recv.await;
992 drop(needed_blobs_stream);
993 },
994 async move {
995 let mut get =
996 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
997
998 assert_matches!(get.open_meta_blob().await.unwrap(), None);
999
1000 let missing_blobs_stream = get.get_missing_blobs();
1004 drop(send);
1005 assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
1006 let pkg_dir = get.finish().await.unwrap();
1007
1008 assert_matches!(
1009 pkg_dir.into_proxy().take_event_stream().next().await,
1010 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1011 );
1012 },
1013 )
1014 .await;
1015 }
1016
1017 #[fuchsia_async::run_singlethreaded(test)]
1018 async fn needed_blobs_open_meta_far() {
1019 let (mut get, pending_get) = PendingGet::new().await;
1020
1021 let ((), ()) = future::join(
1022 async {
1023 pending_get
1024 .expect_open_meta_blob(Ok(None))
1025 .await
1026 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1027 .await
1028 .expect_open_meta_blob(Ok(None))
1029 .await
1030 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1031 .await
1032 .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
1033 .await
1034 .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1035 .await;
1036 },
1037 async {
1038 {
1039 let opener = get.make_open_meta_blob();
1040 assert_matches!(opener.open().await.unwrap(), None);
1041 assert_matches!(opener.open().await.unwrap(), Some(_));
1042 }
1043 assert_matches!(get.open_meta_blob().await.unwrap(), None);
1044 assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1045 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1046 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1047 },
1048 )
1049 .await;
1050 }
1051
1052 #[fuchsia_async::run_singlethreaded(test)]
1053 async fn needed_blobs_open_content_blob() {
1054 let (mut get, pending_get) = PendingGet::new().await;
1055
1056 let ((), ()) = future::join(
1057 async {
1058 pending_get
1059 .expect_open_blob(blob_id(2), Ok(None))
1060 .await
1061 .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1062 .await
1063 .expect_open_blob(blob_id(10), Ok(None))
1064 .await
1065 .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1066 .await
1067 .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1068 .await
1069 .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1070 .await;
1071 },
1072 async {
1073 {
1074 let opener = get.make_open_blob(blob_id(2));
1075 assert_matches!(opener.open().await.unwrap(), None);
1076 assert_matches!(opener.open().await.unwrap(), Some(_));
1077 }
1078 assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1079 assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1080 assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1081 assert_matches!(
1082 get.open_blob(blob_id(13),).await,
1083 Err(OpenBlobError::UnspecifiedIo)
1084 );
1085 },
1086 )
1087 .await;
1088 }
1089
1090 #[fuchsia_async::run_singlethreaded(test)]
1091 async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1092 let (mut get, pending_get) = PendingGet::new().await;
1093 let _ = pending_get.finish();
1094
1095 assert_matches!(get.open_meta_blob().await, Ok(None));
1096 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1097 }
1098
1099 #[fuchsia_async::run_singlethreaded(test)]
1100 async fn needed_blobs_get_missing_blobs() {
1101 let (mut get, pending_get) = PendingGet::new().await;
1102
1103 let ((), ()) = future::join(
1104 async {
1105 pending_get
1106 .expect_get_missing_blobs(vec![
1107 vec![blob_info(1), blob_info(2)],
1108 vec![blob_info(3)],
1109 ])
1110 .await;
1111 },
1112 async {
1113 assert_eq!(
1114 get.get_missing_blobs().try_concat().await.unwrap(),
1115 vec![blob_info(1), blob_info(2), blob_info(3)]
1116 );
1117 },
1118 )
1119 .await;
1120 }
1121
1122 #[fuchsia_async::run_singlethreaded(test)]
1123 async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1124 let (mut get, pending_get) = PendingGet::new().await;
1125 drop(pending_get);
1126
1127 assert_matches!(
1128 get.get_missing_blobs().try_concat().await,
1129 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1130 fidl::Error::ClientChannelClosed{status, ..})
1131 )
1132 if status == Status::PEER_CLOSED
1133 );
1134 }
1135
1136 #[fuchsia_async::run_singlethreaded(test)]
1137 async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1138 let (mut get, pending_get) = PendingGet::new().await;
1139
1140 let (_, ()) =
1141 future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1142 assert_matches!(
1143 get.get_missing_blobs().try_concat().await,
1144 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1145 fidl::Error::ClientChannelClosed{status, ..}
1146 ))
1147 if status == Status::ADDRESS_IN_USE
1148 );
1149 })
1150 .await;
1151 }
1152
1153 #[cfg(target_os = "fuchsia")]
1154 #[test]
1155 fn needed_blobs_abort() {
1156 use futures::future::Either;
1157 use futures::pin_mut;
1158 use std::task::Poll;
1159
1160 let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1161
1162 let fut = async {
1163 let (get, pending_get) = PendingGet::new().await;
1164
1165 let abort_fut = get.abort().boxed();
1166 let expect_abort_fut = pending_get.expect_abort();
1167 pin_mut!(expect_abort_fut);
1168
1169 match futures::future::select(abort_fut, expect_abort_fut).await {
1170 Either::Left(((), _expect_abort_fut)) => {
1171 panic!("abort should wait for the get future to complete")
1172 }
1173 Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1174 }
1175 };
1176 pin_mut!(fut);
1177
1178 let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1179 Poll::Pending => panic!("should complete"),
1180 Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1181 };
1182
1183 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1185 pending_get.fail_the_get();
1186 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1187 }
1188
1189 struct MockNeededBlob {
1190 blob: fio::FileRequestStream,
1191 needed_blobs: fpkg::NeededBlobsRequestStream,
1192 }
1193
1194 impl MockNeededBlob {
1195 fn mock_hash() -> BlobId {
1196 [7; 32].into()
1197 }
1198
1199 fn new() -> (NeededBlob, Self) {
1200 let (blob_proxy, blob) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
1201 let (needed_blobs_proxy, needed_blobs) =
1202 fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1203 (
1204 NeededBlob {
1205 blob: Blob {
1206 writer: Box::new(Clone::clone(&blob_proxy)),
1207 needed_blobs: needed_blobs_proxy,
1208 blob_id: Self::mock_hash(),
1209 state: NeedsTruncate,
1210 },
1211 closer: BlobCloser { closer: Box::new(blob_proxy), closed: false },
1212 },
1213 Self { blob, needed_blobs },
1214 )
1215 }
1216
1217 async fn fail_truncate(mut self) -> Self {
1218 match self.blob.next().await {
1219 Some(Ok(fio::FileRequest::Resize { length: _, responder })) => {
1220 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1221 }
1222 r => panic!("Unexpected request: {:?}", r),
1223 }
1224 self
1225 }
1226
1227 async fn expect_truncate(mut self, expected_length: u64) -> Self {
1228 match self.blob.next().await {
1229 Some(Ok(fio::FileRequest::Resize { length, responder })) => {
1230 assert_eq!(length, expected_length);
1231 responder.send(Ok(())).unwrap();
1232 }
1233 r => panic!("Unexpected request: {:?}", r),
1234 }
1235 self
1236 }
1237
1238 async fn fail_write(mut self) -> Self {
1239 match self.blob.next().await {
1240 Some(Ok(fio::FileRequest::Write { data: _, responder })) => {
1241 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1242 }
1243 r => panic!("Unexpected request: {:?}", r),
1244 }
1245 self
1246 }
1247
1248 async fn expect_write(mut self, expected_payload: &[u8]) -> Self {
1249 match self.blob.next().await {
1250 Some(Ok(fio::FileRequest::Write { data, responder })) => {
1251 assert_eq!(data, expected_payload);
1252 responder.send(Ok(data.len() as u64)).unwrap();
1253 }
1254 r => panic!("Unexpected request: {:?}", r),
1255 }
1256 self
1257 }
1258
1259 async fn expect_write_partial(
1260 mut self,
1261 expected_payload: &[u8],
1262 bytes_to_consume: u64,
1263 ) -> Self {
1264 match self.blob.next().await {
1265 Some(Ok(fio::FileRequest::Write { data, responder })) => {
1266 assert_eq!(data, expected_payload);
1267 responder.send(Ok(bytes_to_consume)).unwrap();
1268 }
1269 r => panic!("Unexpected request: {:?}", r),
1270 }
1271 self
1272 }
1273
1274 async fn expect_close(mut self) {
1275 match self.blob.next().await {
1276 Some(Ok(fio::FileRequest::Close { responder })) => {
1277 responder.send(Ok(())).unwrap();
1278 }
1279 r => panic!("Unexpected request: {:?}", r),
1280 }
1281 }
1282
1283 async fn expect_blob_written(mut self) -> Self {
1284 match self.needed_blobs.next().await {
1285 Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1286 assert_eq!(blob_id, Self::mock_hash().into());
1287 responder.send(Ok(())).unwrap();
1288 }
1289 r => panic!("Unexpected request: {:?}", r),
1290 }
1291 self
1292 }
1293 }
1294
1295 #[fuchsia_async::run_singlethreaded(test)]
1296 async fn empty_blob_write() {
1297 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1298
1299 let ((), ()) = future::join(
1300 async {
1301 blob_server
1302 .expect_truncate(0)
1303 .await
1304 .expect_blob_written()
1305 .await
1306 .expect_close()
1307 .await;
1308 },
1309 async {
1310 let blob = match blob.truncate(0).await.unwrap() {
1311 TruncateBlobSuccess::AllWritten(blob) => blob,
1312 other => panic!("empty blob shouldn't need bytes {other:?}"),
1313 };
1314 let () = blob.blob_written().await.unwrap();
1315 closer.close().await;
1316 },
1317 )
1318 .await;
1319 }
1320
1321 impl TruncateBlobSuccess {
1322 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1323 match self {
1324 TruncateBlobSuccess::NeedsData(blob) => blob,
1325 TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1326 }
1327 }
1328 }
1329
1330 impl BlobWriteSuccess {
1331 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1332 match self {
1333 BlobWriteSuccess::NeedsData(blob) => blob,
1334 BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1335 }
1336 }
1337
1338 fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1339 match self {
1340 BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1341 BlobWriteSuccess::AllWritten(blob) => blob,
1342 }
1343 }
1344 }
1345
1346 #[fuchsia_async::run_singlethreaded(test)]
1347 async fn small_blob_write() {
1348 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1349
1350 let ((), ()) = future::join(
1351 async {
1352 blob_server
1353 .expect_truncate(4)
1354 .await
1355 .expect_write(b"test")
1356 .await
1357 .expect_blob_written()
1358 .await
1359 .expect_close()
1360 .await;
1361 },
1362 async {
1363 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1364 let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1365 let () = blob.blob_written().await.unwrap();
1366 closer.close().await;
1367 },
1368 )
1369 .await;
1370 }
1371
1372 #[fuchsia_async::run_singlethreaded(test)]
1373 async fn blob_truncate_no_space() {
1374 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1375
1376 let ((), ()) = future::join(
1377 async {
1378 blob_server.fail_truncate().await;
1379 },
1380 async {
1381 assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1382 closer.close().await;
1383 },
1384 )
1385 .await;
1386 }
1387
1388 #[fuchsia_async::run_singlethreaded(test)]
1389 async fn blob_write_no_space() {
1390 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1391
1392 let ((), ()) = future::join(
1393 async {
1394 blob_server.expect_truncate(4).await.fail_write().await;
1395 },
1396 async {
1397 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1398 assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1399 closer.close().await;
1400 },
1401 )
1402 .await;
1403 }
1404
1405 #[fuchsia_async::run_singlethreaded(test)]
1406 async fn blob_write_server_partial_write() {
1407 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1408
1409 let ((), ()) = future::join(
1410 async {
1411 blob_server
1412 .expect_truncate(6)
1413 .await
1414 .expect_write_partial(b"abc123", 3)
1415 .await
1416 .expect_write(b"123")
1417 .await
1418 .expect_blob_written()
1419 .await;
1420 },
1421 async {
1422 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1423 let blob = blob.write(b"abc123").await.unwrap().unwrap_all_written();
1424 let () = blob.blob_written().await.unwrap();
1425 closer.close().await;
1426 },
1427 )
1428 .await;
1429 }
1430
1431 #[fuchsia_async::run_singlethreaded(test)]
1432 async fn blob_write_client_partial_write() {
1433 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1434
1435 let ((), ()) = future::join(
1436 async {
1437 blob_server
1438 .expect_truncate(6)
1439 .await
1440 .expect_write(b"abc")
1441 .await
1442 .expect_write(b"123")
1443 .await
1444 .expect_blob_written()
1445 .await;
1446 },
1447 async {
1448 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1449 let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1450 let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1451 let () = blob.blob_written().await.unwrap();
1452 closer.close().await;
1453 },
1454 )
1455 .await;
1456 }
1457
1458 #[fuchsia_async::run_singlethreaded(test)]
1459 async fn blob_write_chunkize_payload() {
1460 const CHUNK_SIZE: usize = fio::MAX_BUF as usize;
1461
1462 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1463
1464 let ((), ()) = future::join(
1465 async {
1466 blob_server
1467 .expect_truncate(3 * CHUNK_SIZE as u64)
1468 .await
1469 .expect_write(&[0u8; CHUNK_SIZE])
1470 .await
1471 .expect_write(&[1u8; CHUNK_SIZE])
1472 .await
1473 .expect_write(&[2u8; CHUNK_SIZE])
1474 .await
1475 .expect_blob_written()
1476 .await;
1477 },
1478 async {
1479 let payload =
1480 (0..3).flat_map(|n| std::iter::repeat(n).take(CHUNK_SIZE)).collect::<Vec<u8>>();
1481 let blob = blob.truncate(3 * CHUNK_SIZE as u64).await.unwrap().unwrap_needs_data();
1482 let blob = blob.write(&payload).await.unwrap().unwrap_all_written();
1483 let () = blob.blob_written().await.unwrap();
1484 closer.close().await;
1485 },
1486 )
1487 .await;
1488 }
1489
1490 #[fuchsia_async::run_singlethreaded(test)]
1491 async fn get_already_cached_success() {
1492 let (client, mut server) = MockPackageCache::new();
1493
1494 let ((), ()) = future::join(
1495 async {
1496 server
1497 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1498 .await
1499 .finish()
1500 .close_pkg_dir();
1501 server.expect_closed().await;
1502 },
1503 async move {
1504 let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1505
1506 assert_matches!(
1507 pkg_dir.into_proxy().take_event_stream().next().await,
1508 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1509 );
1510 },
1511 )
1512 .await;
1513 }
1514
1515 #[fuchsia_async::run_singlethreaded(test)]
1516 async fn get_already_cached_missing_meta_far() {
1517 let (client, mut server) = MockPackageCache::new();
1518
1519 let ((), ()) = future::join(
1520 async {
1521 server
1522 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1523 .await
1524 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1525 .await;
1526 },
1527 async move {
1528 assert_matches!(
1529 client.get_already_cached(blob_id(2)).await,
1530 Err(GetAlreadyCachedError::MissingMetaFar)
1531 );
1532 },
1533 )
1534 .await;
1535 }
1536
1537 #[fuchsia_async::run_singlethreaded(test)]
1538 async fn get_already_cached_missing_content_blob() {
1539 let (client, mut server) = MockPackageCache::new();
1540
1541 let ((), ()) = future::join(
1542 async {
1543 server
1544 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1545 .await
1546 .expect_open_meta_blob(Ok(None))
1547 .await
1548 .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1549 blob_id: [0; 32].into(),
1550 length: 0,
1551 }]])
1552 .await;
1553 },
1554 async move {
1555 assert_matches!(
1556 client.get_already_cached(blob_id(2)).await,
1557 Err(GetAlreadyCachedError::MissingContentBlobs(v))
1558 if v == vec![BlobInfo {
1559 blob_id: [0; 32].into(),
1560 length: 0,
1561 }]
1562 );
1563 },
1564 )
1565 .await;
1566 }
1567}