blobfs/
mock.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Mock implementation of blobfs for blobfs::Client.
6
7use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
14use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
15
16/// A testing server implementation of /blob.
17///
18/// Mock does not handle requests until instructed to do so.
19pub struct Mock {
20    pub(super) stream: fio::DirectoryRequestStream,
21}
22
23impl Mock {
24    /// Consume the next directory request, verifying it is intended to read the blob identified
25    /// by `merkle`.  Returns a `Blob` representing the open blob file.
26    ///
27    /// # Panics
28    ///
29    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
30    pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
31        match self.stream.next().await {
32            Some(Ok(fio::DirectoryRequest::Open {
33                path,
34                flags,
35                options: _,
36                object,
37                control_handle: _,
38            })) => {
39                assert_eq!(path, merkle.to_string());
40                assert!(flags.contains(fio::PERM_READABLE));
41                assert!(
42                    !flags.intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE)
43                );
44
45                let stream =
46                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
47                        .cast_stream();
48                Blob { stream }
49            }
50            other => panic!("unexpected request: {other:?}"),
51        }
52    }
53
54    /// Consume the next directory request, verifying it is intended to create the blob identified
55    /// by `merkle`.  Returns a `Blob` representing the open blob file.
56    ///
57    /// # Panics
58    ///
59    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
60    pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
61        match self.stream.next().await {
62            Some(Ok(fio::DirectoryRequest::Open {
63                path,
64                flags,
65                options: _,
66                object,
67                control_handle: _,
68            })) => {
69                assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
70                assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
71                let stream =
72                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
73                        .cast_stream();
74                Blob { stream }
75            }
76            other => panic!("unexpected request: {other:?}"),
77        }
78    }
79
80    async fn handle_rewind(&mut self) {
81        match self.stream.next().await {
82            Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
83                responder.send(Status::OK.into_raw()).unwrap();
84            }
85            other => panic!("unexpected request: {other:?}"),
86        }
87    }
88
89    /// Consume directory requests, verifying they are requests to read directory entries.  Respond
90    /// with dirents constructed from the given entries.
91    ///
92    /// # Panics
93    ///
94    /// Panics on error or assertion violation (unexpected requests or not all entries are read)
95    pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
96        // fuchsia_fs::directory starts by resetting the directory channel's readdir position.
97        self.handle_rewind().await;
98
99        const NAME_LEN: usize = 64;
100        #[repr(C, packed)]
101        struct Dirent {
102            ino: u64,
103            size: u8,
104            kind: u8,
105            name: [u8; NAME_LEN],
106        }
107
108        impl Dirent {
109            fn as_bytes(&self) -> &'_ [u8] {
110                let start = self as *const Self as *const u8;
111                // Safe because the FIDL wire format for directory entries is
112                // defined to be the C packed struct representation used here.
113                unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
114            }
115        }
116
117        let mut entries_iter = entries.map(|hash| Dirent {
118            ino: fio::INO_UNKNOWN,
119            size: NAME_LEN as u8,
120            kind: fio::DirentType::File.into_primitive(),
121            name: hash.to_string().as_bytes().try_into().unwrap(),
122        });
123
124        loop {
125            match self.stream.try_next().await.unwrap() {
126                Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
127                    let max_bytes = max_bytes as usize;
128                    assert!(max_bytes >= std::mem::size_of::<Dirent>());
129
130                    let mut buf = vec![];
131                    while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
132                        match entries_iter.next() {
133                            Some(need) => {
134                                buf.extend(need.as_bytes());
135                            }
136                            None => break,
137                        }
138                    }
139
140                    responder.send(Status::OK.into_raw(), &buf).unwrap();
141
142                    // Finish after providing an empty chunk.
143                    if buf.is_empty() {
144                        break;
145                    }
146                }
147                Some(other) => panic!("unexpected request: {other:?}"),
148                None => panic!("unexpected stream termination"),
149            }
150        }
151    }
152
153    /// Consume N directory requests, verifying they are intended to determine whether the blobs
154    /// specified `readable` and `missing` are readable or not, responding to the check based on
155    /// which collection the hash is in.
156    ///
157    /// # Panics
158    ///
159    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
160    pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
161        let mut readable = readable.iter().copied().collect::<HashSet<_>>();
162        let mut missing = missing.iter().copied().collect::<HashSet<_>>();
163
164        while !(readable.is_empty() && missing.is_empty()) {
165            match self.stream.next().await {
166                Some(Ok(fio::DirectoryRequest::Open {
167                    path,
168                    flags,
169                    options: _,
170                    object,
171                    control_handle: _,
172                })) => {
173                    assert!(flags.contains(fio::PERM_READABLE));
174                    assert!(!flags
175                        .intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE));
176                    let path: Hash = path.parse().unwrap();
177
178                    let stream =
179                        fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
180                            .cast_stream();
181                    let blob = Blob { stream };
182                    if readable.remove(&path) {
183                        blob.succeed_open_with_blob_readable().await;
184                    } else if missing.remove(&path) {
185                        blob.fail_open_with_not_found();
186                    } else {
187                        panic!("Unexpected blob existance check for {path}");
188                    }
189                }
190                other => panic!("unexpected request: {other:?}"),
191            }
192        }
193    }
194
195    /// Expects and handles a call to [`Client::filter_to_missing_blobs`].
196    /// Verifies the call intends to determine whether the blobs specified in `readable` and
197    /// `missing` are readable or not, responding to the check based on which collection the hash is
198    /// in.
199    ///
200    /// # Panics
201    ///
202    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
203    pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
204        &mut self,
205        readable: &[Hash],
206        missing: &[Hash],
207    ) {
208        self.expect_readable_missing_checks(readable, missing).await;
209    }
210
211    /// Asserts that the request stream closes without any further requests.
212    ///
213    /// # Panics
214    ///
215    /// Panics on error
216    pub async fn expect_done(mut self) {
217        match self.stream.next().await {
218            None => {}
219            Some(request) => panic!("unexpected request: {request:?}"),
220        }
221    }
222}
223
224/// A testing server implementation of an open /blob/<merkle> file.
225///
226/// Blob does not send the OnOpen event or handle requests until instructed to do so.
227pub struct Blob {
228    stream: fio::FileRequestStream,
229}
230
231impl Blob {
232    fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
233        let event = fidl::Event::create();
234        event.signal_handle(zx::Signals::NONE, signals).unwrap();
235
236        let info =
237            fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
238        let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
239    }
240
241    fn send_on_open(&mut self, status: Status) {
242        self.send_on_open_with_file_signals(status, zx::Signals::NONE);
243    }
244
245    fn send_on_open_with_readable(&mut self, status: Status) {
246        // Send USER_0 signal to indicate that the blob is available.
247        self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
248    }
249
250    fn fail_open_with_error(mut self, status: Status) {
251        assert_ne!(status, Status::OK);
252        self.send_on_open(status);
253    }
254
255    /// Fail the open request with an error indicating the blob already exists.
256    ///
257    /// # Panics
258    ///
259    /// Panics on error
260    pub fn fail_open_with_already_exists(self) {
261        self.fail_open_with_error(Status::ACCESS_DENIED);
262    }
263
264    /// Fail the open request with an error indicating the blob does not exist.
265    ///
266    /// # Panics
267    ///
268    /// Panics on error
269    pub fn fail_open_with_not_found(self) {
270        self.fail_open_with_error(Status::NOT_FOUND);
271    }
272
273    /// Fail the open request with a generic IO error.
274    ///
275    /// # Panics
276    ///
277    /// Panics on error
278    pub fn fail_open_with_io_error(self) {
279        self.fail_open_with_error(Status::IO);
280    }
281
282    /// Succeeds the open request, but indicate the blob is not yet readable by not asserting the
283    /// USER_0 signal on the file event handle, then asserts that the connection to the blob is
284    /// closed.
285    ///
286    /// # Panics
287    ///
288    /// Panics on error
289    pub async fn fail_open_with_not_readable(mut self) {
290        self.send_on_open(Status::OK);
291        self.expect_done().await;
292    }
293
294    /// Succeeds the open request, indicating that the blob is readable, then asserts that the
295    /// connection to the blob is closed.
296    ///
297    /// # Panics
298    ///
299    /// Panics on error
300    pub async fn succeed_open_with_blob_readable(mut self) {
301        self.send_on_open_with_readable(Status::OK);
302        self.expect_done().await;
303    }
304
305    /// Succeeds the open request, then verifies the blob is immediately closed (possibly after
306    /// handling a single Close request).
307    ///
308    /// # Panics
309    ///
310    /// Panics on error
311    pub async fn expect_close(mut self) {
312        self.send_on_open_with_readable(Status::OK);
313
314        match self.stream.next().await {
315            None => {}
316            Some(Ok(fio::FileRequest::Close { responder })) => {
317                let _ = responder.send(Ok(()));
318                self.expect_done().await;
319            }
320            Some(other) => panic!("unexpected request: {other:?}"),
321        }
322    }
323
324    /// Asserts that the request stream closes without any further requests.
325    ///
326    /// # Panics
327    ///
328    /// Panics on error
329    pub async fn expect_done(mut self) {
330        match self.stream.next().await {
331            None => {}
332            Some(request) => panic!("unexpected request: {request:?}"),
333        }
334    }
335
336    async fn handle_read(&mut self, data: &[u8]) -> usize {
337        match self.stream.next().await {
338            Some(Ok(fio::FileRequest::Read { count, responder })) => {
339                let count = min(count.try_into().unwrap(), data.len());
340                responder.send(Ok(&data[..count])).unwrap();
341                count
342            }
343            other => panic!("unexpected request: {other:?}"),
344        }
345    }
346
347    /// Succeeds the open request, then handle read request with the given blob data.
348    ///
349    /// # Panics
350    ///
351    /// Panics on error
352    pub async fn expect_read(mut self, blob: &[u8]) {
353        self.send_on_open_with_readable(Status::OK);
354
355        let mut rest = blob;
356        while !rest.is_empty() {
357            let count = self.handle_read(rest).await;
358            rest = &rest[count..];
359        }
360
361        // Handle one extra request with empty buffer to signal EOF.
362        self.handle_read(rest).await;
363
364        match self.stream.next().await {
365            None => {}
366            Some(Ok(fio::FileRequest::Close { responder })) => {
367                let _ = responder.send(Ok(()));
368            }
369            Some(other) => panic!("unexpected request: {other:?}"),
370        }
371    }
372
373    /// Succeeds the open request. Then handles get_attr, read, read_at, and possibly a final close
374    /// requests with the given blob data.
375    ///
376    /// # Panics
377    ///
378    /// Panics on error
379    pub async fn serve_contents(mut self, data: &[u8]) {
380        self.send_on_open_with_readable(Status::OK);
381
382        let mut pos: usize = 0;
383
384        loop {
385            match self.stream.next().await {
386                Some(Ok(fio::FileRequest::Read { count, responder })) => {
387                    let avail = data.len() - pos;
388                    let count = min(count.try_into().unwrap(), avail);
389                    responder.send(Ok(&data[pos..pos + count])).unwrap();
390                    pos += count;
391                }
392                Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
393                    let pos: usize = offset.try_into().unwrap();
394                    let avail = data.len() - pos;
395                    let count = min(count.try_into().unwrap(), avail);
396                    responder.send(Ok(&data[pos..pos + count])).unwrap();
397                }
398                Some(Ok(fio::FileRequest::GetAttr { responder })) => {
399                    let mut attr = fio::NodeAttributes {
400                        mode: 0,
401                        id: 0,
402                        content_size: 0,
403                        storage_size: 0,
404                        link_count: 0,
405                        creation_time: 0,
406                        modification_time: 0,
407                    };
408                    attr.content_size = data.len().try_into().unwrap();
409                    responder.send(Status::OK.into_raw(), &attr).unwrap();
410                }
411                Some(Ok(fio::FileRequest::Close { responder })) => {
412                    let _ = responder.send(Ok(()));
413                    return;
414                }
415                Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
416                    assert!(flags.contains(fio::VmoFlags::READ));
417                    assert!(!flags.contains(fio::VmoFlags::WRITE));
418                    assert!(!flags.contains(fio::VmoFlags::EXECUTE));
419                    let vmo = zx::Vmo::create(data.len() as u64).unwrap();
420                    vmo.write(data, 0).unwrap();
421                    let vmo = vmo
422                        .replace_handle(
423                            zx::Rights::READ
424                                | zx::Rights::BASIC
425                                | zx::Rights::MAP
426                                | zx::Rights::GET_PROPERTY,
427                        )
428                        .unwrap();
429                    responder.send(Ok(vmo)).unwrap();
430                }
431                None => {
432                    return;
433                }
434                other => panic!("unexpected request: {other:?}"),
435            }
436        }
437    }
438
439    async fn handle_truncate(&mut self, status: Status) -> u64 {
440        match self.stream.next().await {
441            Some(Ok(fio::FileRequest::Resize { length, responder })) => {
442                responder
443                    .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
444                    .unwrap();
445
446                length
447            }
448            other => panic!("unexpected request: {other:?}"),
449        }
450    }
451
452    async fn expect_truncate(&mut self) -> u64 {
453        self.handle_truncate(Status::OK).await
454    }
455
456    async fn handle_write(&mut self, status: Status) -> Vec<u8> {
457        match self.stream.next().await {
458            Some(Ok(fio::FileRequest::Write { data, responder })) => {
459                responder
460                    .send(if status == Status::OK {
461                        Ok(data.len() as u64)
462                    } else {
463                        Err(status.into_raw())
464                    })
465                    .unwrap();
466
467                data
468            }
469            other => panic!("unexpected request: {other:?}"),
470        }
471    }
472
473    async fn fail_write_with_status(mut self, status: Status) {
474        self.send_on_open(Status::OK);
475
476        let length = self.expect_truncate().await;
477        // divide rounding up
478        let expected_write_calls = length.div_ceil(fio::MAX_BUF);
479        for _ in 0..(expected_write_calls - 1) {
480            self.handle_write(Status::OK).await;
481        }
482        self.handle_write(status).await;
483    }
484
485    /// Succeeds the open request, consumes the truncate request, the initial write calls, then
486    /// fails the final write indicating the written data was corrupt.
487    ///
488    /// # Panics
489    ///
490    /// Panics on error
491    pub async fn fail_write_with_corrupt(self) {
492        self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
493    }
494
495    /// Succeeds the open request, then returns a future that, when awaited, verifies the blob is
496    /// truncated, written, and closed with the given `expected` payload.
497    ///
498    /// # Panics
499    ///
500    /// Panics on error
501    pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
502        self.send_on_open(Status::OK);
503
504        async move {
505            assert_eq!(self.expect_truncate().await, expected.len() as u64);
506
507            let mut rest = expected;
508            while !rest.is_empty() {
509                let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
510                    &rest[..fio::MAX_BUF as usize]
511                } else {
512                    rest
513                };
514                assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
515                rest = &rest[expected_chunk.len()..];
516            }
517
518            match self.stream.next().await {
519                Some(Ok(fio::FileRequest::Close { responder })) => {
520                    responder.send(Ok(())).unwrap();
521                }
522                other => panic!("unexpected request: {other:?}"),
523            }
524
525            self.expect_done().await;
526        }
527    }
528}