fidl_fuchsia_pkg_ext/cache/
storage.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{OpenBlobError, TruncateBlobError, WriteBlobError};
6use anyhow::anyhow;
7use zx_status::Status;
8use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
9
10pub(super) fn into_blob_writer_and_closer(
11    fidl: fpkg::BlobWriter,
12) -> Result<(Box<dyn Writer>, Box<dyn Closer>), OpenBlobError> {
13    use fpkg::BlobWriter::*;
14    match fidl {
15        File(file) => {
16            let proxy = file.into_proxy();
17            Ok((Box::new(Clone::clone(&proxy)), Box::new(proxy)))
18        }
19        Writer(writer) => {
20            // fuchsia.fxfs/BlobCreator allows concurrent creation attempts, so we don't need to
21            // cancel an ongoing attempt before trying again.
22            Ok((Box::new(FxBlob::new(writer.into_proxy())), Box::new(())))
23        }
24    }
25}
26
27#[async_trait::async_trait]
28pub(super) trait Closer: Send + Sync + std::fmt::Debug {
29    /// Close the blob to enable immediate retry of create and write.
30    async fn close(&mut self);
31
32    /// Attempt to close the blob. Function may return before blob is closed if closing requires
33    /// async.
34    fn best_effort_close(&mut self);
35}
36
37#[async_trait::async_trait]
38impl Closer for fio::FileProxy {
39    async fn close(&mut self) {
40        let _: Result<Result<(), i32>, fidl::Error> = fio::FileProxy::close(self).await;
41    }
42
43    fn best_effort_close(&mut self) {
44        let _: fidl::client::QueryResponseFut<Result<(), i32>> = fio::FileProxy::close(self);
45    }
46}
47
48// fuchsia.fxfs/BlobCreator allows concurrent creation attempts, so we don't need to cancel an
49// ongoing attempt before trying again.
50#[async_trait::async_trait]
51impl Closer for () {
52    async fn close(&mut self) {}
53
54    fn best_effort_close(&mut self) {}
55}
56
57#[async_trait::async_trait]
58pub(super) trait Writer: Send + std::fmt::Debug {
59    /// Set the size of the blob.
60    /// If the blob is size zero, the returned Future should not complete until the blob
61    /// is readable.
62    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError>;
63    /// Write `bytes` to the blob.
64    /// The Future returned by the `write` call that writes the final bytes should
65    /// not complete until the blob is readable.
66    async fn write(
67        &mut self,
68        bytes: &[u8],
69        after_write: &(dyn Fn(u64) + Send + Sync),
70        after_write_ack: &(dyn Fn() + Send + Sync),
71    ) -> Result<(), WriteBlobError>;
72}
73
74#[async_trait::async_trait]
75impl Writer for fio::FileProxy {
76    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
77        self.resize(size).await?.map_err(|i| match Status::from_raw(i) {
78            Status::NO_SPACE => TruncateBlobError::NoSpace,
79            other => TruncateBlobError::UnexpectedResponse(other),
80        })
81    }
82
83    async fn write(
84        &mut self,
85        mut bytes: &[u8],
86        after_write: &(dyn Fn(u64) + Send + Sync),
87        after_write_ack: &(dyn Fn() + Send + Sync),
88    ) -> Result<(), WriteBlobError> {
89        while !bytes.is_empty() {
90            let limit = bytes.len().min(fio::MAX_BUF as usize);
91
92            let result_fut = fio::FileProxy::write(self, &bytes[..limit]);
93            after_write(bytes.len() as u64);
94
95            let result = result_fut.await;
96            after_write_ack();
97
98            let written = result?.map_err(|i| match Status::from_raw(i) {
99                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
100                Status::NO_SPACE => WriteBlobError::NoSpace,
101                other => WriteBlobError::UnexpectedResponse(other),
102            })? as usize;
103
104            if written > bytes.len() {
105                return Err(WriteBlobError::Overwrite);
106            }
107            bytes = &bytes[written..];
108        }
109
110        Ok(())
111    }
112}
113
114#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087293)
115#[derive(Debug)]
116enum FxBlob {
117    NeedsTruncate(ffxfs::BlobWriterProxy),
118    NeedsBytes(blob_writer::BlobWriter),
119    Invalid,
120}
121
122impl FxBlob {
123    fn new(proxy: ffxfs::BlobWriterProxy) -> Self {
124        Self::NeedsTruncate(proxy)
125    }
126
127    fn state_str(&self) -> &'static str {
128        match self {
129            Self::NeedsTruncate(_) => "needs truncate",
130            Self::NeedsBytes(_) => "needs bytes",
131            Self::Invalid => "invalid",
132        }
133    }
134}
135
136#[async_trait::async_trait]
137impl Writer for FxBlob {
138    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
139        *self = match std::mem::replace(self, Self::Invalid) {
140            Self::NeedsTruncate(proxy) => Self::NeedsBytes(
141                blob_writer::BlobWriter::create(proxy, size).await.map_err(|e| match e {
142                    blob_writer::CreateError::GetVmo(status) => {
143                        TruncateBlobError::UnexpectedResponse(status)
144                    }
145                    e => TruncateBlobError::Other(anyhow!(e).context("creating a BlobWriter")),
146                })?,
147            ),
148            Self::NeedsBytes(_) => {
149                return Err(TruncateBlobError::AlreadyTruncated(self.state_str()))
150            }
151            Self::Invalid => return Err(TruncateBlobError::BadState),
152        };
153        Ok(())
154    }
155
156    async fn write(
157        &mut self,
158        bytes: &[u8],
159        after_write: &(dyn Fn(u64) + Send + Sync),
160        after_write_ack: &(dyn Fn() + Send + Sync),
161    ) -> Result<(), WriteBlobError> {
162        let Self::NeedsBytes(writer) = self else {
163            return Err(WriteBlobError::BytesNotNeeded(self.state_str()));
164        };
165        let fut = writer.write(bytes);
166        let () = after_write(bytes.len() as u64);
167        let res = fut.await;
168        let () = after_write_ack();
169        res.map_err(|e| match e {
170            blob_writer::WriteError::BytesReady(s) => match s {
171                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
172                Status::NO_SPACE => WriteBlobError::NoSpace,
173                _ => WriteBlobError::UnexpectedResponse(s),
174            },
175            e => WriteBlobError::FxBlob(e),
176        })
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183    use futures::stream::TryStreamExt as _;
184
185    #[fuchsia_async::run_singlethreaded(test)]
186    async fn file_proxy_chunks_writes() {
187        let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
188        let bytes = vec![0; fio::MAX_BUF as usize + 1];
189
190        let write_fut = async move {
191            <fio::FileProxy as Writer>::write(&mut proxy, &bytes, &|_| (), &|| ()).await.unwrap()
192        };
193        let server_fut = async move {
194            match server.try_next().await.unwrap().unwrap() {
195                fio::FileRequest::Write { data, responder } => {
196                    // Proxy limited writes to MAX_BUF bytes.
197                    assert_eq!(data, vec![0; fio::MAX_BUF as usize]);
198                    let () = responder.send(Ok(fio::MAX_BUF)).unwrap();
199                }
200                req => panic!("unexpected request {req:?}"),
201            }
202            match server.try_next().await.unwrap().unwrap() {
203                fio::FileRequest::Write { data, responder } => {
204                    assert_eq!(data, vec![0; 1]);
205                    let () = responder.send(Ok(1)).unwrap();
206                }
207                req => panic!("unexpected request {req:?}"),
208            }
209            assert!(server.try_next().await.unwrap().is_none());
210        };
211
212        let ((), ()) = futures::future::join(write_fut, server_fut).await;
213    }
214
215    #[fuchsia_async::run_singlethreaded(test)]
216    async fn file_proxy_handles_short_writes() {
217        let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
218        let bytes = [0; 10];
219
220        let write_fut = async move {
221            <fio::FileProxy as Writer>::write(&mut proxy, &bytes, &|_| (), &|| ()).await.unwrap()
222        };
223        let server_fut = async move {
224            match server.try_next().await.unwrap().unwrap() {
225                fio::FileRequest::Write { data, responder } => {
226                    assert_eq!(data, [0; 10]);
227                    // Ack only 8 of the 10 bytes.
228                    let () = responder.send(Ok(8)).unwrap();
229                }
230                req => panic!("unexpected request {req:?}"),
231            }
232            match server.try_next().await.unwrap().unwrap() {
233                fio::FileRequest::Write { data, responder } => {
234                    assert_eq!(data, [0; 2]);
235                    let () = responder.send(Ok(2)).unwrap();
236                }
237                req => panic!("unexpected request {req:?}"),
238            }
239            assert!(server.try_next().await.unwrap().is_none());
240        };
241
242        let ((), ()) = futures::future::join(write_fut, server_fut).await;
243    }
244
245    #[fuchsia_async::run_singlethreaded(test)]
246    async fn fxblob_writer() {
247        let blobfs = blobfs_ramdisk::BlobfsRamdisk::builder().fxblob().start().await.unwrap();
248        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::new());
249        let contents = [0u8; 7];
250        let hash = fuchsia_merkle::from_slice(&contents).root();
251        let compressed = delivery_blob::Type1Blob::generate(
252            &contents[..],
253            delivery_blob::CompressionMode::Attempt,
254        );
255        let writer = blobfs
256            .blob_creator_proxy()
257            .unwrap()
258            .unwrap()
259            .create(&hash.into(), false)
260            .await
261            .unwrap()
262            .unwrap();
263
264        let (mut writer, _closer) =
265            into_blob_writer_and_closer(fpkg::BlobWriter::Writer(writer)).unwrap();
266        let () = writer.truncate(compressed.len().try_into().unwrap()).await.unwrap();
267        let () = writer.write(&compressed, &|_| (), &|| ()).await.unwrap();
268
269        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::from([hash]));
270
271        let () = blobfs.stop().await.unwrap();
272    }
273}