1#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8use crate::types::{BlobId, BlobInfo};
11use fuchsia_pkg::PackageDirectory;
12use futures::prelude::*;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use zx_status::Status;
16use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_pkg as fpkg};
17
18#[derive(Debug, Clone)]
20pub struct Client {
21 proxy: fpkg::PackageCacheProxy,
22}
23
24impl Client {
25 pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
27 Self { proxy }
28 }
29
30 pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
32 &self.proxy
33 }
34
35 pub fn get(
38 &self,
39 meta_far_blob: BlobInfo,
40 gc_protection: fpkg::GcProtection,
41 ) -> Result<Get, fidl::Error> {
42 let (needed_blobs, needed_blobs_server_end) =
43 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
44 let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
45
46 let get_fut = self.proxy.get(
47 &meta_far_blob.into(),
48 gc_protection,
49 needed_blobs_server_end,
50 pkg_dir_server_end,
51 );
52
53 Ok(Get {
54 get_fut,
55 pkg_dir,
56 needed_blobs,
57 pkg_present: SharedBoolEvent::new(),
58 meta_far: meta_far_blob.blob_id,
59 })
60 }
61
62 pub async fn get_already_cached(
72 &self,
73 meta_far_blob: BlobId,
74 ) -> Result<PackageDirectory, GetAlreadyCachedError> {
75 let mut get = self
76 .get(
77 BlobInfo { blob_id: meta_far_blob, length: 0 },
78 fpkg::GcProtection::OpenPackageTracking,
79 )
80 .map_err(GetAlreadyCachedError::Get)?;
81 if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
82 return Err(GetAlreadyCachedError::MissingMetaFar);
83 }
84
85 if let Some(missing_blobs) = get
86 .get_missing_blobs()
87 .try_next()
88 .await
89 .map_err(GetAlreadyCachedError::GetMissingBlobs)?
90 {
91 return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
92 }
93
94 get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
95 }
96
97 pub async fn get_subpackage(
100 &self,
101 superpackage: BlobId,
102 subpackage: &fuchsia_url::RelativePackageUrl,
103 ) -> Result<PackageDirectory, GetSubpackageError> {
104 let (dir, dir_server_end) =
105 PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
106 let () = self
107 .proxy
108 .get_subpackage(
109 &superpackage.into(),
110 &fpkg::PackageUrl { url: subpackage.into() },
111 dir_server_end,
112 )
113 .await
114 .map_err(GetSubpackageError::CallingGetSubpackage)??;
115 Ok(dir)
116 }
117
118 pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
120 let (needed_blobs, needed_blobs_server_end) =
121 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
122
123 let () = self.proxy.write_blobs(needed_blobs_server_end)?;
124
125 Ok(WriteBlobs { needed_blobs })
126 }
127}
128
129#[derive(thiserror::Error, Debug)]
130#[allow(missing_docs)]
131pub enum GetAlreadyCachedError {
132 #[error("calling get")]
133 Get(#[source] fidl::Error),
134
135 #[error("opening meta blob")]
136 OpenMetaBlob(#[source] OpenBlobError),
137
138 #[error("meta.far blob not cached")]
139 MissingMetaFar,
140
141 #[error("getting missing blobs")]
142 GetMissingBlobs(#[source] ListMissingBlobsError),
143
144 #[error("content blobs not cached {0:?}")]
145 MissingContentBlobs(Vec<BlobInfo>),
146
147 #[error("finishing get")]
148 FinishGet(#[source] GetError),
149}
150
151impl GetAlreadyCachedError {
152 pub fn was_not_cached(&self) -> bool {
154 use GetAlreadyCachedError::*;
155 match self {
156 Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
157 MissingMetaFar | MissingContentBlobs(..) => true,
158 }
159 }
160}
161
162#[derive(thiserror::Error, Debug)]
163#[allow(missing_docs)]
164pub enum GetSubpackageError {
165 #[error("creating handles")]
166 CreatingHandles(#[source] fidl::Error),
167
168 #[error("calling GetCached FIDL")]
169 CallingGetSubpackage(#[source] fidl::Error),
170
171 #[error("the superpackage does not have an open package connection")]
172 SuperpackageClosed,
173
174 #[error("the subpackage does not exist")]
175 DoesNotExist,
176
177 #[error("internal")]
178 Internal,
179}
180
181impl From<fpkg::GetSubpackageError> for GetSubpackageError {
182 fn from(fidl: fpkg::GetSubpackageError) -> Self {
183 use GetSubpackageError::*;
184 use fpkg::GetSubpackageError as fErr;
185 match fidl {
186 fErr::SuperpackageClosed => SuperpackageClosed,
187 fErr::DoesNotExist => DoesNotExist,
188 fErr::Internal => Internal,
189 }
190 }
191}
192
193#[derive(Debug, Clone)]
194struct SharedBoolEvent(Arc<AtomicBool>);
195
196impl SharedBoolEvent {
197 fn new() -> Self {
198 Self(Arc::new(AtomicBool::new(false)))
199 }
200
201 fn get(&self) -> bool {
202 self.0.load(Ordering::SeqCst)
203 }
204
205 fn set(&self) {
206 self.0.store(true, Ordering::SeqCst)
207 }
208}
209
210async fn open_blob(
211 needed_blobs: &fpkg::NeededBlobsProxy,
212 kind: OpenKind,
213 blob_id: BlobId,
214 pkg_present: Option<&SharedBoolEvent>,
215) -> Result<Option<NeededBlob>, OpenBlobError> {
216 let open_fut = match kind {
217 OpenKind::Meta => needed_blobs.open_meta_blob(),
218 OpenKind::Content => needed_blobs.open_blob(&blob_id.into()),
219 };
220 match open_fut.await {
221 Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
222 if let Some(pkg_present) = pkg_present {
223 pkg_present.set();
224 }
225 Ok(None)
226 }
227 res => {
228 if let Some(blob) = res?? {
229 Ok(Some(NeededBlob {
230 blob: Blob {
231 needed_blobs: needed_blobs.clone(),
232 blob_id,
233 state: NeedsTruncate(blob),
234 },
235 }))
236 } else {
237 Ok(None)
238 }
239 }
240 }
241}
242
243#[derive(Debug, Clone, Copy, PartialEq, Eq)]
244enum OpenKind {
245 Meta,
246 Content,
247}
248
249#[derive(Debug)]
251pub struct DeferredOpenBlob {
252 needed_blobs: fpkg::NeededBlobsProxy,
253 kind: OpenKind,
254 blob_id: BlobId,
255 pkg_present: Option<SharedBoolEvent>,
256}
257
258impl DeferredOpenBlob {
259 pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
262 open_blob(&self.needed_blobs, self.kind, self.blob_id, self.pkg_present.as_ref()).await
263 }
264
265 fn proxy_cmp_key(&self) -> u32 {
266 use fidl::AsHandleRef;
267 use fidl::endpoints::Proxy;
268 self.needed_blobs.as_channel().raw_handle()
269 }
270}
271
272impl std::cmp::PartialEq for DeferredOpenBlob {
273 fn eq(&self, other: &Self) -> bool {
274 self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
275 }
276}
277
278impl std::cmp::Eq for DeferredOpenBlob {}
279
280#[derive(Debug)]
286pub struct Get {
287 get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
288 needed_blobs: fpkg::NeededBlobsProxy,
289 pkg_dir: PackageDirectory,
290 pkg_present: SharedBoolEvent,
291 meta_far: BlobId,
292}
293
294impl Get {
295 pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
298 DeferredOpenBlob {
299 needed_blobs: self.needed_blobs.clone(),
300 kind: OpenKind::Meta,
301 blob_id: self.meta_far,
302 pkg_present: Some(self.pkg_present.clone()),
303 }
304 }
305
306 pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
309 open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present)).await
310 }
311
312 fn start_get_missing_blobs(
313 &mut self,
314 ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
315 if self.pkg_present.get() {
316 return Ok(None);
317 }
318
319 let (blob_iterator, blob_iterator_server_end) =
320 fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
321
322 self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
323 Ok(Some(blob_iterator))
324 }
325
326 pub fn get_missing_blobs(
332 &mut self,
333 ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin {
334 match self.start_get_missing_blobs() {
335 Ok(option_iter) => match option_iter {
336 Some(iterator) => crate::fidl_iterator_to_stream(iterator)
337 .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
338 .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
339 .left_stream(),
340 None => futures::stream::empty().right_stream(),
341 }
342 .left_stream(),
343 Err(e) => {
344 futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
345 .right_stream()
346 }
347 }
348 }
349
350 pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
353 DeferredOpenBlob {
354 needed_blobs: self.needed_blobs.clone(),
355 kind: OpenKind::Content,
356 blob_id: content_blob,
357 pkg_present: None,
358 }
359 }
360
361 pub async fn open_blob(
364 &mut self,
365 content_blob: BlobId,
366 ) -> Result<Option<NeededBlob>, OpenBlobError> {
367 open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None).await
368 }
369
370 pub async fn finish(self) -> Result<PackageDirectory, GetError> {
373 drop(self.needed_blobs);
374 let () = self.get_fut.await?.map_err(Status::from_raw)?;
375 Ok(self.pkg_dir)
376 }
377
378 pub async fn abort(self) {
380 self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
381 let _ = self.get_fut.await;
386 }
387}
388
389#[derive(Clone, Debug)]
391pub struct WriteBlobs {
392 needed_blobs: fpkg::NeededBlobsProxy,
393}
394
395impl WriteBlobs {
396 pub fn make_open_blob(&mut self, blob: BlobId) -> DeferredOpenBlob {
399 DeferredOpenBlob {
400 needed_blobs: self.needed_blobs.clone(),
401 kind: OpenKind::Content,
402 blob_id: blob,
403 pkg_present: None,
404 }
405 }
406
407 pub async fn open_blob(&mut self, blob: BlobId) -> Result<Option<NeededBlob>, OpenBlobError> {
409 open_blob(&self.needed_blobs, OpenKind::Content, blob, None).await
410 }
411}
412
413#[derive(Debug)]
415pub struct NeededBlob {
416 pub blob: Blob<NeedsTruncate>,
419}
420
421#[derive(Debug)]
423pub enum TruncateBlobSuccess {
424 NeedsData(Blob<NeedsData>),
426
427 AllWritten(Blob<NeedsBlobWritten>),
430}
431
432#[derive(Debug)]
434pub enum BlobWriteSuccess {
435 NeedsData(Blob<NeedsData>),
437
438 AllWritten(Blob<NeedsBlobWritten>),
441}
442
443#[derive(Debug)]
445pub struct NeedsTruncate(fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>);
446
447#[derive(Debug)]
449pub struct NeedsData {
450 size: u64,
451 written: u64,
452 writer: blob_writer::BlobWriter,
453}
454
455#[derive(Debug)]
458pub struct NeedsBlobWritten;
459
460#[derive(Debug)]
462#[must_use]
463pub struct Blob<S> {
464 needed_blobs: fpkg::NeededBlobsProxy,
465 blob_id: BlobId,
466 state: S,
467}
468
469impl Blob<NeedsTruncate> {
470 pub async fn truncate(self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
472 let Self { needed_blobs, blob_id, state: NeedsTruncate(blob) } = self;
473
474 let writer = blob_writer::BlobWriter::create(blob.into_proxy(), size).await.map_err(
475 |e| match e {
476 blob_writer::CreateError::GetVmo(s) if s == Status::NO_SPACE => {
477 TruncateBlobError::NoSpace
478 }
479 _ => TruncateBlobError::CreateBlobWriter(e),
480 },
481 )?;
482
483 Ok(if size == 0 {
484 TruncateBlobSuccess::AllWritten(Blob { needed_blobs, blob_id, state: NeedsBlobWritten })
485 } else {
486 TruncateBlobSuccess::NeedsData(Blob {
487 needed_blobs,
488 blob_id,
489 state: NeedsData { size, written: 0, writer },
490 })
491 })
492 }
493}
494
495impl Blob<NeedsData> {
496 pub fn write(
502 self,
503 buf: &[u8],
504 ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
505 self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
506 }
507
508 pub async fn write_with_trace_callbacks(
520 mut self,
521 buf: &[u8],
522 after_write: &(dyn Fn(u64) + Send + Sync),
523 after_write_ack: &(dyn Fn() + Send + Sync),
524 ) -> Result<BlobWriteSuccess, WriteBlobError> {
525 assert!(self.state.written + buf.len() as u64 <= self.state.size);
526
527 let fut = self.state.writer.write(buf);
528 let () = after_write(buf.len() as u64);
529 let res = fut.await;
530 let () = after_write_ack();
531 let () = res.map_err(|e| match e {
532 e @ blob_writer::WriteError::BytesReady(s) => match s {
533 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
534 Status::NO_SPACE => WriteBlobError::NoSpace,
535 _ => WriteBlobError::FxBlob(e),
536 },
537 e => WriteBlobError::FxBlob(e),
538 })?;
539
540 self.state.written += buf.len() as u64;
541
542 if self.state.written == self.state.size {
543 let Self { needed_blobs, blob_id, state: _ } = self;
544 Ok(BlobWriteSuccess::AllWritten(Blob {
545 needed_blobs,
546 blob_id,
547 state: NeedsBlobWritten,
548 }))
549 } else {
550 Ok(BlobWriteSuccess::NeedsData(self))
551 }
552 }
553}
554
555impl Blob<NeedsBlobWritten> {
556 pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
558 Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
559 }
560}
561
562#[derive(Debug, thiserror::Error)]
564#[allow(missing_docs)]
565pub enum OpenError {
566 #[error("the package does not exist")]
567 NotFound,
568
569 #[error("Open() responded with an unexpected status")]
570 UnexpectedResponse(#[source] Status),
571
572 #[error("transport error")]
573 Fidl(#[from] fidl::Error),
574}
575#[derive(Debug, thiserror::Error)]
577#[allow(missing_docs)]
578pub enum GetError {
579 #[error("Get() responded with an unexpected status")]
580 UnexpectedResponse(#[from] Status),
581
582 #[error("transport error")]
583 Fidl(#[from] fidl::Error),
584}
585
586#[derive(Debug, thiserror::Error)]
588#[allow(missing_docs)]
589pub enum OpenBlobError {
590 #[error("there is insufficient storage space available to persist this blob")]
591 OutOfSpace,
592
593 #[error("this blob is already open for write by another cache operation")]
594 ConcurrentWrite,
595
596 #[error("an unspecified error occurred during underlying I/O")]
597 UnspecifiedIo,
598
599 #[error("an unspecified error occurred")]
600 Internal,
601
602 #[error("transport error")]
603 Fidl(#[from] fidl::Error),
604}
605
606impl From<fpkg::OpenBlobError> for OpenBlobError {
607 fn from(e: fpkg::OpenBlobError) -> Self {
608 match e {
609 fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
610 fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
611 fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
612 fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
613 }
614 }
615}
616
617#[derive(Debug, thiserror::Error)]
619#[allow(missing_docs)]
620pub enum ListMissingBlobsError {
621 #[error("while obtaining the missing blobs fidl iterator")]
622 CallGetMissingBlobs(#[source] fidl::Error),
623
624 #[error("while obtaining the next chunk of blobs from the fidl iterator")]
625 CallNextOnBlobIterator(#[source] fidl::Error),
626}
627
628#[derive(Debug, thiserror::Error)]
630#[allow(missing_docs)]
631pub enum TruncateBlobError {
632 #[error("insufficient storage space is available")]
633 NoSpace,
634
635 #[error("creating blob writer")]
636 CreateBlobWriter(#[source] blob_writer::CreateError),
637
638 #[error("transport error")]
639 Fidl(#[from] fidl::Error),
640
641 #[error("blob is in an invalid state")]
642 BadState,
643}
644
645#[derive(Debug, thiserror::Error)]
647#[allow(missing_docs)]
648pub enum WriteBlobError {
649 #[error("the written data was corrupt")]
650 Corrupt,
651
652 #[error("insufficient storage space is available")]
653 NoSpace,
654
655 #[error("transport error")]
656 Fidl(#[from] fidl::Error),
657
658 #[error("while using the fxblob writer")]
659 FxBlob(#[source] blob_writer::WriteError),
660}
661
662#[derive(Debug, thiserror::Error)]
664#[allow(missing_docs)]
665pub enum BlobWrittenError {
666 #[error("pkg-cache could not find the blob after it was successfully written")]
667 MissingAfterWritten,
668
669 #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
670 UnopenedBlob,
671
672 #[error("transport error")]
673 Fidl(#[from] fidl::Error),
674}
675
676impl From<fpkg::BlobWrittenError> for BlobWrittenError {
677 fn from(e: fpkg::BlobWrittenError) -> Self {
678 match e {
679 fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
680 fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
681 }
682 }
683}
684
685#[cfg(test)]
686mod tests {
687 use super::*;
688 use assert_matches::assert_matches;
689 use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
690 use fidl_fuchsia_io as fio;
691 use fidl_fuchsia_pkg::{
692 BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
693 PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
694 PackageCacheRequestStream,
695 };
696 use zx::HandleBased as _;
697
698 struct MockPackageCache {
699 stream: PackageCacheRequestStream,
700 }
701
702 impl MockPackageCache {
703 fn new() -> (Client, Self) {
704 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
705 (Client::from_proxy(proxy), Self { stream })
706 }
707
708 async fn expect_get(
709 &mut self,
710 blob_info: BlobInfo,
711 expected_gc_protection: fpkg::GcProtection,
712 ) -> PendingGet {
713 match self.stream.next().await {
714 Some(Ok(PackageCacheRequest::Get {
715 meta_far_blob,
716 gc_protection,
717 needed_blobs,
718 dir,
719 responder,
720 })) => {
721 assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
722 assert_eq!(gc_protection, expected_gc_protection);
723 let needed_blobs = needed_blobs.into_stream();
724 let dir = dir.into_stream();
725
726 PendingGet { stream: needed_blobs, dir, responder }
727 }
728 r => panic!("Unexpected request: {r:?}"),
729 }
730 }
731
732 async fn expect_closed(mut self) {
733 assert_matches!(self.stream.next().await, None);
734 }
735 }
736
737 struct PendingGet {
738 stream: NeededBlobsRequestStream,
739 dir: fio::DirectoryRequestStream,
740 responder: PackageCacheGetResponder,
741 }
742
743 impl PendingGet {
744 async fn new() -> (Get, PendingGet) {
745 let (client, mut server) = MockPackageCache::new();
746
747 let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
748 let pending_get =
749 server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
750 (get, pending_get)
751 }
752
753 fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
754 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
755 self.responder.send(Ok(())).unwrap();
756 (self.stream, PackageDirProvider { stream: self.dir })
757 }
758
759 fn finish(self) -> PackageDirProvider {
760 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
761 self.responder.send(Ok(())).unwrap();
762 PackageDirProvider { stream: self.dir }
763 }
764
765 #[cfg(target_os = "fuchsia")]
766 fn fail_the_get(self) {
767 self.responder
768 .send(Err(Status::IO_INVALID.into_raw()))
769 .expect("client should be waiting");
770 }
771
772 async fn expect_open_meta_blob(
773 mut self,
774 res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
775 ) -> Self {
776 match self.stream.next().await {
777 Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
778 responder.send(res).unwrap();
779 }
780 r => panic!("Unexpected request: {r:?}"),
781 }
782 self
783 }
784
785 async fn expect_open_blob(
786 mut self,
787 expected_blob_id: BlobId,
788 res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
789 ) -> Self {
790 match self.stream.next().await {
791 Some(Ok(NeededBlobsRequest::OpenBlob { blob_id, responder })) => {
792 assert_eq!(BlobId::from(blob_id), expected_blob_id);
793 responder.send(res).unwrap();
794 }
795 r => panic!("Unexpected request: {r:?}"),
796 }
797 self
798 }
799
800 async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
801 match self.stream.next().await {
802 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
803 let mut stream = iterator.into_stream();
804
805 for chunk in response_chunks {
807 let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
808
809 let BlobInfoIteratorRequest::Next { responder } =
810 stream.next().await.unwrap().unwrap();
811 responder.send(&chunk).unwrap();
812 }
813
814 let BlobInfoIteratorRequest::Next { responder } =
816 stream.next().await.unwrap().unwrap();
817 responder.send(&[]).unwrap();
818
819 assert_matches!(stream.next().await, None);
821 }
822 r => panic!("Unexpected request: {r:?}"),
823 }
824 self
825 }
826
827 async fn expect_get_missing_blobs_client_closes_channel(
828 mut self,
829 response_chunks: Vec<Vec<BlobInfo>>,
830 ) -> Self {
831 match self.stream.next().await {
832 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
833 let mut stream = iterator.into_stream();
834
835 for chunk in response_chunks {
837 let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
838
839 let BlobInfoIteratorRequest::Next { responder } =
840 stream.next().await.unwrap().unwrap();
841 responder.send(&chunk).unwrap();
842 }
843
844 assert_matches!(stream.next().await, None);
846 }
847 r => panic!("Unexpected request: {r:?}"),
848 }
849 self
850 }
851
852 async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
853 match self.stream.next().await {
854 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
855 iterator
856 .into_stream_and_control_handle()
857 .1
858 .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
859 }
860 r => panic!("Unexpected request: {r:?}"),
861 }
862 self
863 }
864
865 #[cfg(target_os = "fuchsia")]
866 async fn expect_abort(mut self) -> Self {
867 match self.stream.next().await {
868 Some(Ok(NeededBlobsRequest::Abort { responder })) => {
869 responder.send().unwrap();
870 }
871 r => panic!("Unexpected request: {r:?}"),
872 }
873 self
874 }
875 }
876
877 struct PackageDirProvider {
878 stream: fio::DirectoryRequestStream,
879 }
880
881 impl PackageDirProvider {
882 fn close_pkg_dir(self) {
883 self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
884 }
885 }
886
887 fn blob_id(n: u8) -> BlobId {
888 BlobId::from([n; 32])
889 }
890
891 fn blob_info(n: u8) -> BlobInfo {
892 BlobInfo { blob_id: blob_id(n), length: 0 }
893 }
894
895 #[fuchsia_async::run_singlethreaded(test)]
896 async fn constructor() {
897 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
898 let client = Client::from_proxy(proxy);
899
900 drop(stream);
901 assert_matches!(client.proxy().sync().await, Err(_));
902 }
903
904 #[fuchsia_async::run_singlethreaded(test)]
905 async fn get_present_package() {
906 let (client, mut server) = MockPackageCache::new();
907
908 let ((), ()) = future::join(
909 async {
910 server
911 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
912 .await
913 .finish()
914 .close_pkg_dir();
915 server.expect_closed().await;
916 },
917 async move {
918 let mut get =
919 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
920
921 assert_matches!(get.open_meta_blob().await.unwrap(), None);
922 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
923 let pkg_dir = get.finish().await.unwrap();
924
925 assert_matches!(
926 pkg_dir.into_proxy().take_event_stream().next().await,
927 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
928 );
929 },
930 )
931 .await;
932 }
933
934 #[fuchsia_async::run_singlethreaded(test)]
935 async fn get_present_package_handles_slow_stream_close() {
936 let (client, mut server) = MockPackageCache::new();
937
938 let (send, recv) = futures::channel::oneshot::channel::<()>();
939
940 let ((), ()) = future::join(
941 async {
942 let (needed_blobs_stream, pkg_dir) = server
943 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
944 .await
945 .finish_hold_stream_open();
946 pkg_dir.close_pkg_dir();
947
948 let _ = recv.await;
950 drop(needed_blobs_stream);
951 },
952 async move {
953 let mut get =
954 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
955
956 assert_matches!(get.open_meta_blob().await.unwrap(), None);
957
958 let missing_blobs_stream = get.get_missing_blobs();
962 drop(send);
963 assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
964 let pkg_dir = get.finish().await.unwrap();
965
966 assert_matches!(
967 pkg_dir.into_proxy().take_event_stream().next().await,
968 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
969 );
970 },
971 )
972 .await;
973 }
974
975 #[fuchsia_async::run_singlethreaded(test)]
976 async fn needed_blobs_open_meta_far() {
977 let (mut get, pending_get) = PendingGet::new().await;
978
979 let ((), ()) = future::join(
980 async {
981 pending_get
982 .expect_open_meta_blob(Ok(None))
983 .await
984 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
985 .await
986 .expect_open_meta_blob(Ok(None))
987 .await
988 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
989 .await
990 .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
991 .await
992 .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
993 .await;
994 },
995 async {
996 {
997 let opener = get.make_open_meta_blob();
998 assert_matches!(opener.open().await.unwrap(), None);
999 assert_matches!(opener.open().await.unwrap(), Some(_));
1000 }
1001 assert_matches!(get.open_meta_blob().await.unwrap(), None);
1002 assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1003 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1004 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1005 },
1006 )
1007 .await;
1008 }
1009
1010 #[fuchsia_async::run_singlethreaded(test)]
1011 async fn needed_blobs_open_content_blob() {
1012 let (mut get, pending_get) = PendingGet::new().await;
1013
1014 let ((), ()) = future::join(
1015 async {
1016 pending_get
1017 .expect_open_blob(blob_id(2), Ok(None))
1018 .await
1019 .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1020 .await
1021 .expect_open_blob(blob_id(10), Ok(None))
1022 .await
1023 .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1024 .await
1025 .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1026 .await
1027 .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1028 .await;
1029 },
1030 async {
1031 {
1032 let opener = get.make_open_blob(blob_id(2));
1033 assert_matches!(opener.open().await.unwrap(), None);
1034 assert_matches!(opener.open().await.unwrap(), Some(_));
1035 }
1036 assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1037 assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1038 assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1039 assert_matches!(
1040 get.open_blob(blob_id(13),).await,
1041 Err(OpenBlobError::UnspecifiedIo)
1042 );
1043 },
1044 )
1045 .await;
1046 }
1047
1048 #[fuchsia_async::run_singlethreaded(test)]
1049 async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1050 let (mut get, pending_get) = PendingGet::new().await;
1051 let _ = pending_get.finish();
1052
1053 assert_matches!(get.open_meta_blob().await, Ok(None));
1054 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1055 }
1056
1057 #[fuchsia_async::run_singlethreaded(test)]
1058 async fn needed_blobs_get_missing_blobs() {
1059 let (mut get, pending_get) = PendingGet::new().await;
1060
1061 let ((), ()) = future::join(
1062 async {
1063 pending_get
1064 .expect_get_missing_blobs(vec![
1065 vec![blob_info(1), blob_info(2)],
1066 vec![blob_info(3)],
1067 ])
1068 .await;
1069 },
1070 async {
1071 assert_eq!(
1072 get.get_missing_blobs().try_concat().await.unwrap(),
1073 vec![blob_info(1), blob_info(2), blob_info(3)]
1074 );
1075 },
1076 )
1077 .await;
1078 }
1079
1080 #[fuchsia_async::run_singlethreaded(test)]
1081 async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1082 let (mut get, pending_get) = PendingGet::new().await;
1083 drop(pending_get);
1084
1085 assert_matches!(
1086 get.get_missing_blobs().try_concat().await,
1087 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1088 fidl::Error::ClientChannelClosed{status, ..})
1089 )
1090 if status == Status::PEER_CLOSED
1091 );
1092 }
1093
1094 #[fuchsia_async::run_singlethreaded(test)]
1095 async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1096 let (mut get, pending_get) = PendingGet::new().await;
1097
1098 let (_, ()) =
1099 future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1100 assert_matches!(
1101 get.get_missing_blobs().try_concat().await,
1102 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1103 fidl::Error::ClientChannelClosed{status, ..}
1104 ))
1105 if status == Status::ADDRESS_IN_USE
1106 );
1107 })
1108 .await;
1109 }
1110
1111 #[cfg(target_os = "fuchsia")]
1112 #[test]
1113 fn needed_blobs_abort() {
1114 use futures::future::Either;
1115 use futures::pin_mut;
1116 use std::task::Poll;
1117
1118 let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1119
1120 let fut = async {
1121 let (get, pending_get) = PendingGet::new().await;
1122
1123 let abort_fut = get.abort().boxed();
1124 let expect_abort_fut = pending_get.expect_abort();
1125 pin_mut!(expect_abort_fut);
1126
1127 match futures::future::select(abort_fut, expect_abort_fut).await {
1128 Either::Left(((), _expect_abort_fut)) => {
1129 panic!("abort should wait for the get future to complete")
1130 }
1131 Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1132 }
1133 };
1134 pin_mut!(fut);
1135
1136 let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1137 Poll::Pending => panic!("should complete"),
1138 Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1139 };
1140
1141 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1143 pending_get.fail_the_get();
1144 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1145 }
1146
1147 struct MockNeededBlob {
1148 writer: ffxfs::BlobWriterRequestStream,
1149 needed_blobs: fpkg::NeededBlobsRequestStream,
1150 vmo: Option<zx::Vmo>,
1151 }
1152
1153 impl MockNeededBlob {
1154 fn mock_hash() -> BlobId {
1155 [7; 32].into()
1156 }
1157
1158 fn new() -> (NeededBlob, Self) {
1159 let (writer_client, writer) =
1160 fidl::endpoints::create_request_stream::<ffxfs::BlobWriterMarker>();
1161 let (needed_blobs_proxy, needed_blobs) =
1162 fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1163 (
1164 NeededBlob {
1165 blob: Blob {
1166 needed_blobs: needed_blobs_proxy,
1167 blob_id: Self::mock_hash(),
1168 state: NeedsTruncate(writer_client),
1169 },
1170 },
1171 Self { writer, needed_blobs, vmo: None },
1172 )
1173 }
1174
1175 async fn fail_get_vmo(mut self) -> Self {
1176 match self.writer.next().await {
1177 Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size: _, responder })) => {
1178 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1179 }
1180 r => panic!("Unexpected request: {r:?}"),
1181 }
1182 self
1183 }
1184
1185 async fn expect_get_vmo(mut self, expected_size: u64) -> Self {
1186 match self.writer.next().await {
1187 Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size, responder })) => {
1188 assert_eq!(size, expected_size);
1189 let vmo = zx::Vmo::create(size).unwrap();
1190 assert!(self.vmo.is_none());
1191 self.vmo = Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap());
1192 responder.send(Ok(vmo)).unwrap();
1193 }
1194 r => panic!("Unexpected request: {r:?}"),
1195 }
1196 self
1197 }
1198
1199 async fn fail_bytes_ready(mut self) -> Self {
1200 match self.writer.next().await {
1201 Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written: _, responder })) => {
1202 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1203 }
1204 r => panic!("Unexpected request: {r:?}"),
1205 }
1206 self
1207 }
1208
1209 async fn expect_bytes_ready(mut self, expected_payload: &[u8], offset: u64) -> Self {
1210 match self.writer.next().await {
1211 Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written, responder })) => {
1212 assert_eq!(bytes_written, u64::try_from(expected_payload.len()).unwrap());
1213
1214 let vmo = self.vmo.as_ref().unwrap();
1215 let mut buf = vec![0; expected_payload.len()];
1216 let () = vmo.read(&mut buf, offset).unwrap();
1217 assert_eq!(buf, expected_payload);
1218
1219 responder.send(Ok(())).unwrap();
1220 }
1221 r => panic!("Unexpected request: {r:?}"),
1222 }
1223 self
1224 }
1225
1226 async fn expect_blob_written(mut self) -> Self {
1227 match self.needed_blobs.next().await {
1228 Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1229 assert_eq!(blob_id, Self::mock_hash().into());
1230 responder.send(Ok(())).unwrap();
1231 }
1232 r => panic!("Unexpected request: {r:?}"),
1233 }
1234 self
1235 }
1236 }
1237
1238 #[fuchsia_async::run_singlethreaded(test)]
1239 async fn empty_blob_write() {
1240 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1241
1242 let ((), ()) = future::join(
1243 async {
1244 blob_server.expect_get_vmo(0).await.expect_blob_written().await;
1245 },
1246 async {
1247 let blob = match blob.truncate(0).await.unwrap() {
1248 TruncateBlobSuccess::AllWritten(blob) => blob,
1249 other => panic!("empty blob shouldn't need bytes {other:?}"),
1250 };
1251 let () = blob.blob_written().await.unwrap();
1252 },
1253 )
1254 .await;
1255 }
1256
1257 impl TruncateBlobSuccess {
1258 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1259 match self {
1260 TruncateBlobSuccess::NeedsData(blob) => blob,
1261 TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1262 }
1263 }
1264 }
1265
1266 impl BlobWriteSuccess {
1267 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1268 match self {
1269 BlobWriteSuccess::NeedsData(blob) => blob,
1270 BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1271 }
1272 }
1273
1274 fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1275 match self {
1276 BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1277 BlobWriteSuccess::AllWritten(blob) => blob,
1278 }
1279 }
1280 }
1281
1282 #[fuchsia_async::run_singlethreaded(test)]
1283 async fn small_blob_write() {
1284 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1285
1286 let ((), ()) = future::join(
1287 async {
1288 blob_server
1289 .expect_get_vmo(4)
1290 .await
1291 .expect_bytes_ready(b"test", 0)
1292 .await
1293 .expect_blob_written()
1294 .await;
1295 },
1296 async {
1297 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1298 let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1299 let () = blob.blob_written().await.unwrap();
1300 },
1301 )
1302 .await;
1303 }
1304
1305 #[fuchsia_async::run_singlethreaded(test)]
1306 async fn blob_truncate_no_space() {
1307 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1308
1309 let ((), ()) = future::join(
1310 async {
1311 blob_server.fail_get_vmo().await;
1312 },
1313 async {
1314 assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1315 },
1316 )
1317 .await;
1318 }
1319
1320 #[fuchsia_async::run_singlethreaded(test)]
1321 async fn blob_write_no_space() {
1322 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1323
1324 let ((), ()) = future::join(
1325 async {
1326 blob_server.expect_get_vmo(4).await.fail_bytes_ready().await;
1327 },
1328 async {
1329 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1330 assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1331 },
1332 )
1333 .await;
1334 }
1335
1336 #[fuchsia_async::run_singlethreaded(test)]
1337 async fn blob_write_multiple_write() {
1338 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1339
1340 let ((), ()) = future::join(
1341 async {
1342 blob_server
1343 .expect_get_vmo(6)
1344 .await
1345 .expect_bytes_ready(b"abc", 0)
1346 .await
1347 .expect_bytes_ready(b"123", 3)
1348 .await
1349 .expect_blob_written()
1350 .await;
1351 },
1352 async {
1353 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1354 let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1355 let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1356 let () = blob.blob_written().await.unwrap();
1357 },
1358 )
1359 .await;
1360 }
1361
1362 #[fuchsia_async::run_singlethreaded(test)]
1363 async fn get_already_cached_success() {
1364 let (client, mut server) = MockPackageCache::new();
1365
1366 let ((), ()) = future::join(
1367 async {
1368 server
1369 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1370 .await
1371 .finish()
1372 .close_pkg_dir();
1373 server.expect_closed().await;
1374 },
1375 async move {
1376 let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1377
1378 assert_matches!(
1379 pkg_dir.into_proxy().take_event_stream().next().await,
1380 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1381 );
1382 },
1383 )
1384 .await;
1385 }
1386
1387 #[fuchsia_async::run_singlethreaded(test)]
1388 async fn get_already_cached_missing_meta_far() {
1389 let (client, mut server) = MockPackageCache::new();
1390
1391 let ((), ()) = future::join(
1392 async {
1393 server
1394 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1395 .await
1396 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1397 .await;
1398 },
1399 async move {
1400 assert_matches!(
1401 client.get_already_cached(blob_id(2)).await,
1402 Err(GetAlreadyCachedError::MissingMetaFar)
1403 );
1404 },
1405 )
1406 .await;
1407 }
1408
1409 #[fuchsia_async::run_singlethreaded(test)]
1410 async fn get_already_cached_missing_content_blob() {
1411 let (client, mut server) = MockPackageCache::new();
1412
1413 let ((), ()) = future::join(
1414 async {
1415 server
1416 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1417 .await
1418 .expect_open_meta_blob(Ok(None))
1419 .await
1420 .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1421 blob_id: [0; 32].into(),
1422 length: 0,
1423 }]])
1424 .await;
1425 },
1426 async move {
1427 assert_matches!(
1428 client.get_already_cached(blob_id(2)).await,
1429 Err(GetAlreadyCachedError::MissingContentBlobs(v))
1430 if v == vec![BlobInfo {
1431 blob_id: [0; 32].into(),
1432 length: 0,
1433 }]
1434 );
1435 },
1436 )
1437 .await;
1438 }
1439}