fidl_fuchsia_pkg_ext/cache/
storage.rs1use super::{OpenBlobError, TruncateBlobError, WriteBlobError};
6use anyhow::anyhow;
7use zx_status::Status;
8use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
9
10pub(super) fn into_blob_writer_and_closer(
11 fidl: fpkg::BlobWriter,
12) -> Result<(Box<dyn Writer>, Box<dyn Closer>), OpenBlobError> {
13 use fpkg::BlobWriter::*;
14 match fidl {
15 File(file) => {
16 let proxy = file.into_proxy();
17 Ok((Box::new(Clone::clone(&proxy)), Box::new(proxy)))
18 }
19 Writer(writer) => {
20 Ok((Box::new(FxBlob::new(writer.into_proxy())), Box::new(())))
23 }
24 }
25}
26
27#[async_trait::async_trait]
28pub(super) trait Closer: Send + Sync + std::fmt::Debug {
29 async fn close(&mut self);
31
32 fn best_effort_close(&mut self);
35}
36
37#[async_trait::async_trait]
38impl Closer for fio::FileProxy {
39 async fn close(&mut self) {
40 let _: Result<Result<(), i32>, fidl::Error> = fio::FileProxy::close(self).await;
41 }
42
43 fn best_effort_close(&mut self) {
44 let _: fidl::client::QueryResponseFut<Result<(), i32>> = fio::FileProxy::close(self);
45 }
46}
47
48#[async_trait::async_trait]
51impl Closer for () {
52 async fn close(&mut self) {}
53
54 fn best_effort_close(&mut self) {}
55}
56
57#[async_trait::async_trait]
58pub(super) trait Writer: Send + std::fmt::Debug {
59 async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError>;
63 async fn write(
67 &mut self,
68 bytes: &[u8],
69 after_write: &(dyn Fn(u64) + Send + Sync),
70 after_write_ack: &(dyn Fn() + Send + Sync),
71 ) -> Result<(), WriteBlobError>;
72}
73
74#[async_trait::async_trait]
75impl Writer for fio::FileProxy {
76 async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
77 self.resize(size).await?.map_err(|i| match Status::from_raw(i) {
78 Status::NO_SPACE => TruncateBlobError::NoSpace,
79 other => TruncateBlobError::UnexpectedResponse(other),
80 })
81 }
82
83 async fn write(
84 &mut self,
85 mut bytes: &[u8],
86 after_write: &(dyn Fn(u64) + Send + Sync),
87 after_write_ack: &(dyn Fn() + Send + Sync),
88 ) -> Result<(), WriteBlobError> {
89 while !bytes.is_empty() {
90 let limit = bytes.len().min(fio::MAX_BUF as usize);
91
92 let result_fut = fio::FileProxy::write(self, &bytes[..limit]);
93 after_write(bytes.len() as u64);
94
95 let result = result_fut.await;
96 after_write_ack();
97
98 let written = result?.map_err(|i| match Status::from_raw(i) {
99 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
100 Status::NO_SPACE => WriteBlobError::NoSpace,
101 other => WriteBlobError::UnexpectedResponse(other),
102 })? as usize;
103
104 if written > bytes.len() {
105 return Err(WriteBlobError::Overwrite);
106 }
107 bytes = &bytes[written..];
108 }
109
110 Ok(())
111 }
112}
113
114#[allow(clippy::large_enum_variant)] #[derive(Debug)]
116enum FxBlob {
117 NeedsTruncate(ffxfs::BlobWriterProxy),
118 NeedsBytes(blob_writer::BlobWriter),
119 Invalid,
120}
121
122impl FxBlob {
123 fn new(proxy: ffxfs::BlobWriterProxy) -> Self {
124 Self::NeedsTruncate(proxy)
125 }
126
127 fn state_str(&self) -> &'static str {
128 match self {
129 Self::NeedsTruncate(_) => "needs truncate",
130 Self::NeedsBytes(_) => "needs bytes",
131 Self::Invalid => "invalid",
132 }
133 }
134}
135
136#[async_trait::async_trait]
137impl Writer for FxBlob {
138 async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
139 *self = match std::mem::replace(self, Self::Invalid) {
140 Self::NeedsTruncate(proxy) => Self::NeedsBytes(
141 blob_writer::BlobWriter::create(proxy, size).await.map_err(|e| match e {
142 blob_writer::CreateError::GetVmo(status) => {
143 TruncateBlobError::UnexpectedResponse(status)
144 }
145 e => TruncateBlobError::Other(anyhow!(e).context("creating a BlobWriter")),
146 })?,
147 ),
148 Self::NeedsBytes(_) => {
149 return Err(TruncateBlobError::AlreadyTruncated(self.state_str()))
150 }
151 Self::Invalid => return Err(TruncateBlobError::BadState),
152 };
153 Ok(())
154 }
155
156 async fn write(
157 &mut self,
158 bytes: &[u8],
159 after_write: &(dyn Fn(u64) + Send + Sync),
160 after_write_ack: &(dyn Fn() + Send + Sync),
161 ) -> Result<(), WriteBlobError> {
162 let Self::NeedsBytes(writer) = self else {
163 return Err(WriteBlobError::BytesNotNeeded(self.state_str()));
164 };
165 let fut = writer.write(bytes);
166 let () = after_write(bytes.len() as u64);
167 let res = fut.await;
168 let () = after_write_ack();
169 res.map_err(|e| match e {
170 blob_writer::WriteError::BytesReady(s) => match s {
171 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
172 Status::NO_SPACE => WriteBlobError::NoSpace,
173 _ => WriteBlobError::UnexpectedResponse(s),
174 },
175 e => WriteBlobError::FxBlob(e),
176 })
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::TryStreamExt as _;

    /// Waits for the next request on `server`, asserts that it is a `Write` carrying exactly
    /// `expected`, and answers it with `Ok(reported)`.
    async fn serve_one_write(
        server: &mut fio::FileRequestStream,
        expected: &[u8],
        reported: u64,
    ) {
        match server.try_next().await.unwrap().unwrap() {
            fio::FileRequest::Write { data, responder } => {
                assert_eq!(data, expected);
                let () = responder.send(Ok(reported)).unwrap();
            }
            req => panic!("unexpected request {req:?}"),
        }
    }

    /// A payload one byte larger than `MAX_BUF` is sent as a full-size request followed by a
    /// one-byte request.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn file_proxy_chunks_writes() {
        let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let max_buf = fio::MAX_BUF as usize;
        let payload = vec![0u8; max_buf + 1];

        let client = async move {
            <fio::FileProxy as Writer>::write(&mut proxy, &payload, &|_| (), &|| ()).await.unwrap()
        };
        let service = async move {
            serve_one_write(&mut server, &vec![0u8; max_buf], fio::MAX_BUF).await;
            serve_one_write(&mut server, &[0u8; 1], 1).await;
            // The proxy is dropped when the client future finishes, ending the stream.
            assert!(server.try_next().await.unwrap().is_none());
        };

        let ((), ()) = futures::future::join(client, service).await;
    }

    /// When the server accepts only part of a request, the remainder is sent again.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn file_proxy_handles_short_writes() {
        let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let payload = [0u8; 10];

        let client = async move {
            <fio::FileProxy as Writer>::write(&mut proxy, &payload, &|_| (), &|| ()).await.unwrap()
        };
        let service = async move {
            // Acknowledge only 8 of the 10 bytes; the last 2 must arrive in a second request.
            serve_one_write(&mut server, &[0u8; 10], 8).await;
            serve_one_write(&mut server, &[0u8; 2], 2).await;
            assert!(server.try_next().await.unwrap().is_none());
        };

        let ((), ()) = futures::future::join(client, service).await;
    }

    /// End-to-end: a delivery blob written through the `Writer` variant shows up in the blob
    /// list of an fxblob-backed ramdisk.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn fxblob_writer() {
        let blobfs = blobfs_ramdisk::BlobfsRamdisk::builder().fxblob().start().await.unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::new());

        let contents = [0u8; 7];
        let hash = fuchsia_merkle::from_slice(&contents).root();
        let compressed = delivery_blob::Type1Blob::generate(
            &contents[..],
            delivery_blob::CompressionMode::Attempt,
        );

        let creator = blobfs.blob_creator_proxy().unwrap().unwrap();
        let client_end = creator.create(&hash.into(), false).await.unwrap().unwrap();

        let (mut writer, _closer) =
            into_blob_writer_and_closer(fpkg::BlobWriter::Writer(client_end)).unwrap();
        let () = writer.truncate(compressed.len().try_into().unwrap()).await.unwrap();
        let () = writer.write(&compressed, &|_| (), &|| ()).await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::from([hash]));

        let () = blobfs.stop().await.unwrap();
    }
}