1use fidl_fuchsia_fxfs::BlobWriterProxy;
6
7use futures::future::{BoxFuture, FutureExt as _};
8use futures::stream::{FuturesOrdered, StreamExt as _, TryStreamExt as _};
9
10mod errors;
11pub use errors::{CreateError, WriteError};
12
13#[derive(Debug)]
16pub struct BlobWriter {
17 blob_writer_proxy: BlobWriterProxy,
18 vmo: zx::Vmo,
19 outstanding_writes:
32 FuturesOrdered<BoxFuture<'static, Result<Result<u64, zx::Status>, fidl::Error>>>,
33 bytes_sent: u64,
35 available: u64,
38 blob_len: u64,
40 vmo_len: u64,
42}
43
44impl BlobWriter {
45 pub async fn create(
47 blob_writer_proxy: BlobWriterProxy,
48 size: u64,
49 ) -> Result<Self, CreateError> {
50 let vmo = blob_writer_proxy
51 .get_vmo(size)
52 .await
53 .map_err(CreateError::Fidl)?
54 .map_err(zx::Status::from_raw)
55 .map_err(CreateError::GetVmo)?;
56 let vmo_len = vmo.get_size().map_err(CreateError::GetSize)?;
57 Ok(BlobWriter {
58 blob_writer_proxy,
59 vmo,
60 outstanding_writes: FuturesOrdered::new(),
61 bytes_sent: 0,
62 available: vmo_len,
63 blob_len: size,
64 vmo_len,
65 })
66 }
67
68 pub async fn write(&mut self, mut bytes: &[u8]) -> Result<(), WriteError> {
80 if self.bytes_sent + bytes.len() as u64 > self.blob_len {
81 return Err(WriteError::EndOfBlob);
82 }
83 while !bytes.is_empty() {
84 debug_assert!(self.outstanding_writes.len() <= 2);
85 if self.available == 0 || self.outstanding_writes.len() == 2 {
87 let bytes_ackd = self
88 .outstanding_writes
89 .next()
90 .await
91 .ok_or_else(|| WriteError::QueueEnded)?
92 .map_err(WriteError::Fidl)?
93 .map_err(WriteError::BytesReady)?;
94 self.available += bytes_ackd;
95 }
96
97 let bytes_to_send_len = {
98 let mut bytes_to_send_len = std::cmp::min(self.available, bytes.len() as u64);
99 if self.blob_len - self.bytes_sent > self.vmo_len {
102 bytes_to_send_len = std::cmp::min(bytes_to_send_len, self.vmo_len / 2)
103 }
104 bytes_to_send_len
105 };
106
107 let (bytes_to_send, remaining_bytes) = bytes.split_at(bytes_to_send_len as usize);
108 bytes = remaining_bytes;
109
110 let vmo_index = self.bytes_sent % self.vmo_len;
111 let (bytes_to_send_before_wrap, bytes_to_send_after_wrap) = bytes_to_send
112 .split_at(std::cmp::min((self.vmo_len - vmo_index) as usize, bytes_to_send.len()));
113
114 self.vmo.write(bytes_to_send_before_wrap, vmo_index).map_err(WriteError::VmoWrite)?;
115 if !bytes_to_send_after_wrap.is_empty() {
116 self.vmo.write(bytes_to_send_after_wrap, 0).map_err(WriteError::VmoWrite)?;
117 }
118
119 let write_fut = self.blob_writer_proxy.bytes_ready(bytes_to_send_len);
120 self.outstanding_writes.push_back(
121 async move {
122 write_fut
123 .await
124 .map(|res| res.map(|()| bytes_to_send_len).map_err(zx::Status::from_raw))
125 }
126 .boxed(),
127 );
128 self.available -= bytes_to_send_len;
129 self.bytes_sent += bytes_to_send_len;
130 }
131 debug_assert!(self.bytes_sent <= self.blob_len);
132
133 if self.bytes_sent == self.blob_len {
135 while let Some(result) =
136 self.outstanding_writes.try_next().await.map_err(WriteError::Fidl)?
137 {
138 match result {
139 Ok(bytes_ackd) => self.available += bytes_ackd,
140 Err(e) => return Err(WriteError::BytesReady(e)),
141 }
142 }
143 if self.available != self.vmo_len {
145 return Err(WriteError::EndOfBlob);
146 }
147 }
148 Ok(())
149 }
150
151 pub fn vmo_size(&self) -> u64 {
152 self.vmo_len
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use assert_matches::assert_matches;
160 use fidl::endpoints::create_proxy_and_stream;
161 use fidl_fuchsia_fxfs::{BlobWriterMarker, BlobWriterRequest};
162 use fuchsia_sync::Mutex;
163 use futures::{pin_mut, select};
164 use rand::{thread_rng, Rng as _};
165 use std::sync::Arc;
166 use zx::HandleBased;
167
168 const VMO_SIZE: usize = 4096;
169
170 async fn check_blob_writer(
171 write_fun: impl FnOnce(BlobWriterProxy) -> BoxFuture<'static, ()>,
172 data: &[u8],
173 writes: &[(usize, usize)],
174 ) {
175 let (proxy, mut stream) = create_proxy_and_stream::<BlobWriterMarker>();
176 let count = Arc::new(Mutex::new(0));
177 let count_clone = count.clone();
178 let expected_count = writes.len();
179 let mut check_vmo = None;
180 let mock_server = async move {
181 while let Some(request) = stream.next().await {
182 match request {
183 Ok(BlobWriterRequest::GetVmo { responder, .. }) => {
184 let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("failed to create vmo");
185 let vmo_dup = vmo
186 .duplicate_handle(zx::Rights::SAME_RIGHTS)
187 .expect("failed to duplicate VMO");
188 check_vmo = Some(vmo);
189 responder.send(Ok(vmo_dup)).unwrap();
190 }
191 Ok(BlobWriterRequest::BytesReady { responder, bytes_written, .. }) => {
192 let vmo = check_vmo.as_ref().unwrap();
193 let mut count_locked = count.lock();
194 let mut buf = vec![0; bytes_written as usize];
195 let data_range = writes[*count_locked];
196 let vmo_offset = data_range.0 % VMO_SIZE;
197 if vmo_offset + bytes_written as usize > VMO_SIZE {
198 let split = VMO_SIZE - vmo_offset;
199 vmo.read(&mut buf[0..split], vmo_offset as u64).unwrap();
200 vmo.read(&mut buf[split..], 0).unwrap();
201 } else {
202 vmo.read(&mut buf, vmo_offset as u64).unwrap();
203 }
204 assert_eq!(bytes_written, (data_range.1 - data_range.0) as u64);
205 assert_eq!(&data[data_range.0..data_range.1], buf);
206 *count_locked += 1;
207 responder.send(Ok(())).unwrap();
208 }
209 _ => {
210 unreachable!()
211 }
212 }
213 }
214 }
215 .fuse();
216
217 pin_mut!(mock_server);
218
219 select! {
220 _ = mock_server => unreachable!(),
221 _ = write_fun(proxy).fuse() => {
222 assert_eq!(*count_clone.lock(), expected_count);
223 }
224 }
225 }
226
227 #[fuchsia::test]
228 async fn invalid_write_past_end_of_blob() {
229 let mut data = [0; VMO_SIZE];
230 thread_rng().fill(&mut data[..]);
231
232 let write_fun = |proxy: BlobWriterProxy| {
233 async move {
234 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
235 .await
236 .expect("failed to create BlobWriter");
237 let () = blob_writer.write(&data).await.unwrap();
238 let invalid_write = [0; 4096];
239 assert_matches!(
240 blob_writer.write(&invalid_write).await,
241 Err(WriteError::EndOfBlob)
242 );
243 }
244 .boxed()
245 };
246
247 check_blob_writer(write_fun, &data, &[(0, VMO_SIZE)]).await;
248 }
249
250 #[fuchsia::test]
251 async fn do_not_split_writes_if_blob_fits_in_vmo() {
252 let mut data = [0; VMO_SIZE - 1];
253 thread_rng().fill(&mut data[..]);
254
255 let write_fun = |proxy: BlobWriterProxy| {
256 async move {
257 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
258 .await
259 .expect("failed to create BlobWriter");
260 let () = blob_writer.write(&data[..]).await.unwrap();
261 }
262 .boxed()
263 };
264
265 check_blob_writer(write_fun, &data, &[(0, 4095)]).await;
266 }
267
268 #[fuchsia::test]
269 async fn split_writes_if_blob_does_not_fit_in_vmo() {
270 let mut data = [0; VMO_SIZE + 1];
271 thread_rng().fill(&mut data[..]);
272
273 let write_fun = |proxy: BlobWriterProxy| {
274 async move {
275 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
276 .await
277 .expect("failed to create BlobWriter");
278 let () = blob_writer.write(&data[..]).await.unwrap();
279 }
280 .boxed()
281 };
282
283 check_blob_writer(write_fun, &data, &[(0, 2048), (2048, 4096), (4096, 4097)]).await;
284 }
285
286 #[fuchsia::test]
287 async fn third_write_wraps() {
288 let mut data = [0; 1024 * 6];
289 thread_rng().fill(&mut data[..]);
290
291 let writes =
292 [(0, 1024 * 2), (1024 * 2, 1024 * 3), (1024 * 3, 1024 * 5), (1024 * 5, 1024 * 6)];
293
294 let write_fun = |proxy: BlobWriterProxy| {
295 async move {
296 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
297 .await
298 .expect("failed to create BlobWriter");
299 for (i, j) in writes {
300 let () = blob_writer.write(&data[i..j]).await.unwrap();
301 }
302 }
303 .boxed()
304 };
305
306 check_blob_writer(write_fun, &data, &writes[..]).await;
307 }
308
309 #[fuchsia::test]
310 async fn many_wraps() {
311 let mut data = [0; VMO_SIZE * 3];
312 thread_rng().fill(&mut data[..]);
313
314 let write_fun = |proxy: BlobWriterProxy| {
315 async move {
316 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
317 .await
318 .expect("failed to create BlobWriter");
319 let () = blob_writer.write(&data[0..1]).await.unwrap();
320 let () = blob_writer.write(&data[1..]).await.unwrap();
321 }
322 .boxed()
323 };
324
325 check_blob_writer(
326 write_fun,
327 &data,
328 &[
329 (0, 1),
330 (1, 2049),
331 (2049, 4097),
332 (4097, 6145),
333 (6145, 8193),
334 (8193, 10241),
335 (10241, 12288),
336 ],
337 )
338 .await;
339 }
340}