blob_writer/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_fxfs::BlobWriterProxy;
6
7use futures::future::{BoxFuture, FutureExt as _};
8use futures::stream::{FuturesOrdered, StreamExt as _, TryStreamExt as _};
9
10mod errors;
11pub use errors::{CreateError, WriteError};
12
13/// BlobWriter is a wrapper around the fuchsia.fxfs.BlobWriter fidl protocol. Clients will use this
14/// library to write blobs to disk.
15#[derive(Debug)]
16pub struct BlobWriter {
17    blob_writer_proxy: BlobWriterProxy,
18    vmo: zx::Vmo,
19    // Ordered queue of BytesReady requests. There are at most 2 outstanding requests on the
20    // queue at any point in time. Each BytesReady request takes up at most half the ring
21    // buffer (N).
22    //
23    // Our goal is to be constantly moving bytes out of the network and into storage without ever
24    // having to wait for a fidl roundtrip. Maintaining 2 outstanding requests on the queue allows
25    // us to pipeline requests, so that the server can respond to one request while the client is
26    // creating another. Limiting the size of any particular request to N/2 allows each of the
27    // two requests on the queue to be as big as they possibly can, which is particularly important
28    // when storage is the limiting factor. Namely, we want to avoid situations where the server
29    // has completed a small request and has to wait on a fidl roundtrip (i.e. has to wait for the
30    // network to receive the response, create a new request, and send the request back).
31    outstanding_writes:
32        FuturesOrdered<BoxFuture<'static, Result<Result<u64, zx::Status>, fidl::Error>>>,
33    // Number of bytes that have been written to the vmo, both acknowledged and unacknowledged.
34    bytes_sent: u64,
35    // Number of available bytes in the vmo (the size of the vmo minus the size of unacknowledged
36    // writes).
37    available: u64,
38    // Size of the blob being written.
39    blob_len: u64,
40    // Size of the vmo.
41    vmo_len: u64,
42}
43
44impl BlobWriter {
45    /// Creates a `BlobWriter`.  Exactly `size` bytes are expected to be written into the writer.
46    pub async fn create(
47        blob_writer_proxy: BlobWriterProxy,
48        size: u64,
49    ) -> Result<Self, CreateError> {
50        let vmo = blob_writer_proxy
51            .get_vmo(size)
52            .await
53            .map_err(CreateError::Fidl)?
54            .map_err(zx::Status::from_raw)
55            .map_err(CreateError::GetVmo)?;
56        let vmo_len = vmo.get_size().map_err(CreateError::GetSize)?;
57        Ok(BlobWriter {
58            blob_writer_proxy,
59            vmo,
60            outstanding_writes: FuturesOrdered::new(),
61            bytes_sent: 0,
62            available: vmo_len,
63            blob_len: size,
64            vmo_len,
65        })
66    }
67
68    /// Begins writing `bytes` to the server.
69    ///
70    /// If `bytes` contains all of the remaining unwritten bytes of the blob, i.e. the sum of the
71    /// lengths of the `bytes` slices from this and all prior calls to `write` is equal to the size
72    /// given to `create`, then the returned Future will not complete until all of the writes have
73    /// been acknowledged by the server and the blob can be opened for read.
74    /// Otherwise, the returned Future may complete before the write of `bytes` has been
75    /// acknowledged by the server.
76    ///
77    /// Returns an error if the length of `bytes` exceeds the remaining available space in the
78    /// blob, calculated as per `size`.
79    pub async fn write(&mut self, mut bytes: &[u8]) -> Result<(), WriteError> {
80        if self.bytes_sent + bytes.len() as u64 > self.blob_len {
81            return Err(WriteError::EndOfBlob);
82        }
83        while !bytes.is_empty() {
84            debug_assert!(self.outstanding_writes.len() <= 2);
85            // Wait until there is room in the vmo and fewer than 2 outstanding writes.
86            if self.available == 0 || self.outstanding_writes.len() == 2 {
87                let bytes_ackd = self
88                    .outstanding_writes
89                    .next()
90                    .await
91                    .ok_or_else(|| WriteError::QueueEnded)?
92                    .map_err(WriteError::Fidl)?
93                    .map_err(WriteError::BytesReady)?;
94                self.available += bytes_ackd;
95            }
96
97            let bytes_to_send_len = {
98                let mut bytes_to_send_len = std::cmp::min(self.available, bytes.len() as u64);
99                // If all the remaining bytes do not fit in the vmo, split writes to prevent
100                // blocking the server on an ack roundtrip.
101                if self.blob_len - self.bytes_sent > self.vmo_len {
102                    bytes_to_send_len = std::cmp::min(bytes_to_send_len, self.vmo_len / 2)
103                }
104                bytes_to_send_len
105            };
106
107            let (bytes_to_send, remaining_bytes) = bytes.split_at(bytes_to_send_len as usize);
108            bytes = remaining_bytes;
109
110            let vmo_index = self.bytes_sent % self.vmo_len;
111            let (bytes_to_send_before_wrap, bytes_to_send_after_wrap) = bytes_to_send
112                .split_at(std::cmp::min((self.vmo_len - vmo_index) as usize, bytes_to_send.len()));
113
114            self.vmo.write(bytes_to_send_before_wrap, vmo_index).map_err(WriteError::VmoWrite)?;
115            if !bytes_to_send_after_wrap.is_empty() {
116                self.vmo.write(bytes_to_send_after_wrap, 0).map_err(WriteError::VmoWrite)?;
117            }
118
119            let write_fut = self.blob_writer_proxy.bytes_ready(bytes_to_send_len);
120            self.outstanding_writes.push_back(
121                async move {
122                    write_fut
123                        .await
124                        .map(|res| res.map(|()| bytes_to_send_len).map_err(zx::Status::from_raw))
125                }
126                .boxed(),
127            );
128            self.available -= bytes_to_send_len;
129            self.bytes_sent += bytes_to_send_len;
130        }
131        debug_assert!(self.bytes_sent <= self.blob_len);
132
133        // The last write call should not complete until the blob is completely written.
134        if self.bytes_sent == self.blob_len {
135            while let Some(result) =
136                self.outstanding_writes.try_next().await.map_err(WriteError::Fidl)?
137            {
138                match result {
139                    Ok(bytes_ackd) => self.available += bytes_ackd,
140                    Err(e) => return Err(WriteError::BytesReady(e)),
141                }
142            }
143            // This should not be possible.
144            if self.available != self.vmo_len {
145                return Err(WriteError::EndOfBlob);
146            }
147        }
148        Ok(())
149    }
150
151    pub fn vmo_size(&self) -> u64 {
152        self.vmo_len
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_fxfs::{BlobWriterMarker, BlobWriterRequest};
    use fuchsia_sync::Mutex;
    use futures::{pin_mut, select};
    use rand::{thread_rng, Rng as _};
    use std::sync::Arc;
    use zx::HandleBased;

    /// Size of the vmo handed out by the mock server.
    const VMO_SIZE: usize = 4096;

    /// Returns an array of `N` random bytes to use as blob contents.
    fn random_data<const N: usize>() -> [u8; N] {
        let mut data = [0; N];
        thread_rng().fill(&mut data[..]);
        data
    }

    /// Creates a `BlobWriter` for a blob of `size` bytes, panicking if creation fails.
    async fn new_writer(proxy: BlobWriterProxy, size: usize) -> BlobWriter {
        BlobWriter::create(proxy, size as u64).await.expect("failed to create BlobWriter")
    }

    /// Runs `write_fun` against a mock BlobWriter server and verifies the requests it produces.
    ///
    /// `writes` lists, in order, the `(start, end)` range of `data` that each `BytesReady` request
    /// is expected to cover. For every request the mock server checks the reported length and
    /// compares the bytes in the vmo (starting at `start % VMO_SIZE` and wrapping around the end)
    /// with that range of `data`. Once `write_fun` completes, the number of requests handled must
    /// equal `writes.len()`.
    async fn check_blob_writer(
        write_fun: impl FnOnce(BlobWriterProxy) -> BoxFuture<'static, ()>,
        data: &[u8],
        writes: &[(usize, usize)],
    ) {
        let (proxy, mut requests) = create_proxy_and_stream::<BlobWriterMarker>();
        let handled_count = Arc::new(Mutex::new(0));
        let server_count = handled_count.clone();

        let mock_server = async move {
            // Server-side handle to the vmo, set when the client asks for it.
            let mut ring_buffer = None;
            while let Some(request) = requests.next().await {
                match request {
                    Ok(BlobWriterRequest::GetVmo { responder, .. }) => {
                        let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("failed to create vmo");
                        let client_vmo = vmo
                            .duplicate_handle(zx::Rights::SAME_RIGHTS)
                            .expect("failed to duplicate VMO");
                        ring_buffer = Some(vmo);
                        responder.send(Ok(client_vmo)).unwrap();
                    }
                    Ok(BlobWriterRequest::BytesReady { responder, bytes_written, .. }) => {
                        let vmo = ring_buffer.as_ref().unwrap();
                        let mut handled = server_count.lock();
                        let len = bytes_written as usize;
                        let mut buf = vec![0; len];
                        let (start, end) = writes[*handled];
                        let vmo_offset = start % VMO_SIZE;
                        // Read up to the end of the vmo first; whatever is left over wrapped
                        // around and lives at the start of the vmo.
                        let contiguous = std::cmp::min(len, VMO_SIZE - vmo_offset);
                        let (head, tail) = buf.split_at_mut(contiguous);
                        vmo.read(head, vmo_offset as u64).unwrap();
                        if !tail.is_empty() {
                            vmo.read(tail, 0).unwrap();
                        }
                        assert_eq!(bytes_written, (end - start) as u64);
                        assert_eq!(&data[start..end], buf);
                        *handled += 1;
                        responder.send(Ok(())).unwrap();
                    }
                    _ => {
                        unreachable!()
                    }
                }
            }
        }
        .fuse();

        pin_mut!(mock_server);

        // The client is expected to finish first; the mock server must still be serving then.
        select! {
            _ = mock_server => unreachable!(),
            _ = write_fun(proxy).fuse() => {
                assert_eq!(*handled_count.lock(), writes.len());
            }
        }
    }

    /// Writing again after the whole blob has been written must fail with `EndOfBlob`.
    #[fuchsia::test]
    async fn invalid_write_past_end_of_blob() {
        let data = random_data::<VMO_SIZE>();

        let write_fun = |proxy: BlobWriterProxy| {
            async move {
                let mut blob_writer = new_writer(proxy, data.len()).await;
                blob_writer.write(&data).await.unwrap();
                let invalid_write = [0; 4096];
                assert_matches!(
                    blob_writer.write(&invalid_write).await,
                    Err(WriteError::EndOfBlob)
                );
            }
            .boxed()
        };

        check_blob_writer(write_fun, &data, &[(0, VMO_SIZE)]).await;
    }

    /// A blob smaller than the vmo is sent as a single request.
    #[fuchsia::test]
    async fn do_not_split_writes_if_blob_fits_in_vmo() {
        let data = random_data::<{ VMO_SIZE - 1 }>();

        let write_fun = |proxy: BlobWriterProxy| {
            async move {
                let mut blob_writer = new_writer(proxy, data.len()).await;
                blob_writer.write(&data[..]).await.unwrap();
            }
            .boxed()
        };

        check_blob_writer(write_fun, &data, &[(0, VMO_SIZE - 1)]).await;
    }

    /// A blob one byte larger than the vmo is sent as two half-vmo requests plus the last byte.
    #[fuchsia::test]
    async fn split_writes_if_blob_does_not_fit_in_vmo() {
        let data = random_data::<{ VMO_SIZE + 1 }>();

        let write_fun = |proxy: BlobWriterProxy| {
            async move {
                let mut blob_writer = new_writer(proxy, data.len()).await;
                blob_writer.write(&data[..]).await.unwrap();
            }
            .boxed()
        };

        check_blob_writer(write_fun, &data, &[(0, 2048), (2048, 4096), (4096, 4097)]).await;
    }

    /// Four caller-sized writes, the third of which crosses the end of the vmo.
    #[fuchsia::test]
    async fn third_write_wraps() {
        let data = random_data::<{ 1024 * 6 }>();

        let writes =
            [(0, 1024 * 2), (1024 * 2, 1024 * 3), (1024 * 3, 1024 * 5), (1024 * 5, 1024 * 6)];

        let write_fun = |proxy: BlobWriterProxy| {
            async move {
                let mut blob_writer = new_writer(proxy, data.len()).await;
                for (start, end) in writes {
                    blob_writer.write(&data[start..end]).await.unwrap();
                }
            }
            .boxed()
        };

        check_blob_writer(write_fun, &data, &writes[..]).await;
    }

    /// A one-byte write followed by the rest of a blob three times the size of the vmo.
    #[fuchsia::test]
    async fn many_wraps() {
        let data = random_data::<{ VMO_SIZE * 3 }>();

        let write_fun = |proxy: BlobWriterProxy| {
            async move {
                let mut blob_writer = new_writer(proxy, data.len()).await;
                blob_writer.write(&data[0..1]).await.unwrap();
                blob_writer.write(&data[1..]).await.unwrap();
            }
            .boxed()
        };

        let expected_writes = [
            (0, 1),
            (1, 2049),
            (2049, 4097),
            (4097, 6145),
            (6145, 8193),
            (8193, 10241),
            (10241, 12288),
        ];
        check_blob_writer(write_fun, &data, &expected_writes).await;
    }
}